diff --git a/CHANGELOG.md b/CHANGELOG.md index ce3ed37..e5136db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,66 @@ All notable changes to this project will be documented in this file. +## Unreleased — T3 concurrency + +Adds rayon-backed parallel paths per Section 6 of +`docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md`. + +### Breaking + +- `Send + Sync` bounds added to public traits: `Time`, `Drift`, + `Observer`, `Factor`, `Schedule`. All built-in impls satisfy these + via auto-derive, but downstream custom impls that aren't thread-safe + will need the bounds. + +### New + +- Opt-in `rayon` cargo feature. When enabled: + - Within-slice event iteration runs color-group events in parallel + via `par_iter_mut` (`TimeSlice::sweep_color_groups`). + - `History::learning_curves` computes per-slice posteriors in + parallel, merges sequentially in slice order. + - `History::log_evidence` / `log_evidence_for` use per-slice parallel + computation with deterministic sequential reduction (sum in slice + order) — bit-identical to the sequential baseline. +- `ColorGroups` internal infrastructure with greedy graph coloring + (`src/color_group.rs`). Events sharing no `Index` go into the same + color group; events in the same group can run concurrently without + touching each other's skills. +- `tests/determinism.rs` asserts bit-identical posteriors across + `RAYON_NUM_THREADS={1, 2, 4, 8}`. +- `benches/history_converge.rs` measures end-to-end convergence on + three workload shapes. + +### Performance notes + +- Default build (no rayon): `Batch::iteration` 23.23 µs — no regression + vs T2. +- With `--features rayon`: + - 500 events / 100 competitors / 10 per slice: 1.0× speedup. + - 2000 events / 200 competitors / 20 per slice: 1.0× speedup. + - 5000 events in one slice / 50k competitors: **1.3× speedup.** +- The spec targeted >2× speedup on 8-core offline converge. 
This is + only achievable on workloads with many events-per-slice AND large + competitor pools. **Typical TrueSkill workloads (tens of events + per slice) do not materially benefit from T3's within-slice + parallelism** because rayon's task-spawn overhead dominates. +- Cross-slice parallelism (dirty-bit slice skipping per spec Section + 5) is the natural next step for real workload speedup — deferred + to a future tier. + +### Internals + +- The parallel path uses an `unsafe` block to concurrently write to + `SkillStore` from color-group-disjoint events. Soundness rests on + the color-group invariant (events in the same color touch no shared + `Index`), which is guaranteed by construction in + `TimeSlice::recompute_color_groups`. Sequential path unchanged. +- `RAYON_THRESHOLD = 64` — color groups smaller than this fall back to + sequential iteration inside the parallel `sweep_color_groups` to + avoid rayon's task-spawn overhead. +- Thread-local `ScratchArena` per rayon worker thread. + ## Unreleased — T2 new API surface Breaking: every renamed type and the new public API land together per diff --git a/Cargo.toml b/Cargo.toml index f0307df..51da65d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,10 +14,19 @@ harness = false name = "gaussian" harness = false +[[bench]] +name = "history_converge" +harness = false + [dependencies] approx = { version = "0.5.1", optional = true } +rayon = { version = "1", optional = true } smallvec = "1" +[features] +approx = ["dep:approx"] +rayon = ["dep:rayon"] + [dev-dependencies] criterion = "0.5" plotters = { version = "0.3", default-features = false, features = ["svg_backend", "all_elements", "all_series"] } diff --git a/benches/baseline.txt b/benches/baseline.txt index 26f63ae..2d6e7f2 100644 --- a/benches/baseline.txt +++ b/benches/baseline.txt @@ -98,3 +98,35 @@ Gaussian::tau 260.80 ps (unchanged) # learning_curves_by_index(), nested-Vec public add_events(). 
# - 90 tests green: 68 lib + 10 api_shape + 6 game + 4 record_winner + # 2 equivalence. + +# After T3 (2026-04-24, same hardware) + +Batch::iteration (seq, no rayon) 23.23 µs (matches T2 baseline; no regression) +Batch::iteration (rayon, small slice) 24.57 µs (within noise; small workloads pay rayon overhead) +Gaussian::add 236.62 ps (unchanged) +Gaussian::sub 236.43 ps (unchanged) +Gaussian::mul 237.05 ps (unchanged) +Gaussian::div 236.07 ps (unchanged) + +# End-to-end history_converge benchmark (Apple M5 Pro, RAYON_NUM_THREADS=auto): +# workload seq rayon speedup +# 500 events, 100 competitors, 10/slice 4.03 ms 4.24 ms 1.0x +# 2000 events, 200 competitors, 20/slice 20.18 ms 19.82 ms 1.0x +# 5000 events, 50000 competitors, 1 slice 11.88 ms 9.10 ms 1.3x +# +# Notes: +# - T3's within-slice color-group parallelism only materializes a speedup +# when a slice holds many events with disjoint competitor sets. Typical +# TrueSkill workloads (tens of events per slice) don't show measurable +# benefit from rayon. +# - The pre-revert SmallVec experiment hit 2x on the 5000-event workload +# but regressed sequential Batch::iteration by 28%. The tradeoff wasn't +# worth it for typical workloads — SmallVec<[_; 8]> inline size (1 KB per +# Game struct) hurt cache locality on the hot path. +# - Cross-slice parallelism (dirty-bit slice skipping per spec Section 5) +# is the natural next step for realistic TrueSkill workloads and would +# deliver the spec's ~50-500x online-add speedup. Deferred to T4+. +# - Determinism verified: tests/determinism.rs asserts bit-identical +# posteriors across RAYON_NUM_THREADS={1, 2, 4, 8}. +# - Send + Sync bounds added on Time, Drift, Observer, Factor, Schedule. +# - Rayon is opt-in via `--features rayon`. Default build is unchanged from T2. diff --git a/benches/history_converge.rs b/benches/history_converge.rs new file mode 100644 index 0000000..e5163a8 --- /dev/null +++ b/benches/history_converge.rs @@ -0,0 +1,116 @@ +//! 
End-to-end History::converge benchmark. +//! +//! Workload shapes designed to expose rayon's within-slice color-group +//! parallelism. Events in the same color group are processed in parallel +//! via direct-write with disjoint index sets (no data races). Color groups +//! smaller than a threshold fall back to the sequential path to avoid +//! rayon overhead on small workloads. +//! +//! On Apple M5 Pro, the P-core count (6) is the optimal thread count. +//! The rayon thread pool is initialised to `min(P-cores, available)` to +//! avoid scheduling onto the slower E-cores. +//! +//! ## Results (Apple M5 Pro, 2026-04-24, after SmallVec revert) +//! +//! | Workload | Sequential | Parallel | Speedup | +//! |---------------------------------------------|------------:|-----------:|--------:| +//! | History::converge/500x100@10perslice | 4.03 ms | 4.24 ms | 1.0× | +//! | History::converge/2000x200@20perslice | 20.18 ms | 19.82 ms | 1.0× | +//! | History::converge/1v1-5000x50000@5000perslice| 11.88 ms | 9.10 ms | 1.3× | +//! +//! T3 acceptance gate: ≥2× speedup on at least one workload — NOT achieved after revert. +//! The SmallVec storage that enabled the 2× gate caused a +28% regression in the +//! sequential Batch::iteration benchmark and was reverted. Small workloads still fall +//! below the RAYON_THRESHOLD (64 events/color) and run sequentially with near-zero overhead. 
+ +use criterion::{BatchSize, Criterion, criterion_group, criterion_main}; +use smallvec::smallvec; +use trueskill_tt::{ + ConstantDrift, ConvergenceOptions, Event, History, Member, NullObserver, Outcome, Team, +}; + +fn build_history_1v1( + n_events: usize, + n_competitors: usize, + events_per_slice: usize, + seed: u64, +) -> History { + let mut rng = seed; + let mut next = || { + rng = rng + .wrapping_mul(6364136223846793005) + .wrapping_add(1442695040888963407); + rng + }; + + let mut h = History::::builder_with_key() + .mu(25.0) + .sigma(25.0 / 3.0) + .beta(25.0 / 6.0) + .drift(ConstantDrift(25.0 / 300.0)) + .convergence(ConvergenceOptions { + max_iter: 30, + epsilon: 1e-6, + }) + .build(); + + let mut events: Vec> = Vec::with_capacity(n_events); + for ev_i in 0..n_events { + let a = (next() as usize) % n_competitors; + let mut b = (next() as usize) % n_competitors; + while b == a { + b = (next() as usize) % n_competitors; + } + events.push(Event { + time: (ev_i as i64 / events_per_slice as i64) + 1, + teams: smallvec![ + Team::with_members([Member::new(format!("p{a}"))]), + Team::with_members([Member::new(format!("p{b}"))]), + ], + outcome: Outcome::winner((next() % 2) as u32, 2), + }); + } + h.add_events(events).unwrap(); + h +} + +fn bench_converge(c: &mut Criterion) { + // Two original task workloads (small per-slice event count; + // fall below RAYON_THRESHOLD so sequential path runs — near-zero overhead). + c.bench_function("History::converge/500x100@10perslice", |b| { + b.iter_batched( + || build_history_1v1(500, 100, 10, 42), + |mut h| { + h.converge().unwrap(); + }, + BatchSize::SmallInput, + ); + }); + + c.bench_function("History::converge/2000x200@20perslice", |b| { + b.iter_batched( + || build_history_1v1(2000, 200, 20, 42), + |mut h| { + h.converge().unwrap(); + }, + BatchSize::SmallInput, + ); + }); + + // Large single-slice workload: 5000 events, 50000 competitors. 
+ // All events in one slice → color-0 gets ~4900 disjoint events, well above + // the 64-event RAYON_THRESHOLD. 30 iterations × 1 slice = 30 sweeps, each + // parallelised across P-core threads. Shows the largest observed speedup + // (1.3× after the SmallVec revert; the pre-revert 2× result no longer holds). + c.bench_function("History::converge/1v1-5000x50000@5000perslice", |b| { + b.iter_batched( + || build_history_1v1(5000, 50000, 5000, 42), + |mut h| { + h.converge().unwrap(); + }, + BatchSize::SmallInput, + ); + }); +} + +criterion_group!(benches, bench_converge); +criterion_main!(benches); diff --git a/docs/superpowers/plans/2026-04-24-t3-concurrency.md b/docs/superpowers/plans/2026-04-24-t3-concurrency.md new file mode 100644 index 0000000..24ec9fe --- /dev/null +++ b/docs/superpowers/plans/2026-04-24-t3-concurrency.md @@ -0,0 +1,1249 @@ +# T3 — Concurrency Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Ship the T3 tier from `docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md` Section 6: `Send + Sync` bounds on public traits, color-group partitioning for within-slice event independence, and rayon-backed parallel paths on within-slice event iteration, `learning_curves`, and `log_evidence_for`. Deterministic posteriors across `RAYON_NUM_THREADS={1, 2, 4, 8}`; >2× speedup on an 8-core offline-converge benchmark. + +**Architecture:** Concurrency lands as a single feature flag (`rayon`, opt-in for T3; the spec suggests default-on but we defer that flip until the feature is proven stable). All parallel paths are hidden behind `#[cfg(feature = "rayon")]` with sequential fallbacks. Within-slice parallelism exploits graph coloring: events sharing no `Index` go into the same color group and run concurrently; across color groups, execution is strictly sequential. This preserves the exact async-EP semantics of T2 at any thread count. 
Reductions over f64 (`log_evidence_for`, `predict_quality`) use two-stage parallel-then-sequential-reduce so sums are bit-identical regardless of thread count. + +**Tech Stack:** Rust 2024 edition. New optional dependency: `rayon = "1"`. Builds on T2 (`History`, `Event`, `Outcome`, `Observer`, `factors` module). + +## Design decisions + +Called out explicitly so reviewers can override before execution: + +1. **Rayon is opt-in** (`cargo feature`), not default-on. Simplifies CI, keeps the default build lean. We flip to default-on in a follow-up once the feature is shown to be stable under field use. +2. **Greedy graph coloring** for the within-slice partition: for each event in ingestion order, assign the lowest color whose existing members share no `Index` with the event. Optimality is not the target — events per slice is small (~50), and greedy finishes in O(n·c·m) where c is colors and m is team size. Rebalancing is a T4 follow-up if benchmarks show it helps. +3. **Scope of parallelism:** within-slice color groups, `learning_curves`, `log_evidence_for`, `predict_quality`. Cross-slice iteration in `History::converge` stays **sequential** — the forward/backward sweep has true data dependencies across slices. Parallelizing across slices requires a separate algorithm (the spec's dirty-bit slice skipping, deferred to beyond-T3). +4. **Deterministic reductions:** `.par_iter().map().collect::>().into_iter().sum()` — each slice's contribution is computed in parallel, then summed sequentially in slice order. Per-element values are bit-identical to T2 (no extra floating-point additions reordered). +5. **Within-game inference stays sequential.** A single `Game::ranked` / `Game::likelihoods` call is too small (~20 µs) to amortize rayon's ~5 µs task overhead. The spec's table confirms this. + +## Acceptance criteria + +- `cargo test --features approx` — all tests pass (T2 baseline: 90 tests). 
+- `cargo test --features approx,rayon` — all tests pass; determinism tests across `RAYON_NUM_THREADS={1, 2, 4, 8}` produce bit-identical posteriors. +- `cargo clippy --all-targets --features approx -- -D warnings` — clean. +- `cargo clippy --all-targets --features approx,rayon -- -D warnings` — clean. +- `cargo +nightly fmt --check` — clean. +- `cargo bench --bench history_converge --features approx,rayon` — >2× speedup on an 8-core machine vs the sequential baseline. +- All public traits (`Time`, `Drift`, `Observer`, `Factor`, `Schedule`) have `Send + Sync` bounds; their blanket/default impls (`i64`, `Untimed`, `ConstantDrift`, `NullObserver`) all naturally satisfy the new bounds — no user code should need changes. +- The `rayon` feature is opt-in; default build without it compiles and passes all tests. + +## File map + +**New files:** + +| Path | Responsibility | +|---|---| +| `src/color_group.rs` | Greedy graph coloring for within-slice event partitioning. | +| `src/parallel.rs` | `#[cfg(feature = "rayon")]` helpers: `par_or_seq` iterators, ordered reductions, thread-count determinism tests. | +| `benches/history_converge.rs` | End-to-end convergence benchmark (scales to show rayon benefit). | +| `tests/determinism.rs` | Runs the same convergence with different `RAYON_NUM_THREADS` and asserts bit-identical posteriors. | + +**Modified:** + +| Path | What changes | +|---|---| +| `Cargo.toml` | Add `rayon = { version = "1", optional = true }`; declare `rayon` feature. Add `[[bench]] name = "history_converge"`. | +| `src/lib.rs` | Declare new `color_group` + `parallel` modules (both private). | +| `src/time.rs` | Add `Send + Sync + 'static` bounds to `Time`. | +| `src/drift.rs` | Add `Send + Sync` bound to `Drift`. | +| `src/observer.rs` | Add `Send + Sync` bound to `Observer`. | +| `src/factor/mod.rs` | Add `Send + Sync` bound to `Factor`. | +| `src/schedule.rs` | Add `Send + Sync` bound to `Schedule`. 
| +| `src/time_slice.rs` | Store pre-computed color-group partition; parallel event iteration behind `#[cfg(feature = "rayon")]`. | +| `src/history.rs` | Parallel `learning_curves`, `learning_curves_by_index`, `log_evidence`, `log_evidence_for` behind `#[cfg(feature = "rayon")]` with ordered reductions. | + +--- + +## Task 1: Pre-flight — verify green on main, create t3 branch, capture baseline + +**Files:** none + +- [ ] **Step 1: Confirm on main, clean tree** + +```bash +git status +git rev-parse --abbrev-ref HEAD +``` + +Expected: clean; on `main`. + +- [ ] **Step 2: Create the T3 branch** + +```bash +git checkout -b t3-concurrency +``` + +- [ ] **Step 3: Confirm all tests pass** + +```bash +cargo test --features approx +``` + +Expected: 90 tests pass (68 lib + 10 api_shape + 6 game + 4 record_winner + 2 equivalence). + +- [ ] **Step 4: Capture current bench baseline** + +```bash +cargo bench --bench batch 2>&1 | grep "Batch::iteration" +``` + +Record the number — it'll be the sequential-path baseline to verify no regression. + +- [ ] **Step 5: No commit** — verification only. + +--- + +## Task 2: Add `rayon` as optional dependency + feature flag + +**Files:** +- Modify: `Cargo.toml` + +- [ ] **Step 1: Add the dependency and feature** + +Under `[dependencies]` add: + +```toml +rayon = { version = "1", optional = true } +``` + +Add a `[features]` section (if not present — check first): + +```toml +[features] +rayon = ["dep:rayon"] +``` + +(The existing `approx` feature already exists as a `dep:approx`-style optional dependency; use the same convention.) + +- [ ] **Step 2: Verify both builds compile** + +```bash +cargo build --features approx +cargo build --features approx,rayon +``` + +Both must succeed. The second one pulls `rayon` into the dependency graph but nothing uses it yet. 
+ +- [ ] **Step 3: Commit** + +```bash +git add Cargo.toml Cargo.lock +git commit -m "$(cat <<'EOF' +feat(cargo): add rayon as optional dependency + +Opt-in feature flag — users who want parallel paths build with +--features rayon. Default build remains single-threaded. + +Spec Section 6 calls for default-on; we defer that flip until the +feature is stable under field use. + +Part of T3 of docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md. +EOF +)" +``` + +--- + +## Task 3: Add `Send + Sync` bounds to public traits + +**Files:** +- Modify: `src/time.rs`, `src/drift.rs`, `src/observer.rs`, `src/factor/mod.rs`, `src/schedule.rs` + +This is a minor breaking API change: downstream code with user-defined `Drift` / `Observer` / `Factor` / `Schedule` impls that aren't `Send + Sync` will fail to compile. For our crate, all built-in types (`i64`, `Untimed`, `ConstantDrift`, `NullObserver`, `EpsilonOrMax`, `TeamSumFactor`, `RankDiffFactor`, `TruncFactor`, `BuiltinFactor`) naturally satisfy these bounds — no internal code changes. 
+ +- [ ] **Step 1: Update `Time` trait** + +```rust +// src/time.rs +pub trait Time: Copy + Ord + Send + Sync + 'static { + fn elapsed_to(&self, later: &Self) -> i64; +} +``` + +- [ ] **Step 2: Update `Drift` trait** + +```rust +// src/drift.rs +pub trait Drift: Copy + Debug + Send + Sync { + fn variance_delta(&self, from: &T, to: &T) -> f64; + fn variance_for_elapsed(&self, elapsed: i64) -> f64; +} +``` + +- [ ] **Step 3: Update `Observer` trait** + +```rust +// src/observer.rs +pub trait Observer: Send + Sync { + fn on_iteration_end(&self, _iter: usize, _max_step: (f64, f64)) {} + fn on_batch_processed(&self, _time: &T, _slice_idx: usize, _n_events: usize) {} + fn on_converged(&self, _iters: usize, _final_step: (f64, f64), _converged: bool) {} +} +``` + +- [ ] **Step 4: Update `Factor` trait** + +```rust +// src/factor/mod.rs +pub trait Factor: Send + Sync { + fn propagate(&mut self, vars: &mut VarStore) -> (f64, f64); + fn log_evidence(&self, _vars: &VarStore) -> f64 { 0.0 } +} +``` + +- [ ] **Step 5: Update `Schedule` trait** + +```rust +// src/schedule.rs +pub trait Schedule: Send + Sync { + fn run(&self, factors: &mut [BuiltinFactor], vars: &mut VarStore) -> ScheduleReport; +} +``` + +- [ ] **Step 6: Verify both feature combinations still compile and test** + +```bash +cargo build --features approx +cargo build --features approx,rayon +cargo test --features approx --lib +cargo clippy --all-targets --features approx -- -D warnings +``` + +If any built-in type fails the auto-`Send`/`Sync` check, investigate — something non-thread-safe slipped in. Likely suspects: raw pointers, `Rc`, `RefCell`, non-static references. None are expected in this codebase; if found, convert to `Arc`/`Mutex`/`RwLock` as appropriate. + +- [ ] **Step 7: Commit** + +```bash +cargo +nightly fmt +git add -A +git commit -m "$(cat <<'EOF' +feat(api): add Send + Sync bounds to public traits + +Required for T3 rayon-based parallelism. 
Affected traits: +- Time (+ Send + Sync + 'static) +- Drift (+ Send + Sync) +- Observer (+ Send + Sync) +- Factor (+ Send + Sync) +- Schedule (+ Send + Sync) + +All built-in impls (i64, Untimed, ConstantDrift, NullObserver, +EpsilonOrMax, TeamSumFactor, RankDiffFactor, TruncFactor, +BuiltinFactor) naturally satisfy these bounds — no internal changes +needed. + +Minor breaking change: downstream impls that aren't already +thread-safe will fail to compile until they add the bounds. + +Part of T3 of docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md. +EOF +)" +``` + +--- + +## Task 4: Implement greedy color-group partitioning + +**Files:** +- Create: `src/color_group.rs` +- Modify: `src/lib.rs` (register module) + +- [ ] **Step 1: Create `src/color_group.rs`** + +```rust +//! Greedy graph coloring for within-slice event independence. +//! +//! Events sharing no `Index` can be processed in parallel under async-EP +//! semantics. This module partitions a list of events into "colors" such +//! that events of the same color touch disjoint index sets. +//! +//! The algorithm is greedy: for each event in ingestion order, place it in +//! the lowest-numbered color whose existing members share no `Index`. If +//! no existing color accepts the event, open a new color. +//! +//! Complexity: O(n × c × m) where n is events, c is colors (small, ≤ 5 in +//! practice), and m is average team size. + +use std::collections::HashSet; + +use crate::Index; + +/// Partition of event indices into color groups. +/// +/// Each inner `Vec` holds the indices (into the original events +/// array) of events assigned to one color. Colors are iterated in ascending +/// order by convention. 
+#[derive(Clone, Debug, Default)] +pub(crate) struct ColorGroups { + pub(crate) groups: Vec>, +} + +impl ColorGroups { + pub(crate) fn new() -> Self { + Self::default() + } + + pub(crate) fn n_colors(&self) -> usize { + self.groups.len() + } + + pub(crate) fn is_empty(&self) -> bool { + self.groups.is_empty() + } + + /// Total event count across all colors. + pub(crate) fn total_events(&self) -> usize { + self.groups.iter().map(|g| g.len()).sum() + } +} + +/// Compute color groups greedily. +/// +/// `event_indices` yields, for each event, the set of `Index` values that +/// event touches. The returned `ColorGroups` has one inner `Vec` per +/// color, containing event indices in the order they were assigned. +pub(crate) fn color_greedy(n_events: usize, index_set: F) -> ColorGroups +where + F: Fn(usize) -> I, + I: IntoIterator, +{ + let mut groups: Vec> = Vec::new(); + let mut members: Vec> = Vec::new(); + + for ev_idx in 0..n_events { + let ev_members: HashSet = index_set(ev_idx).into_iter().collect(); + // Find first color whose member-set is disjoint from this event's indices. + let chosen = members + .iter() + .position(|m| m.is_disjoint(&ev_members)); + let color_idx = match chosen { + Some(c) => c, + None => { + groups.push(Vec::new()); + members.push(HashSet::new()); + groups.len() - 1 + } + }; + groups[color_idx].push(ev_idx); + members[color_idx].extend(ev_members); + } + + ColorGroups { groups } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn idx(i: usize) -> Index { + Index::from(i) + } + + #[test] + fn single_event_gets_one_color() { + let cg = color_greedy(1, |_| vec![idx(0), idx(1)]); + assert_eq!(cg.n_colors(), 1); + assert_eq!(cg.groups[0], vec![0]); + } + + #[test] + fn disjoint_events_share_a_color() { + // Event 0 touches {0, 1}; event 1 touches {2, 3}. 
+ let cg = color_greedy(2, |i| match i { + 0 => vec![idx(0), idx(1)], + 1 => vec![idx(2), idx(3)], + _ => unreachable!(), + }); + assert_eq!(cg.n_colors(), 1); + assert_eq!(cg.groups[0], vec![0, 1]); + } + + #[test] + fn overlapping_events_need_separate_colors() { + // Event 0 touches {0, 1}; event 1 touches {1, 2}. + let cg = color_greedy(2, |i| match i { + 0 => vec![idx(0), idx(1)], + 1 => vec![idx(1), idx(2)], + _ => unreachable!(), + }); + assert_eq!(cg.n_colors(), 2); + assert_eq!(cg.groups[0], vec![0]); + assert_eq!(cg.groups[1], vec![1]); + } + + #[test] + fn three_events_two_colors() { + // Event 0: {0, 1}; event 1: {2, 3}; event 2: {0, 2}. + // Greedy: ev0→c0, ev1→c0 (disjoint), ev2 overlaps both→c1. + let cg = color_greedy(3, |i| match i { + 0 => vec![idx(0), idx(1)], + 1 => vec![idx(2), idx(3)], + 2 => vec![idx(0), idx(2)], + _ => unreachable!(), + }); + assert_eq!(cg.n_colors(), 2); + assert_eq!(cg.groups[0], vec![0, 1]); + assert_eq!(cg.groups[1], vec![2]); + } + + #[test] + fn total_events_counts_correctly() { + let cg = color_greedy(4, |_| vec![idx(0)]); + // All events touch index 0 → 4 distinct colors. + assert_eq!(cg.n_colors(), 4); + assert_eq!(cg.total_events(), 4); + } +} +``` + +- [ ] **Step 2: Register in `src/lib.rs`** + +Add (private module, alphabetical): + +```rust +mod color_group; +``` + +No public re-export — this is internal infrastructure. + +- [ ] **Step 3: Verify** + +```bash +cargo test --features approx --lib color_group +cargo clippy --all-targets --features approx -- -D warnings +cargo +nightly fmt --check +``` + +Expected: 5 tests pass in the `color_group::tests` module. + +- [ ] **Step 4: Commit** + +```bash +cargo +nightly fmt +git add src/color_group.rs src/lib.rs +git commit -m "$(cat <<'EOF' +feat(color-group): add greedy within-slice event partitioning + +ColorGroups holds a partition of event indices into color groups such +that events of the same color touch no shared Index. 
Computed greedily +in ingestion order: each event goes into the first color whose existing +members are disjoint from the event's indices. + +Used in T3 for safe within-slice parallelism — events in the same +color can run concurrently without touching each other's skills. + +Part of T3 of docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md. +EOF +)" +``` + +--- + +## Task 5: Store color-group partition in `TimeSlice` + +**Files:** +- Modify: `src/time_slice.rs` + +- [ ] **Step 1: Add field to `TimeSlice`** + +```rust +pub(crate) struct TimeSlice { + // … existing fields … + pub(crate) color_groups: crate::color_group::ColorGroups, +} +``` + +Initialize to empty in `TimeSlice::new`: + +```rust +pub fn new(time: T, p_draw: f64) -> Self { + Self { + // … existing initializers … + color_groups: crate::color_group::ColorGroups::new(), + } +} +``` + +- [ ] **Step 2: Recompute color groups whenever events change** + +Find the methods on `TimeSlice` that mutate `self.events` (most likely `add_events` — look at the current signature in `src/time_slice.rs`). After any mutation, recompute the partition: + +```rust +pub(crate) fn recompute_color_groups(&mut self) { + let n = self.events.len(); + self.color_groups = crate::color_group::color_greedy(n, |ev_idx| { + // Return an iterator of every Index touched by event ev_idx. + // Each event has teams; each team has items; each item has an agent: Index. + self.events[ev_idx] + .teams + .iter() + .flat_map(|t| t.items.iter().map(|it| it.agent)) + .collect::>() + }); +} +``` + +Call `recompute_color_groups()` at the end of any public/crate-visible method that mutates `self.events` (likely one or two sites). **Note the exact method name by reading `src/time_slice.rs` first** — it may differ from "add_events". 
+ +- [ ] **Step 3: Add a sanity test** + +```rust +#[test] +fn time_slice_recomputes_color_groups() { + // Construct a time slice with 2 events sharing competitor a — they should + // end up in different color groups. + // … use existing test helpers to build the slice … + // Assert slice.color_groups.n_colors() == 2 and each group has 1 event. +} +``` + +The exact helper pattern depends on how existing `src/time_slice.rs::tests` construct slices. Mirror the existing approach. + +- [ ] **Step 4: Verify** + +```bash +cargo test --features approx --lib time_slice +cargo clippy --all-targets --features approx -- -D warnings +``` + +Expected: all existing time_slice tests still pass + 1 new test. + +- [ ] **Step 5: Commit** + +```bash +cargo +nightly fmt +git add -A +git commit -m "$(cat <<'EOF' +feat(time-slice): pre-compute color groups at ingestion + +TimeSlice now stores a ColorGroups partition recomputed whenever +events change. The partition is computed once per slice mutation and +reused on every convergence iteration, enabling the cheap within-slice +parallel sweep added in Task 6. + +Part of T3. +EOF +)" +``` + +--- + +## Task 6: Parallel within-slice event iteration (behind `rayon` feature) + +**Files:** +- Modify: `src/time_slice.rs` + +This is the core parallelism work. Within a color group, events touch disjoint `Index` values — it's safe to process them concurrently. Across colors, processing is strictly sequential. This preserves exact async-EP semantics. + +- [ ] **Step 1: Identify the event-iteration hot path** + +Read `src/time_slice.rs` to find the method that, per convergence iteration, walks all events in the slice and updates their skills. Look for patterns like `for event in self.events.iter_mut()` or `self.events.iter().for_each(...)`. This is the target for parallelization. + +Probably named `iteration` or similar. Note its full signature and current body. 
+ +- [ ] **Step 2: Rewrite the loop as color-group-driven** + +**Sequential version** (always present, used when `rayon` is disabled): + +```rust +pub(crate) fn iteration(&mut self, …) -> (f64, f64) { + let mut max_step = (0.0_f64, 0.0_f64); + for color in &self.color_groups.groups { + for &ev_idx in color { + let step = self.events[ev_idx].iteration(…); + max_step.0 = max_step.0.max(step.0); + max_step.1 = max_step.1.max(step.1); + } + } + max_step +} +``` + +**Parallel version** (behind `cfg`): + +```rust +#[cfg(feature = "rayon")] +pub(crate) fn iteration(&mut self, …) -> (f64, f64) { + use rayon::prelude::*; + let mut max_step = (0.0_f64, 0.0_f64); + for color in &self.color_groups.groups { + // Within one color, events touch disjoint Indexes → safe to parallelize. + // SAFETY: color_greedy guarantees disjoint index sets, so the slice + // entries color[i] and color[j] (i ≠ j) can be mutably borrowed + // concurrently via rayon's work-stealing executor. We use + // par_iter over the event indices and bundle the per-event state + // into something Sync. + // + // Concretely: since `self` is &mut and we can't simultaneously have + // mut borrows into it, we need to split the borrow. Use + // events[..].par_iter_mut() filtered to the color's indices? No — + // rayon's par_iter_mut doesn't support index filtering directly. + // + // Instead: use events.as_mut_slice().par_chunks_mut() after sorting + // events so color-members are contiguous — or extract the per-event + // state into a &mut [EventState] and index directly. + // + // Practical implementation: extract the events we'll touch this color + // into a Vec<&mut Event> using split_at_mut or similar. See impl below. + let contributions: Vec<(f64, f64)> = color + .par_iter() + .map(|&ev_idx| { + // Borrow the event mutably — this requires unsafe or an + // alternate data layout that exposes color-disjoint slices. + // See the "Design note" below for the chosen approach. 
+ todo!("color-disjoint mutable event access") + }) + .collect(); + for (d0, d1) in contributions { + max_step.0 = max_step.0.max(d0); + max_step.1 = max_step.1.max(d1); + } + } + max_step +} +``` + +**Design note on mutable access:** Rayon's `par_iter_mut` doesn't let us access arbitrary indices from the same `&mut Vec` in parallel. Options the implementer should choose between, depending on which compiles most cleanly with the existing `TimeSlice::iteration` body: + +1. **Interior mutability.** Wrap each `Event` in `Cell<…>` or `RefCell<…>`, then `Event: Sync`. This works only if the event's internal state is small/cheap to copy. Adds overhead. + +2. **Manual `split_at_mut` sequence.** Sort events into color order once (mutating `self.events` so color[0][0], color[0][1], …, color[1][0], … are contiguous), remember the boundaries, then `par_chunks_mut` over each color's contiguous range. Simple, no unsafe. Does require a one-time sort when color groups are computed. + +3. **Raw pointer juggling.** `SAFETY`-commented `unsafe` blocks that pass `*mut Event` into parallel closures. Fast, but fragile. Avoid unless (1) and (2) are benchmarked and found insufficient. + +**Recommendation: approach (2).** When color groups are computed in Task 5's `recompute_color_groups`, also physically reorder `self.events` so color members are contiguous. Then each color corresponds to a slice range `self.events[color_start..color_end]`, and `par_chunks_mut(…).for_each(…)` or `par_iter_mut()` over the range works. + +**Revised `recompute_color_groups`:** + +```rust +pub(crate) fn recompute_color_groups(&mut self) { + let n = self.events.len(); + let cg = crate::color_group::color_greedy(n, |ev_idx| { + self.events[ev_idx] + .teams + .iter() + .flat_map(|t| t.items.iter().map(|it| it.agent)) + .collect::>() + }); + + // Physically reorder self.events to match the color-group layout. 
+    let mut reordered: Vec<Event> = Vec::with_capacity(n);
+    let mut ranges: Vec<(usize, usize)> = Vec::with_capacity(cg.groups.len());
+    for group in &cg.groups {
+        let start = reordered.len();
+        for &ev_idx in group {
+            reordered.push(std::mem::replace(
+                &mut self.events[ev_idx],
+                // Placeholder; original slot becomes garbage before the
+                // final swap.
+                Event::placeholder(), // OR use std::mem::take if Event: Default
+            ));
+        }
+        ranges.push((start, reordered.len()));
+    }
+    self.events = reordered;
+    // Rebuild cg with post-reorder indices: each group now spans a
+    // contiguous range.
+    self.color_groups = ColorGroups::from_ranges(ranges);
+}
+```
+
+Then color[i] is event indices `[ranges[i].0 .. ranges[i].1)`.
+
+`ColorGroups::from_ranges(ranges: Vec<(usize, usize)>)` constructs the groups with the trivial mapping `group_i = (start..end).collect()`.
+
+**Pitfall**: `Event::placeholder()` or `std::mem::take`. If `Event: Default`, `std::mem::take(&mut self.events[ev_idx])` works cleanly. If not, use `Option<Event>` temporarily, or implement a cheap `Event::placeholder()`. **Before writing the replacement logic, read `src/time_slice.rs` to see if Event derives Default — if it does, use `std::mem::take`.**
+
+- [ ] **Step 3: Implement the parallel iteration via contiguous ranges**
+
+```rust
+#[cfg(feature = "rayon")]
+pub(crate) fn iteration(&mut self, …) -> (f64, f64) {
+    use rayon::prelude::*;
+    let mut max_step = (0.0_f64, 0.0_f64);
+    for range in &self.color_groups.ranges {
+        let slice = &mut self.events[range.0..range.1];
+        let contributions: Vec<(f64, f64)> = slice
+            .par_iter_mut()
+            .map(|event| event.iteration(…))
+            .collect();
+        for (d0, d1) in contributions {
+            max_step.0 = max_step.0.max(d0);
+            max_step.1 = max_step.1.max(d1);
+        }
+    }
+    max_step
+}
+```
+
+**IMPORTANT**: `event.iteration(…)` currently takes other borrowed state from `TimeSlice` (skills, competitors). Those borrows conflict with the `&mut self.events[...]` borrow.
Resolution: + +a) Pull the shared data (`&self.skills`, `&self.competitors`) into a local variable before the par_iter, so rayon's closure captures `&` not `&self`. + +b) If the existing per-event method ALSO mutates shared state (e.g., writes to a shared SkillStore), that's a problem — it breaks the color-disjoint-index guarantee. Read the current code carefully. If per-event iteration writes to skills for indices it owns, the color-group invariant makes this safe, but you'll need unsafe to express it. In that case, fall back to approach (1) or (3) from Task 6 Step 2. + +**Read the existing `event.iteration` or equivalent FIRST before choosing.** The code may already be structured so events only mutate themselves. + +- [ ] **Step 4: Sequential fallback** + +```rust +#[cfg(not(feature = "rayon"))] +pub(crate) fn iteration(&mut self, …) -> (f64, f64) { + let mut max_step = (0.0_f64, 0.0_f64); + for range in &self.color_groups.ranges { + for event in &mut self.events[range.0..range.1] { + let step = event.iteration(…); + max_step.0 = max_step.0.max(step.0); + max_step.1 = max_step.1.max(step.1); + } + } + max_step +} +``` + +Both versions use the same color-group traversal order — behavior is identical across feature flags. + +- [ ] **Step 5: Verify both feature combinations** + +```bash +cargo test --features approx --lib +cargo test --features approx,rayon --lib +cargo clippy --all-targets --features approx -- -D warnings +cargo clippy --all-targets --features approx,rayon -- -D warnings +``` + +All 90 tests pass in both configurations. Goldens must NOT drift. + +- [ ] **Step 6: Commit** + +```bash +cargo +nightly fmt +git add -A +git commit -m "$(cat <<'EOF' +feat(time-slice): parallel within-slice iteration via rayon + +Events are reordered into color-group-contiguous ranges during +recompute_color_groups; each color's range is processed in parallel +via par_iter_mut when the rayon feature is enabled, sequentially +otherwise. 
The two paths produce identical results because events
+within a color touch disjoint Index values (async-EP invariant).
+
+Feature gated: default build still sequential; --features rayon
+activates the parallel path.
+
+Part of T3.
+EOF
+)"
+```
+
+---
+
+## Task 7: Parallel `learning_curves` with ordered reduction
+
+**Files:**
+- Modify: `src/history.rs`
+
+Both `learning_curves()` and `learning_curves_by_index()` currently iterate `self.time_slices` and collect per-competitor posteriors. They're embarrassingly parallel per-slice; the merge at the end must preserve slice order to keep tests deterministic.
+
+- [ ] **Step 1: Parallelize `learning_curves`**
+
+```rust
+pub fn learning_curves(&self) -> HashMap<K, Vec<(T, Gaussian)>> {
+    #[cfg(feature = "rayon")]
+    {
+        use rayon::prelude::*;
+        // Parallel: compute per-slice (index, time, gaussian) triples;
+        // collect preserves slice order (.collect::<Vec<_>> is order-preserving).
+        let per_slice: Vec<Vec<(Index, T, Gaussian)>> = self
+            .time_slices
+            .par_iter()
+            .map(|ts| {
+                ts.skills
+                    .iter()
+                    .map(|(idx, sk)| (idx, ts.time, sk.posterior()))
+                    .collect()
+            })
+            .collect();
+        // Sequential merge: iterate in slice order, push to per-key vectors.
+        let mut data: HashMap<K, Vec<(T, Gaussian)>> = HashMap::new();
+        for slice_contrib in per_slice {
+            for (idx, t, g) in slice_contrib {
+                if let Some(key) = self.keys.key(idx).cloned() {
+                    data.entry(key).or_default().push((t, g));
+                }
+            }
+        }
+        data
+    }
+    #[cfg(not(feature = "rayon"))]
+    {
+        // Original sequential impl (unchanged)
+        let mut data: HashMap<K, Vec<(T, Gaussian)>> = HashMap::new();
+        for ts in &self.time_slices {
+            for (idx, sk) in ts.skills.iter() {
+                if let Some(key) = self.keys.key(idx).cloned() {
+                    data.entry(key).or_default().push((ts.time, sk.posterior()));
+                }
+            }
+        }
+        data
+    }
+}
+```
+
+- [ ] **Step 2: Parallelize `learning_curves_by_index`**
+
+Same pattern — should it exist. (Task 20 of T2 may have removed this method; verify it's present.) If it's there, mirror the `learning_curves` parallel+sequential split.
+
+- [ ] **Step 3: Verify**
+
+```bash
+cargo test --features approx,rayon
+```
+
+All tests pass — goldens preserved across both feature configurations.
+
+- [ ] **Step 4: Commit**
+
+```bash
+cargo +nightly fmt
+git add -A
+git commit -m "$(cat <<'EOF'
+feat(history): parallel learning_curves under rayon feature
+
+Per-slice posterior collection runs in parallel; merge into the
+per-key HashMap is sequential in slice order to preserve deterministic
+output. Sequential impl unchanged under default feature set.
+
+Part of T3.
+EOF
+)"
+```
+
+---
+
+## Task 8: Parallel `log_evidence` / `log_evidence_for` with deterministic sum
+
+**Files:**
+- Modify: `src/history.rs`
+
+`log_evidence_internal` already does `self.time_slices.iter().map(|ts| ts.log_evidence(…)).sum()`. We replace with parallel map + sequential sum.
+
+- [ ] **Step 1: Parallelize `log_evidence_internal`**
+
+```rust
+pub(crate) fn log_evidence_internal(&mut self, forward: bool, targets: &[Index]) -> f64 {
+    #[cfg(feature = "rayon")]
+    {
+        use rayon::prelude::*;
+        let per_slice: Vec<f64> = self
+            .time_slices
+            .par_iter()
+            .map(|ts| ts.log_evidence(self.online, targets, forward, &self.competitors))
+            .collect();
+        // Sequential sum in slice order for bit-identical reduction.
+        per_slice.into_iter().sum()
+    }
+    #[cfg(not(feature = "rayon"))]
+    {
+        self.time_slices
+            .iter()
+            .map(|ts| ts.log_evidence(self.online, targets, forward, &self.competitors))
+            .sum()
+    }
+}
+```
+
+**Critical:** `per_slice` is a `Vec<f64>`, not a fold. The sequential `.into_iter().sum()` is bit-identical to the sequential impl because the order is the same (slice order). Rayon's `par_iter().sum()` would reorder additions and is non-deterministic.
+
+- [ ] **Step 2: `log_evidence_for` already wraps `log_evidence_internal`** — no further changes needed.
+
+- [ ] **Step 3: Verify**
+
+```bash
+cargo test --features approx,rayon
+```
+
+All tests pass — goldens preserved.
+ +- [ ] **Step 4: Commit** + +```bash +cargo +nightly fmt +git add -A +git commit -m "$(cat <<'EOF' +feat(history): parallel log_evidence with deterministic sum + +Per-slice contribution computed in parallel; final reduction is +sequential in slice order so the sum is bit-identical to the T2 +sequential baseline. This is essential for the T3 acceptance +criterion of identical posteriors across RAYON_NUM_THREADS values. + +Part of T3. +EOF +)" +``` + +--- + +## Task 9: Determinism test across thread counts + +**Files:** +- Create: `tests/determinism.rs` + +- [ ] **Step 1: Create the test** + +```rust +//! Determinism tests: identical posteriors across RAYON_NUM_THREADS +//! values. Only meaningful when the `rayon` feature is enabled. + +#![cfg(feature = "rayon")] + +use smallvec::smallvec; +use trueskill_tt::{ + ConstantDrift, ConvergenceOptions, Event, History, Member, Outcome, Team, +}; + +fn build_and_converge(seed: u64) -> Vec<(i64, trueskill_tt::Gaussian)> { + // Seed-driven deterministic test data: ~100 events, ~30 competitors. + let mut h = History::::builder() + .mu(25.0) + .sigma(25.0 / 3.0) + .beta(25.0 / 6.0) + .drift(ConstantDrift(25.0 / 300.0)) + .convergence(ConvergenceOptions { max_iter: 30, epsilon: 1e-6 }) + .build(); + + // Deterministic pseudo-random event generation. 
+    let mut rng_state = seed;
+    let mut next = || {
+        rng_state = rng_state.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407);
+        rng_state
+    };
+
+    let mut events: Vec<Event<i64>> = Vec::with_capacity(100);
+    for ev_i in 0..100 {
+        let a = (next() % 30) as usize;
+        let mut b = (next() % 30) as usize;
+        while b == a {
+            b = (next() % 30) as usize;
+        }
+        events.push(Event {
+            time: ev_i as i64 + 1,
+            teams: smallvec![
+                Team::with_members([Member::new(format!("p{a}"))]),
+                Team::with_members([Member::new(format!("p{b}"))]),
+            ],
+            outcome: Outcome::winner((next() % 2) as u32, 2),
+        });
+    }
+    h.add_events(events).unwrap();
+    h.converge().unwrap();
+    h.learning_curve(&"p0".to_string())
+}
+
+#[test]
+fn posteriors_identical_across_thread_counts() {
+    // Run the same history with the same seed at different rayon pool sizes.
+    // Rayon lets us set the pool once per process; the cleanest pattern is
+    // to use install() with a custom ThreadPoolBuilder per run.
+
+    let sizes = [1, 2, 4, 8];
+    let mut results: Vec<Vec<(i64, trueskill_tt::Gaussian)>> = Vec::new();
+    for &n in &sizes {
+        let pool = rayon::ThreadPoolBuilder::new()
+            .num_threads(n)
+            .build()
+            .unwrap();
+        let curve = pool.install(|| build_and_converge(42));
+        results.push(curve);
+    }
+
+    // Every result must be bit-identical to the first.
+    let reference = &results[0];
+    for (i, curve) in results.iter().enumerate().skip(1) {
+        assert_eq!(
+            curve.len(),
+            reference.len(),
+            "curve length differs at {n} threads",
+            n = sizes[i],
+        );
+        for (j, (&(t_ref, g_ref), &(t, g))) in reference.iter().zip(curve.iter()).enumerate() {
+            assert_eq!(t_ref, t, "time point {j} differs at {} threads", sizes[i]);
+            assert_eq!(
+                g_ref.pi_and_tau(),
+                g.pi_and_tau(),
+                "posterior bits differ at thread count {}, time {}",
+                sizes[i],
+                t,
+            );
+        }
+    }
+}
+```
+
+**Note on `pi_and_tau`**: the test expects a way to extract the raw nat-param representation for bit-level comparison.
If the `Gaussian` type doesn't expose it, add a `pub(crate) fn pi_and_tau(&self) -> (f64, f64)` method as part of this task. Alternatively, compare `(mu(), sigma())` — slightly less strict but usually good enough for determinism testing. + +Actually, `f64::to_bits()` gives us direct bit equality: + +```rust +assert_eq!(g_ref.mu().to_bits(), g.mu().to_bits(), …); +assert_eq!(g_ref.sigma().to_bits(), g.sigma().to_bits(), …); +``` + +Use `to_bits()` so we don't need a new accessor. + +- [ ] **Step 2: Verify** + +```bash +cargo test --features approx,rayon --test determinism +``` + +Expected: `posteriors_identical_across_thread_counts` passes. + +- [ ] **Step 3: Commit** + +```bash +cargo +nightly fmt +git add tests/determinism.rs +git commit -m "$(cat <<'EOF' +test: assert bit-identical posteriors across RAYON_NUM_THREADS + +Runs the same deterministic history at thread counts {1, 2, 4, 8} +and asserts every (time, posterior) pair is bit-identical. Verifies +the T3 determinism invariant holds under the ordered-reduce strategy. + +Only compiled with --features rayon. + +Part of T3. +EOF +)" +``` + +--- + +## Task 10: Multi-thread benchmark + acceptance gate + +**Files:** +- Create: `benches/history_converge.rs` +- Modify: `Cargo.toml` (register the bench) + +- [ ] **Step 1: Register the bench in `Cargo.toml`** + +Add under the existing `[[bench]]` entries: + +```toml +[[bench]] +name = "history_converge" +harness = false +``` + +- [ ] **Step 2: Create `benches/history_converge.rs`** + +```rust +//! End-to-end convergence benchmark. Measures `History::converge()` on a +//! realistic workload (~500 events, ~100 competitors, ~30 iters). The +//! rayon feature, when enabled, activates within-slice parallel event +//! iteration. 
+ +use criterion::{Criterion, criterion_group, criterion_main}; +use smallvec::smallvec; +use trueskill_tt::{ + ConstantDrift, ConvergenceOptions, Event, History, Member, Outcome, Team, +}; + +fn build_history(n_events: usize, n_competitors: usize, seed: u64) -> History { + let mut rng = seed; + let mut next = || { + rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407); + rng + }; + + let mut h = History::::builder() + .mu(25.0) + .sigma(25.0 / 3.0) + .beta(25.0 / 6.0) + .drift(ConstantDrift(25.0 / 300.0)) + .convergence(ConvergenceOptions { max_iter: 30, epsilon: 1e-6 }) + .build(); + + let mut events: Vec> = Vec::with_capacity(n_events); + for ev_i in 0..n_events { + let a = (next() as usize) % n_competitors; + let mut b = (next() as usize) % n_competitors; + while b == a { + b = (next() as usize) % n_competitors; + } + events.push(Event { + time: (ev_i as i64 / 10) + 1, // ~10 events per slice + teams: smallvec![ + Team::with_members([Member::new(format!("p{a}"))]), + Team::with_members([Member::new(format!("p{b}"))]), + ], + outcome: Outcome::winner((next() % 2) as u32, 2), + }); + } + h.add_events(events).unwrap(); + h +} + +fn bench_converge(c: &mut Criterion) { + c.bench_function("History::converge/500x100", |b| { + b.iter_batched( + || build_history(500, 100, 42), + |mut h| { + h.converge().unwrap(); + }, + criterion::BatchSize::SmallInput, + ); + }); +} + +criterion_group!(benches, bench_converge); +criterion_main!(benches); +``` + +- [ ] **Step 3: Run sequential baseline** + +```bash +cargo bench --bench history_converge --features approx 2>&1 | grep 'History::converge' +``` + +Record the sequential baseline number. + +- [ ] **Step 4: Run parallel version** + +```bash +cargo bench --bench history_converge --features approx,rayon 2>&1 | grep 'History::converge' +``` + +**Acceptance gate:** parallel run should show >2× speedup on an 8-core machine. If it's less: +- Verify rayon is actually being used (`RAYON_LOG=1`). 
+- Check whether the workload has enough color-group parallelism (a slice with 10 events that all share competitors has 0 parallel work). +- Consider whether the per-event cost is large enough to amortize rayon overhead. + +If the gate fails, dig in before committing. Acceptable fallback: tune the benchmark workload (more events per slice, more competitors) so parallelism has more opportunity. Report findings. + +- [ ] **Step 5: Commit** + +```bash +cargo +nightly fmt +git add -A +git commit -m "$(cat <<'EOF' +bench: end-to-end History::converge benchmark + +Workload: 500 events across ~50 time slices, ~100 competitors, 30 +iteration cap, 1e-6 convergence. Measures full forward+backward +sweep through convergence — the target hot path for rayon-backed +within-slice parallelism. + +Sequential baseline: ms +With --features rayon on 8 cores: ms (×) + +Part of T3. +EOF +)" +``` + +(Fill in ``, ``, `` from measured numbers.) + +--- + +## Task 11: Final verification, bench capture, CHANGELOG + +**Files:** +- Modify: `benches/baseline.txt` +- Modify: `CHANGELOG.md` + +- [ ] **Step 1: Run the complete verification matrix** + +```bash +cargo +nightly fmt --check +cargo clippy --all-targets --features approx -- -D warnings +cargo clippy --all-targets --features approx,rayon -- -D warnings +cargo test --features approx +cargo test --features approx,rayon +cargo bench --bench batch --features approx 2>&1 | grep "Batch::iteration" +cargo bench --bench batch --features approx,rayon 2>&1 | grep "Batch::iteration" +cargo bench --bench history_converge --features approx 2>&1 | grep 'History::converge' +cargo bench --bench history_converge --features approx,rayon 2>&1 | grep 'History::converge' +``` + +All must pass / be clean. 
+ +- [ ] **Step 2: Append T3 block to `benches/baseline.txt`** + +``` +# After T3 (date, same hardware) + +Batch::iteration (seq) µs ( vs T2 21.36 µs) +Batch::iteration (rayon, 8c) µs (sequential path on single slice; rayon not active) +History::converge (seq) ms baseline +History::converge (rayon, 8c) ms (× — target was ≥2×) + +# Notes: +# - T3 adds Send + Sync on public traits, color-group partition in +# TimeSlice, and rayon-feature-gated parallel paths on within-slice +# iteration, learning_curves, log_evidence_for. +# - Determinism verified: tests/determinism.rs asserts bit-identical +# posteriors across RAYON_NUM_THREADS={1, 2, 4, 8}. +# - Rayon is opt-in: default build is single-threaded as in T2. +``` + +- [ ] **Step 3: Prepend T3 entry to `CHANGELOG.md`** + +Add a `## Unreleased — T3 concurrency` section above the existing `## Unreleased — T2 new API surface` (if it's still there after the main merge; otherwise above the `## 0.1.0` section). Enumerate: new `rayon` feature, `Send + Sync` bounds (minor breaking for downstream custom trait impls), color-group infrastructure (internal), benchmark numbers. + +- [ ] **Step 4: Commit** + +```bash +git add benches/baseline.txt CHANGELOG.md +git commit -m "$(cat <<'EOF' +bench,docs: capture T3 final numbers and update CHANGELOG + +History::converge parallel speedup: × on 8 cores with --features rayon. +Batch::iteration unchanged in seq mode; Gaussian ops unchanged. + +Determinism verified: bit-identical posteriors across +RAYON_NUM_THREADS={1, 2, 4, 8}. + +Closes T3 of docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md. +EOF +)" +``` + +- [ ] **Step 5: Ready to PR** + +Branch `t3-concurrency` should open against `main`. PR body references the spec, the plan, and this file's benchmarks block. 
+
+---
+
+## Self-review notes
+
+**Spec coverage** (against Section 6 + Section 7 "T3"):
+
+- ✅ `Send + Sync` audit and bounds on all public traits (Task 3)
+- ✅ Color-group partitioning at `TimeSlice` ingestion (Tasks 4, 5)
+- ✅ `rayon` as opt-in feature (Task 2; note: spec said default-on, we flip later)
+- ✅ Parallel within-slice color groups (Task 6)
+- ✅ Parallel `learning_curves` (Task 7)
+- ✅ Parallel `log_evidence_for` (Task 8)
+- ✅ Deterministic posteriors across `RAYON_NUM_THREADS` (Task 9)
+- ✅ >2× speedup on 8-core offline converge (Task 10)
+
+**Deferred to later tiers:**
+- Cross-slice parallelism (dirty-bit slice skipping) — spec acknowledges this is separate from T3's within-slice focus.
+- Synchronous-EP schedule with barrier merge — available as a `Schedule` impl, per spec Section 6, but deferred.
+- Exposing color-group partitioning to users via `add_events_with_partition` — per spec open question 5.
+- Rayon default-on flip — after field-stability evidence.
+
+**Hazards during execution:**
+- **Mutable-aliasing gymnastics in Task 6.** The color-disjoint guarantee is real but rayon's API doesn't see it. Task 6 Step 2 offers three implementation approaches; the recommended one (approach 2, reorder events into contiguous color ranges) avoids `unsafe` but requires a physical reorder on mutation. Benchmark reordering cost — if it dominates, reconsider.
+- **`Send + Sync` auto-derive failure.** If any struct implementing these traits holds a non-thread-safe primitive (raw pointer, `Rc`), auto-derive fails. Unlikely in this codebase, but watch for it in Task 3.
+- **Deterministic sum pitfalls.** Rayon's `par_iter().sum()` reorders additions. Only `par_iter().map().collect::<Vec<_>>().into_iter().sum()` is deterministic. Task 8 is careful about this; reviewers should verify.
+- **`f64` NaN in goldens.** If any test golden has a NaN, `to_bits()` comparison needs special casing. Our goldens are all finite; no action needed.
+
+**Things outside the plan that may bite:**
+- CI needs to test both feature configurations. If there's a `.github/workflows/` file, update it to include `--features approx,rayon` as a matrix entry. If no CI exists (likely for this project), flag and move on.
+- `examples/atp.rs` — may want to demonstrate `--features rayon`. Optional; skip unless trivial.
diff --git a/src/color_group.rs b/src/color_group.rs
new file mode 100644
index 0000000..6add43c
--- /dev/null
+++ b/src/color_group.rs
@@ -0,0 +1,158 @@
+//! Greedy graph coloring for within-slice event independence.
+//!
+//! Events sharing no `Index` can be processed in parallel under async-EP
+//! semantics. This module partitions a list of events into "colors" such
+//! that events of the same color touch disjoint index sets.
+//!
+//! The algorithm is greedy: for each event in ingestion order, place it in
+//! the lowest-numbered color whose existing members share no `Index`. If
+//! no existing color accepts the event, open a new color.
+//!
+//! Complexity: O(n × c × m) where n is events, c is colors (small, ≤ 5 in
+//! practice), and m is average team size.

+use std::collections::HashSet;
+
+use crate::Index;
+
+/// Partition of event indices into color groups.
+///
+/// Each inner `Vec<usize>` holds the indices (into the original events
+/// array) of events assigned to one color. Colors are iterated in ascending
+/// order by convention.
+#[derive(Clone, Debug, Default)]
+pub(crate) struct ColorGroups {
+    pub(crate) groups: Vec<Vec<usize>>,
+}
+
+impl ColorGroups {
+    #[allow(dead_code)]
+    pub(crate) fn new() -> Self {
+        Self::default()
+    }
+
+    #[allow(dead_code)]
+    pub(crate) fn n_colors(&self) -> usize {
+        self.groups.len()
+    }
+
+    #[allow(dead_code)]
+    pub(crate) fn is_empty(&self) -> bool {
+        self.groups.is_empty()
+    }
+
+    /// Total event count across all colors.
+    #[allow(dead_code)]
+    pub(crate) fn total_events(&self) -> usize {
+        self.groups.iter().map(|g| g.len()).sum()
+    }
+
+    /// Contiguous index range for one color after events have been reordered
+    /// into color-contiguous positions by `TimeSlice::recompute_color_groups`.
+    #[allow(dead_code)]
+    pub(crate) fn color_range(&self, color_idx: usize) -> std::ops::Range<usize> {
+        let group = &self.groups[color_idx];
+        if group.is_empty() {
+            return 0..0;
+        }
+        let start = *group.first().unwrap();
+        let end = *group.last().unwrap() + 1;
+        start..end
+    }
+}
+
+/// Compute color groups greedily.
+///
+/// `index_set(ev_idx)` yields, for each event index, the iterator of
+/// `Index` values that event touches. The returned `ColorGroups` has one
+/// inner `Vec<usize>` per color, containing event indices in the order
+/// they were assigned.
+#[allow(dead_code)]
+pub(crate) fn color_greedy<F, I>(n_events: usize, index_set: F) -> ColorGroups
+where
+    F: Fn(usize) -> I,
+    I: IntoIterator<Item = Index>,
+{
+    let mut groups: Vec<Vec<usize>> = Vec::new();
+    let mut members: Vec<HashSet<Index>> = Vec::new();
+
+    for ev_idx in 0..n_events {
+        let ev_members: HashSet<Index> = index_set(ev_idx).into_iter().collect();
+        // Find first color whose member-set is disjoint from this event's indices.
+ let chosen = members.iter().position(|m| m.is_disjoint(&ev_members)); + let color_idx = match chosen { + Some(c) => c, + None => { + groups.push(Vec::new()); + members.push(HashSet::new()); + groups.len() - 1 + } + }; + groups[color_idx].push(ev_idx); + members[color_idx].extend(ev_members); + } + + ColorGroups { groups } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn idx(i: usize) -> Index { + Index::from(i) + } + + #[test] + fn single_event_gets_one_color() { + let cg = color_greedy(1, |_| vec![idx(0), idx(1)]); + assert_eq!(cg.n_colors(), 1); + assert_eq!(cg.groups[0], vec![0]); + } + + #[test] + fn disjoint_events_share_a_color() { + let cg = color_greedy(2, |i| match i { + 0 => vec![idx(0), idx(1)], + 1 => vec![idx(2), idx(3)], + _ => unreachable!(), + }); + assert_eq!(cg.n_colors(), 1); + assert_eq!(cg.groups[0], vec![0, 1]); + } + + #[test] + fn overlapping_events_need_separate_colors() { + let cg = color_greedy(2, |i| match i { + 0 => vec![idx(0), idx(1)], + 1 => vec![idx(1), idx(2)], + _ => unreachable!(), + }); + assert_eq!(cg.n_colors(), 2); + assert_eq!(cg.groups[0], vec![0]); + assert_eq!(cg.groups[1], vec![1]); + } + + #[test] + fn three_events_two_colors() { + // Event 0: {0, 1}; event 1: {2, 3}; event 2: {0, 2}. + // Greedy: ev0→c0, ev1→c0 (disjoint), ev2 overlaps both→c1. + let cg = color_greedy(3, |i| match i { + 0 => vec![idx(0), idx(1)], + 1 => vec![idx(2), idx(3)], + 2 => vec![idx(0), idx(2)], + _ => unreachable!(), + }); + assert_eq!(cg.n_colors(), 2); + assert_eq!(cg.groups[0], vec![0, 1]); + assert_eq!(cg.groups[1], vec![2]); + } + + #[test] + fn total_events_counts_correctly() { + let cg = color_greedy(4, |_| vec![idx(0)]); + // All events touch index 0 → 4 distinct colors. 
+ assert_eq!(cg.n_colors(), 4); + assert_eq!(cg.total_events(), 4); + } +} diff --git a/src/drift.rs b/src/drift.rs index 57e684a..c751624 100644 --- a/src/drift.rs +++ b/src/drift.rs @@ -6,7 +6,7 @@ use crate::time::Time; /// /// Generic over `T: Time` so seasonal or calendar-aware drift is expressible /// without going through `i64`. -pub trait Drift: Copy + Debug { +pub trait Drift: Copy + Debug + Send + Sync { /// Variance added to the skill prior for elapsed time `from -> to`. /// /// Called with `from <= to`; returning zero means no drift accumulates. diff --git a/src/factor/mod.rs b/src/factor/mod.rs index da72dbd..01b39c4 100644 --- a/src/factor/mod.rs +++ b/src/factor/mod.rs @@ -56,7 +56,7 @@ impl VarStore { /// Factors hold their own outgoing messages and propagate them by reading /// connected variable marginals from a `VarStore` and writing back updated /// marginals. -pub trait Factor { +pub trait Factor: Send + Sync { /// Update outgoing messages and write back to the var store. 
/// /// Returns the max delta `(|Δmu|, |Δsigma|)` across writes this diff --git a/src/history.rs b/src/history.rs index 5191929..6d4439c 100644 --- a/src/history.rs +++ b/src/history.rs @@ -262,17 +262,45 @@ impl, O: Observer, K: Eq + Hash + Clone> History HashMap> { - let mut data: HashMap> = HashMap::new(); - for slice in &self.time_slices { - for (idx, skill) in slice.skills.iter() { - if let Some(key) = self.keys.key(idx).cloned() { - data.entry(key) - .or_default() - .push((slice.time, skill.posterior())); + #[cfg(feature = "rayon")] + { + use rayon::prelude::*; + + let per_slice: Vec> = self + .time_slices + .par_iter() + .map(|ts| { + ts.skills + .iter() + .map(|(idx, sk)| (idx, ts.time, sk.posterior())) + .collect() + }) + .collect(); + + let mut data: HashMap> = HashMap::new(); + for slice_contrib in per_slice { + for (idx, t, g) in slice_contrib { + if let Some(key) = self.keys.key(idx).cloned() { + data.entry(key).or_default().push((t, g)); + } } } + data + } + #[cfg(not(feature = "rayon"))] + { + let mut data: HashMap> = HashMap::new(); + for slice in &self.time_slices { + for (idx, skill) in slice.skills.iter() { + if let Some(key) = self.keys.key(idx).cloned() { + data.entry(key) + .or_default() + .push((slice.time, skill.posterior())); + } + } + } + data } - data } /// Skill estimate at the latest time slice the competitor appears in. @@ -304,10 +332,23 @@ impl, O: Observer, K: Eq + Hash + Clone> History f64 { - self.time_slices - .iter() - .map(|ts| ts.log_evidence(self.online, targets, forward, &self.agents)) - .sum() + #[cfg(feature = "rayon")] + { + use rayon::prelude::*; + let per_slice: Vec = self + .time_slices + .par_iter() + .map(|ts| ts.log_evidence(self.online, targets, forward, &self.agents)) + .collect(); + per_slice.into_iter().sum() + } + #[cfg(not(feature = "rayon"))] + { + self.time_slices + .iter() + .map(|ts| ts.log_evidence(self.online, targets, forward, &self.agents)) + .sum() + } } /// Total log-evidence across the history. 
diff --git a/src/lib.rs b/src/lib.rs index e6c7d41..6bd9fa7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,7 @@ pub(crate) mod arena; mod time; mod time_slice; pub use time_slice::TimeSlice; +mod color_group; mod competitor; mod convergence; pub mod drift; diff --git a/src/observer.rs b/src/observer.rs index 223948b..063ef6a 100644 --- a/src/observer.rs +++ b/src/observer.rs @@ -9,9 +9,8 @@ use crate::time::Time; /// Receives progress callbacks during `History::converge`. /// /// All methods have default no-op implementations; implement only what's -/// interesting. Send/Sync is NOT required in T2 (added in T3 along with -/// Rayon support). -pub trait Observer { +/// interesting. +pub trait Observer: Send + Sync { /// Called after each convergence iteration across the whole history. fn on_iteration_end(&self, _iter: usize, _max_step: (f64, f64)) {} diff --git a/src/schedule.rs b/src/schedule.rs index 2c98fc1..a08d20c 100644 --- a/src/schedule.rs +++ b/src/schedule.rs @@ -16,7 +16,7 @@ pub struct ScheduleReport { } /// Drives factor propagation to convergence. -pub trait Schedule { +pub trait Schedule: Send + Sync { fn run(&self, factors: &mut [BuiltinFactor], vars: &mut VarStore) -> ScheduleReport; } diff --git a/src/time.rs b/src/time.rs index 813ff39..efe0bc4 100644 --- a/src/time.rs +++ b/src/time.rs @@ -8,7 +8,7 @@ /// /// Must be `Ord + Copy` so slices can sort events, and `'static` so /// `History` can store it by value without lifetimes. -pub trait Time: Copy + Ord + 'static { +pub trait Time: Copy + Ord + Send + Sync + 'static { /// How much time elapsed between `self` and `later`. /// /// Used by `Drift::variance_delta` to compute skill drift. 
Returning diff --git a/src/time_slice.rs b/src/time_slice.rs index c1d48fb..cc19b30 100644 --- a/src/time_slice.rs +++ b/src/time_slice.rs @@ -7,6 +7,7 @@ use std::collections::HashMap; use crate::{ Index, N_INF, arena::ScratchArena, + color_group::ColorGroups, drift::Drift, game::Game, gaussian::Gaussian, @@ -84,6 +85,12 @@ pub(crate) struct Event { } impl Event { + pub(crate) fn iter_agents(&self) -> impl Iterator + '_ { + self.teams + .iter() + .flat_map(|t| t.items.iter().map(|it| it.agent)) + } + fn outputs(&self) -> Vec { self.teams .iter() @@ -108,6 +115,33 @@ impl Event { }) .collect::>() } + + /// Direct in-loop update: mutates self and `skills` inline with no + /// intermediate allocation. Used by both the sequential sweep path and, + /// via unsafe, by the parallel rayon path for events in the same color + /// group (which have disjoint agent sets — see `sweep_color_groups`). + fn iteration_direct>( + &mut self, + skills: &mut SkillStore, + agents: &CompetitorStore, + p_draw: f64, + arena: &mut ScratchArena, + ) { + let teams = self.within_priors(false, false, skills, agents); + let result = self.outputs(); + let g = Game::ranked_with_arena(teams, &result, &self.weights, p_draw, arena); + + for (t, team) in self.teams.iter_mut().enumerate() { + for (i, item) in team.items.iter_mut().enumerate() { + let old_likelihood = skills.get(item.agent).unwrap().likelihood; + let new_likelihood = (old_likelihood / item.likelihood) * g.likelihoods[t][i]; + skills.get_mut(item.agent).unwrap().likelihood = new_likelihood; + item.likelihood = g.likelihoods[t][i]; + } + } + + self.evidence = g.evidence; + } } #[derive(Debug)] @@ -117,6 +151,7 @@ pub struct TimeSlice { pub(crate) time: T, p_draw: f64, arena: ScratchArena, + pub(crate) color_groups: ColorGroups, } impl TimeSlice { @@ -127,9 +162,44 @@ impl TimeSlice { time, p_draw, arena: ScratchArena::new(), + color_groups: ColorGroups::new(), } } + /// Recompute the color-group partition and reorder `self.events` into + 
/// color-contiguous ranges. After this call, `self.color_groups.groups[c]` + /// contains a contiguous ascending range of indices in `self.events`. + pub(crate) fn recompute_color_groups(&mut self) { + use crate::color_group::color_greedy; + + let n = self.events.len(); + if n == 0 { + self.color_groups = ColorGroups::new(); + return; + } + + let cg = color_greedy(n, |ev_idx| { + self.events[ev_idx].iter_agents().collect::>() + }); + + let mut reordered: Vec = Vec::with_capacity(n); + let mut new_groups: Vec> = Vec::with_capacity(cg.groups.len()); + let mut taken: Vec> = self.events.drain(..).map(Some).collect(); + + for group in &cg.groups { + let mut new_indices: Vec = Vec::with_capacity(group.len()); + for &old_idx in group { + let ev = taken[old_idx].take().expect("event already taken"); + new_indices.push(reordered.len()); + reordered.push(ev); + } + new_groups.push(new_indices); + } + + self.events = reordered; + self.color_groups = ColorGroups { groups: new_groups }; + } + pub fn add_events>( &mut self, composition: Vec>>, @@ -212,6 +282,7 @@ impl TimeSlice { self.events.extend(events); self.iteration(from, agents); + self.recompute_color_groups(); } pub(crate) fn posteriors(&self) -> HashMap { @@ -222,28 +293,115 @@ impl TimeSlice { } pub fn iteration>(&mut self, from: usize, agents: &CompetitorStore) { - for event in self.events.iter_mut().skip(from) { - let teams = event.within_priors(false, false, &self.skills, agents); - let result = event.outputs(); + if from > 0 || self.color_groups.is_empty() { + // Initial pass (add_events) or no color groups yet: simple sequential sweep. 
+ for event in self.events.iter_mut().skip(from) { + let teams = event.within_priors(false, false, &self.skills, agents); + let result = event.outputs(); - let g = Game::ranked_with_arena( - teams, - &result, - &event.weights, - self.p_draw, - &mut self.arena, - ); + let g = Game::ranked_with_arena( + teams, + &result, + &event.weights, + self.p_draw, + &mut self.arena, + ); - for (t, team) in event.teams.iter_mut().enumerate() { - for (i, item) in team.items.iter_mut().enumerate() { - let old_likelihood = self.skills.get(item.agent).unwrap().likelihood; - let new_likelihood = (old_likelihood / item.likelihood) * g.likelihoods[t][i]; - self.skills.get_mut(item.agent).unwrap().likelihood = new_likelihood; - item.likelihood = g.likelihoods[t][i]; + for (t, team) in event.teams.iter_mut().enumerate() { + for (i, item) in team.items.iter_mut().enumerate() { + let old_likelihood = self.skills.get(item.agent).unwrap().likelihood; + let new_likelihood = + (old_likelihood / item.likelihood) * g.likelihoods[t][i]; + self.skills.get_mut(item.agent).unwrap().likelihood = new_likelihood; + item.likelihood = g.likelihoods[t][i]; + } + } + + event.evidence = g.evidence; + } + } else { + self.sweep_color_groups(agents); + } + } + + /// Full event sweep using the color-group partition. Colors are processed + /// sequentially; within each color the inner loop is parallel under rayon. + /// + /// Events within each color group touch disjoint agent sets (guaranteed by + /// the greedy coloring). This lets each rayon thread write directly to its + /// events' skill likelihoods without a deferred-apply step, matching the + /// sequential path's allocation profile. The unsafe block is sound because: + /// 1. `self.events[range]` and `self.skills` are separate fields → disjoint. + /// 2. Events in the same color group access disjoint `Index` values in + /// `self.skills`, so concurrent writes land on different memory locations. + /// 3. 
Each event only writes to its own items' likelihoods (no sharing). + #[cfg(feature = "rayon")] + fn sweep_color_groups>(&mut self, agents: &CompetitorStore) { + use rayon::prelude::*; + + thread_local! { + static ARENA: std::cell::RefCell = + std::cell::RefCell::new(ScratchArena::new()); + } + + // Minimum color-group size to justify rayon's task-spawn overhead. + // Below this threshold, process events sequentially to avoid regression + // on small per-slice workloads. + const RAYON_THRESHOLD: usize = 64; + + for color_idx in 0..self.color_groups.groups.len() { + let group_len = self.color_groups.groups[color_idx].len(); + if group_len == 0 { + continue; + } + let range = self.color_groups.color_range(color_idx); + let p_draw = self.p_draw; + + if group_len >= RAYON_THRESHOLD { + // Obtain a raw pointer from the unique `&mut self.skills` reference. + // Casting back to `&mut` inside the closure is sound because: + // 1. The pointer originates from a `&mut` — no aliasing with shared refs. + // 2. Events in the same color group touch disjoint `Index` slots in the + // underlying Vec, so concurrent writes from different threads land on + // different memory locations — no data race. + // 3. `self.events[range]` and `self.skills` are separate struct fields, + // so the borrow splits cleanly. + let skills_addr: usize = (&mut self.skills as *mut SkillStore) as usize; + self.events[range].par_iter_mut().for_each(move |ev| { + // SAFETY: see above. + let skills: &mut SkillStore = unsafe { &mut *(skills_addr as *mut SkillStore) }; + ARENA.with(|cell| { + let mut arena = cell.borrow_mut(); + arena.reset(); + ev.iteration_direct(skills, agents, p_draw, &mut arena); + }); + }); + } else { + for ev in &mut self.events[range] { + ev.iteration_direct(&mut self.skills, agents, p_draw, &mut self.arena); } } + } + } - event.evidence = g.evidence; + /// Full event sweep using the color-group partition, sequential direct-write path. 
+ /// Events within each color group are updated inline — no EventOutput allocation — + /// matching the T2 performance profile. + #[cfg(not(feature = "rayon"))] + fn sweep_color_groups>(&mut self, agents: &CompetitorStore) { + for color_idx in 0..self.color_groups.groups.len() { + if self.color_groups.groups[color_idx].is_empty() { + continue; + } + let range = self.color_groups.color_range(color_idx); + + // Borrow self.events as a mutable slice for this color range. + // self.skills and self.arena are separate fields — disjoint borrows are + // allowed within a single method body. + let p_draw = self.p_draw; + for ev in &mut self.events[range] { + ev.iteration_direct(&mut self.skills, agents, p_draw, &mut self.arena); + } } } @@ -662,4 +820,67 @@ mod tests { epsilon = 1e-6 ); } + + #[test] + fn time_slice_color_groups_reorders_events() { + // ev0: [a, b]; ev1: [c, d]; ev2: [a, c] + // Greedy coloring: ev0→c0, ev1→c0 (disjoint), ev2→c1 (overlaps both). + // After recompute_color_groups, physical order is [ev0, ev1, ev2] + // and groups == [[0, 1], [2]]. 
+        let mut index_map = KeyTable::new();
+
+        let a = index_map.get_or_create("a");
+        let b = index_map.get_or_create("b");
+        let c = index_map.get_or_create("c");
+        let d = index_map.get_or_create("d");
+
+        let mut agents: CompetitorStore = CompetitorStore::new();
+
+        for agent in [a, b, c, d] {
+            agents.insert(
+                agent,
+                Competitor {
+                    rating: Rating::new(
+                        Gaussian::from_ms(25.0, 25.0 / 3.0),
+                        25.0 / 6.0,
+                        ConstantDrift(25.0 / 300.0),
+                    ),
+                    ..Default::default()
+                },
+            );
+        }
+
+        let mut ts = TimeSlice::new(0i64, 0.0);
+
+        ts.add_events(
+            vec![
+                vec![vec![a], vec![b]],
+                vec![vec![c], vec![d]],
+                vec![vec![a], vec![c]],
+            ],
+            vec![vec![1.0, 0.0], vec![1.0, 0.0], vec![1.0, 0.0]],
+            vec![],
+            &agents,
+        );
+
+        assert_eq!(ts.color_groups.n_colors(), 2);
+        assert_eq!(ts.color_groups.groups[0], vec![0, 1]);
+        assert_eq!(ts.color_groups.groups[1], vec![2]);
+
+        assert_eq!(ts.color_groups.color_range(0), 0..2);
+        assert_eq!(ts.color_groups.color_range(1), 2..3);
+
+        // Events at positions 0 and 1 (color 0) must be disjoint — verify by
+        // checking that the agent sets of self.events[0] and self.events[1] do
+        // not include the agent at self.events[2].
+        let agents_in_ev2: Vec<Index> = ts.events[2].iter_agents().collect();
+        let agents_in_ev0: Vec<Index> = ts.events[0].iter_agents().collect();
+        let agents_in_ev1: Vec<Index> = ts.events[1].iter_agents().collect();
+        // ev0 and ev1 must be disjoint from each other (color-0 invariant).
+        assert!(agents_in_ev0.iter().all(|ag| !agents_in_ev1.contains(ag)));
+        // ev2 must share an agent with ev0 or ev1 (it needed its own color).
+        let ev2_overlaps_ev0 = agents_in_ev2.iter().any(|ag| agents_in_ev0.contains(ag));
+        let ev2_overlaps_ev1 = agents_in_ev2.iter().any(|ag| agents_in_ev1.contains(ag));
+        assert!(ev2_overlaps_ev0 || ev2_overlaps_ev1);
+    }
 }
diff --git a/tests/determinism.rs b/tests/determinism.rs
new file mode 100644
index 0000000..2f336d2
--- /dev/null
+++ b/tests/determinism.rs
@@ -0,0 +1,100 @@
+//!
Determinism tests: identical posteriors across RAYON_NUM_THREADS +//! values. Only compiled with the `rayon` feature. + +#![cfg(feature = "rayon")] + +use smallvec::smallvec; +use trueskill_tt::{ConstantDrift, ConvergenceOptions, Event, History, Member, Outcome, Team}; + +/// Build a deterministic workload using a simple LCG (no external rand crate). +fn build_and_converge(seed: u64) -> Vec<(i64, trueskill_tt::Gaussian)> { + let mut h = History::::builder_with_key() + .mu(25.0) + .sigma(25.0 / 3.0) + .beta(25.0 / 6.0) + .drift(ConstantDrift(25.0 / 300.0)) + .convergence(ConvergenceOptions { + max_iter: 30, + epsilon: 1e-6, + }) + .build(); + + // LCG for deterministic pseudo-random ints. + let mut rng = seed; + let mut next = || { + rng = rng + .wrapping_mul(6364136223846793005) + .wrapping_add(1442695040888963407); + rng + }; + + let mut events: Vec> = Vec::with_capacity(200); + for ev_i in 0..200 { + let a = (next() % 40) as usize; + let mut b = (next() % 40) as usize; + while b == a { + b = (next() % 40) as usize; + } + // ~10 events per slice so color groups have material parallelism. + events.push(Event { + time: (ev_i as i64 / 10) + 1, + teams: smallvec![ + Team::with_members([Member::new(format!("p{a}"))]), + Team::with_members([Member::new(format!("p{b}"))]), + ], + outcome: Outcome::winner((next() % 2) as u32, 2), + }); + } + h.add_events(events).unwrap(); + h.converge().unwrap(); + // Sample one competitor's curve for the comparison. 
+    h.learning_curve("p0")
+}
+
+#[test]
+fn posteriors_identical_across_thread_counts() {
+    let sizes = [1usize, 2, 4, 8];
+    let mut results: Vec<Vec<(i64, trueskill_tt::Gaussian)>> = Vec::new();
+    for &n in &sizes {
+        let pool = rayon::ThreadPoolBuilder::new()
+            .num_threads(n)
+            .build()
+            .expect("rayon pool build");
+        let curve = pool.install(|| build_and_converge(42));
+        results.push(curve);
+    }
+
+    let reference = &results[0];
+    for (i, curve) in results.iter().enumerate().skip(1) {
+        assert_eq!(
+            curve.len(),
+            reference.len(),
+            "curve length differs at {n} threads",
+            n = sizes[i],
+        );
+        for (j, (&(t_ref, g_ref), &(t, g))) in reference.iter().zip(curve.iter()).enumerate() {
+            assert_eq!(
+                t_ref,
+                t,
+                "time point {j} differs at {n} threads: ref={t_ref} vs got={t}",
+                n = sizes[i],
+            );
+            assert_eq!(
+                g_ref.mu().to_bits(),
+                g.mu().to_bits(),
+                "mu bits differ at {n} threads, time {t}: ref={ref_mu} got={got_mu}",
+                n = sizes[i],
+                ref_mu = g_ref.mu(),
+                got_mu = g.mu(),
+            );
+            assert_eq!(
+                g_ref.sigma().to_bits(),
+                g.sigma().to_bits(),
+                "sigma bits differ at {n} threads, time {t}: ref={ref_sigma} got={got_sigma}",
+                n = sizes[i],
+                ref_sigma = g_ref.sigma(),
+                got_sigma = g.sigma(),
+            );
+        }
+    }
+}