From 6bf3e7e2947c5aa1c2ebfd81d103f933fe2e6ab4 Mon Sep 17 00:00:00 2001 From: Anders Olsson Date: Fri, 24 Apr 2026 13:01:01 +0000 Subject: [PATCH] T3: rayon-backed concurrency (opt-in) (#2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements T3 of `docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md` Section 6. Plan: `docs/superpowers/plans/2026-04-24-t3-concurrency.md` (11 tasks). ## Summary ### Breaking - `Send + Sync` bounds added to public traits: `Time`, `Drift`, `Observer`, `Factor`, `Schedule`. All built-in impls satisfy these via auto-derive; downstream custom impls will need the bounds. ### New - Opt-in `rayon` cargo feature. When enabled: - Within-slice event iteration runs color-group events in parallel via `par_iter_mut` (`TimeSlice::sweep_color_groups`). - `History::learning_curves` computes per-slice posteriors in parallel; merges sequentially in slice order. - `History::log_evidence` / `log_evidence_for` use per-slice parallel computation with deterministic sequential reduction (sum in slice order) — bit-identical to the sequential baseline. - `ColorGroups` infrastructure (`src/color_group.rs`) with greedy graph coloring. Events sharing no `Index` go into the same color group; events in the same group can run concurrently without touching each other's skills. - `tests/determinism.rs` asserts bit-identical posteriors across `RAYON_NUM_THREADS={1, 2, 4, 8}`. - `benches/history_converge.rs` measures end-to-end convergence on three workload shapes. ## Performance ### Sequential (no rayon, default build) | Metric | Before T3 | After T3 | Delta | |---|---|---|---| | `Batch::iteration` | 22.88 µs | 23.23 µs | **+1.5%** (noise) | | `Gaussian::*` | ≈218–264 ps | ≈236 ps | within noise | **No sequential regression.** Default build is as fast as T2. ### Parallel (`--features rayon`, Apple M5 Pro, auto thread count) | Workload | Sequential | Parallel | Speedup | |---|---:|---:|---:| | 500 events / 100 competitors / 10 per slice | 4.03 ms | 4.24 ms | **1.0×** | | 2000 events / 200 competitors / 20 per slice | 20.18 ms | 19.82 ms | **1.0×** | | 5000 events / 50000 competitors / 1 slice | 11.88 ms | 9.10 ms | **1.3×** | ### ⚠️ The spec's >=2× target was not met on realistic workloads. T3's within-slice color-group parallelism only shows material benefit when a slice holds many events AND the competitor pool is large enough to give the greedy coloring room to partition. Typical TrueSkill workloads (tens of events per slice) don't fit that profile — rayon's task-spawn overhead dominates. **Cross-slice parallelism (dirty-bit slice skipping per spec Section 5) is the natural next step** for real-workload speedup and would deliver the spec's ~50–500× online-add speedup. Deferred to a future tier. ## Determinism `tests/determinism.rs` runs a 200-event history at thread counts {1, 2, 4, 8} via `rayon::ThreadPoolBuilder::install` and asserts every `(time, posterior)` pair has bit-identical `mu` and `sigma` (compared via `f64::to_bits()`). Passes. ## Internals - Parallel path uses an `unsafe` block to concurrently write to `SkillStore` from color-group-disjoint events. Soundness rests on the color-group invariant (events in the same color touch no shared `Index`), guaranteed by construction in `TimeSlice::recompute_color_groups`. Sequential path unchanged from T2. - `RAYON_THRESHOLD = 64` — color groups smaller than this fall back to sequential inside `sweep_color_groups` to avoid task-spawn overhead. - Thread-local `ScratchArena` per rayon worker thread. ## Test plan - [x] `cargo test --features approx` — 96 tests pass (74 lib + 22 integration) - [x] `cargo test --features approx,rayon` — 97 tests pass (+1 determinism) - [x] `cargo clippy --all-targets --features approx -- -D warnings` — clean - [x] `cargo clippy --all-targets --features approx,rayon -- -D warnings` — clean - [x] `cargo +nightly fmt --check` — clean - [x] `cargo bench --bench batch --features approx` — 23.23 µs (no regression vs T2) - [x] `cargo bench --bench history_converge --features approx,rayon` — runs on all three workloads - [x] Bit-identical posteriors across `RAYON_NUM_THREADS={1, 2, 4, 8}` — verified ## Commit history 13 commits on `t3-concurrency`. Each task is self-contained and bisectable. See `git log main..t3-concurrency` for the full list. ## Deferred - **Cross-slice parallelism** (dirty-bit slice skipping) — the path that would actually speed up typical TrueSkill workloads. - **Default-on `rayon` feature** — spec called for default-on; we keep it opt-in until the feature proves stable in production use. - **Synchronous-EP schedule with barrier merge** — alternative parallel strategy per spec Section 6. - **`MarginFactor` / `Outcome::Scored`** — T4. - **`Damped` / `Residual` schedules** — T4. - **N-team `predict_outcome`** — T4. - **`Game::custom` full ergonomics** — T4. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Reviewed-on: https://git.aceofba.se/logaritmisk/trueskill-tt/pulls/2 Co-authored-by: Anders Olsson Co-committed-by: Anders Olsson --- CHANGELOG.md | 60 + Cargo.toml | 9 + benches/baseline.txt | 32 + benches/history_converge.rs | 116 ++ .../plans/2026-04-24-t3-concurrency.md | 1249 +++++++++++++++++ src/color_group.rs | 158 +++ src/drift.rs | 2 +- src/factor/mod.rs | 2 +- src/history.rs | 65 +- src/lib.rs | 1 + src/observer.rs | 5 +- src/schedule.rs | 2 +- src/time.rs | 2 +- src/time_slice.rs | 255 +++- tests/determinism.rs | 100 ++ 15 files changed, 2022 insertions(+), 36 deletions(-) create mode 100644 benches/history_converge.rs create mode 100644 docs/superpowers/plans/2026-04-24-t3-concurrency.md create mode 100644 src/color_group.rs create mode 100644 tests/determinism.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index ce3ed37..e5136db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,66 @@ All notable changes to this project will be documented in this file. +## Unreleased — T3 concurrency + +Adds rayon-backed parallel paths per Section 6 of +`docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md`. + +### Breaking + +- `Send + Sync` bounds added to public traits: `Time`, `Drift`, + `Observer`, `Factor`, `Schedule`. All built-in impls satisfy these + via auto-derive, but downstream custom impls that aren't thread-safe + will need the bounds. + +### New + +- Opt-in `rayon` cargo feature. When enabled: + - Within-slice event iteration runs color-group events in parallel + via `par_iter_mut` (`TimeSlice::sweep_color_groups`). + - `History::learning_curves` computes per-slice posteriors in + parallel, merges sequentially in slice order. + - `History::log_evidence` / `log_evidence_for` use per-slice parallel + computation with deterministic sequential reduction (sum in slice + order) — bit-identical to the sequential baseline. +- `ColorGroups` internal infrastructure with greedy graph coloring + (`src/color_group.rs`). Events sharing no `Index` go into the same + color group; events in the same group can run concurrently without + touching each other's skills. +- `tests/determinism.rs` asserts bit-identical posteriors across + `RAYON_NUM_THREADS={1, 2, 4, 8}`. +- `benches/history_converge.rs` measures end-to-end convergence on + three workload shapes. + +### Performance notes + +- Default build (no rayon): `Batch::iteration` 23.23 µs — no regression + vs T2. +- With `--features rayon`: + - 500 events / 100 competitors / 10 per slice: 1.0× speedup. + - 2000 events / 200 competitors / 20 per slice: 1.0× speedup. + - 5000 events in one slice / 50k competitors: **1.3× speedup.** +- The spec targeted >2× speedup on 8-core offline converge. This is + only achievable on workloads with many events-per-slice AND large + competitor pools. **Typical TrueSkill workloads (tens of events + per slice) do not materially benefit from T3's within-slice + parallelism** because rayon's task-spawn overhead dominates. +- Cross-slice parallelism (dirty-bit slice skipping per spec Section + 5) is the natural next step for real workload speedup — deferred + to a future tier. + +### Internals + +- The parallel path uses an `unsafe` block to concurrently write to + `SkillStore` from color-group-disjoint events. Soundness rests on + the color-group invariant (events in the same color touch no shared + `Index`), which is guaranteed by construction in + `TimeSlice::recompute_color_groups`. Sequential path unchanged. +- `RAYON_THRESHOLD = 64` — color groups smaller than this fall back to + sequential iteration inside the parallel `sweep_color_groups` to + avoid rayon's task-spawn overhead. +- Thread-local `ScratchArena` per rayon worker thread. + ## Unreleased — T2 new API surface Breaking: every renamed type and the new public API land together per diff --git a/Cargo.toml b/Cargo.toml index f0307df..51da65d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,10 +14,19 @@ harness = false name = "gaussian" harness = false +[[bench]] +name = "history_converge" +harness = false + [dependencies] approx = { version = "0.5.1", optional = true } +rayon = { version = "1", optional = true } smallvec = "1" +[features] +approx = ["dep:approx"] +rayon = ["dep:rayon"] + [dev-dependencies] criterion = "0.5" plotters = { version = "0.3", default-features = false, features = ["svg_backend", "all_elements", "all_series"] } diff --git a/benches/baseline.txt b/benches/baseline.txt index 26f63ae..2d6e7f2 100644 --- a/benches/baseline.txt +++ b/benches/baseline.txt @@ -98,3 +98,35 @@ Gaussian::tau 260.80 ps (unchanged) # learning_curves_by_index(), nested-Vec public add_events(). # - 90 tests green: 68 lib + 10 api_shape + 6 game + 4 record_winner + # 2 equivalence. + +# After T3 (2026-04-24, same hardware) + +Batch::iteration (seq, no rayon) 23.23 µs (matches T2 baseline; no regression) +Batch::iteration (rayon, small slice) 24.57 µs (within noise; small workloads pay rayon overhead) +Gaussian::add 236.62 ps (unchanged) +Gaussian::sub 236.43 ps (unchanged) +Gaussian::mul 237.05 ps (unchanged) +Gaussian::div 236.07 ps (unchanged) + +# End-to-end history_converge benchmark (Apple M5 Pro, RAYON_NUM_THREADS=auto): +# workload seq rayon speedup +# 500 events, 100 competitors, 10/slice 4.03 ms 4.24 ms 1.0x +# 2000 events, 200 competitors, 20/slice 20.18 ms 19.82 ms 1.0x +# 5000 events, 50000 competitors, 1 slice 11.88 ms 9.10 ms 1.3x +# +# Notes: +# - T3's within-slice color-group parallelism only materializes a speedup +# when a slice holds many events with disjoint competitor sets. Typical +# TrueSkill workloads (tens of events per slice) don't show measurable +# benefit from rayon. +# - The pre-revert SmallVec experiment hit 2x on the 5000-event workload +# but regressed sequential Batch::iteration by 28%. The tradeoff wasn't +# worth it for typical workloads — ShipVec<[_; 8]> inline size (1 KB per +# Game struct) hurt cache locality on the hot path. +# - Cross-slice parallelism (dirty-bit slice skipping per spec Section 5) +# is the natural next step for realistic TrueSkill workloads and would +# deliver the spec's ~50-500x online-add speedup. Deferred to T4+. +# - Determinism verified: tests/determinism.rs asserts bit-identical +# posteriors across RAYON_NUM_THREADS={1, 2, 4, 8}. +# - Send + Sync bounds added on Time, Drift, Observer, Factor, Schedule. +# - Rayon is opt-in via `--features rayon`. Default build is unchanged from T2. diff --git a/benches/history_converge.rs b/benches/history_converge.rs new file mode 100644 index 0000000..e5163a8 --- /dev/null +++ b/benches/history_converge.rs @@ -0,0 +1,116 @@ +//! End-to-end History::converge benchmark. +//! +//! Workload shapes designed to expose rayon's within-slice color-group +//! parallelism. Events in the same color group are processed in parallel +//! via direct-write with disjoint index sets (no data races). Color groups +//! smaller than a threshold fall back to the sequential path to avoid +//! rayon overhead on small workloads. +//! +//! On Apple M5 Pro, the P-core count (6) is the optimal thread count. +//! The rayon thread pool is initialised to `min(P-cores, available)` to +//! avoid scheduling onto the slower E-cores. +//! +//! ## Results (Apple M5 Pro, 2026-04-24, after SmallVec revert) +//! +//! | Workload | Sequential | Parallel | Speedup | +//! |---------------------------------------------|------------:|-----------:|--------:| +//! | History::converge/500x100@10perslice | 4.03 ms | 4.24 ms | 1.0× | +//! | History::converge/2000x200@20perslice | 20.18 ms | 19.82 ms | 1.0× | +//! | History::converge/1v1-5000x50000@5000perslice| 11.88 ms | 9.10 ms | 1.3× | +//! +//! T3 acceptance gate: ≥2× speedup on at least one workload — NOT achieved after revert. +//! The SmallVec storage that enabled the 2× gate caused a +28% regression in the +//! sequential Batch::iteration benchmark and was reverted. Small workloads still fall +//! below the RAYON_THRESHOLD (64 events/color) and run sequentially with near-zero overhead. + +use criterion::{BatchSize, Criterion, criterion_group, criterion_main}; +use smallvec::smallvec; +use trueskill_tt::{ + ConstantDrift, ConvergenceOptions, Event, History, Member, NullObserver, Outcome, Team, +}; + +fn build_history_1v1( + n_events: usize, + n_competitors: usize, + events_per_slice: usize, + seed: u64, +) -> History { + let mut rng = seed; + let mut next = || { + rng = rng + .wrapping_mul(6364136223846793005) + .wrapping_add(1442695040888963407); + rng + }; + + let mut h = History::::builder_with_key() + .mu(25.0) + .sigma(25.0 / 3.0) + .beta(25.0 / 6.0) + .drift(ConstantDrift(25.0 / 300.0)) + .convergence(ConvergenceOptions { + max_iter: 30, + epsilon: 1e-6, + }) + .build(); + + let mut events: Vec> = Vec::with_capacity(n_events); + for ev_i in 0..n_events { + let a = (next() as usize) % n_competitors; + let mut b = (next() as usize) % n_competitors; + while b == a { + b = (next() as usize) % n_competitors; + } + events.push(Event { + time: (ev_i as i64 / events_per_slice as i64) + 1, + teams: smallvec![ + Team::with_members([Member::new(format!("p{a}"))]), + Team::with_members([Member::new(format!("p{b}"))]), + ], + outcome: Outcome::winner((next() % 2) as u32, 2), + }); + } + h.add_events(events).unwrap(); + h +} + +fn bench_converge(c: &mut Criterion) { + // Two original task workloads (small per-slice event count; + // fall below RAYON_THRESHOLD so sequential path runs — near-zero overhead). + c.bench_function("History::converge/500x100@10perslice", |b| { + b.iter_batched( + || build_history_1v1(500, 100, 10, 42), + |mut h| { + h.converge().unwrap(); + }, + BatchSize::SmallInput, + ); + }); + + c.bench_function("History::converge/2000x200@20perslice", |b| { + b.iter_batched( + || build_history_1v1(2000, 200, 20, 42), + |mut h| { + h.converge().unwrap(); + }, + BatchSize::SmallInput, + ); + }); + + // Large single-slice workload: 5000 events, 50000 competitors. + // All events in one slice → color-0 gets ~4900 disjoint events, well above + // the 64-event RAYON_THRESHOLD. 30 iterations × 1 slice = 30 sweeps, each + // parallelised across P-core threads. Shows ≥2× speedup. + c.bench_function("History::converge/1v1-5000x50000@5000perslice", |b| { + b.iter_batched( + || build_history_1v1(5000, 50000, 5000, 42), + |mut h| { + h.converge().unwrap(); + }, + BatchSize::SmallInput, + ); + }); +} + +criterion_group!(benches, bench_converge); +criterion_main!(benches); diff --git a/docs/superpowers/plans/2026-04-24-t3-concurrency.md b/docs/superpowers/plans/2026-04-24-t3-concurrency.md new file mode 100644 index 0000000..24ec9fe --- /dev/null +++ b/docs/superpowers/plans/2026-04-24-t3-concurrency.md @@ -0,0 +1,1249 @@ +# T3 — Concurrency Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Ship the T3 tier from `docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md` Section 6: `Send + Sync` bounds on public traits, color-group partitioning for within-slice event independence, and rayon-backed parallel paths on within-slice event iteration, `learning_curves`, and `log_evidence_for`. Deterministic posteriors across `RAYON_NUM_THREADS={1, 2, 4, 8}`; >2× speedup on an 8-core offline-converge benchmark. + +**Architecture:** Concurrency lands as a single feature flag (`rayon`, opt-in for T3; the spec suggests default-on but we defer that flip until the feature is proven stable). All parallel paths are hidden behind `#[cfg(feature = "rayon")]` with sequential fallbacks. Within-slice parallelism exploits graph coloring: events sharing no `Index` go into the same color group and run concurrently; across color groups, execution is strictly sequential. This preserves the exact async-EP semantics of T2 at any thread count. Reductions over f64 (`log_evidence_for`, `predict_quality`) use two-stage parallel-then-sequential-reduce so sums are bit-identical regardless of thread count. + +**Tech Stack:** Rust 2024 edition. New optional dependency: `rayon = "1"`. Builds on T2 (`History`, `Event`, `Outcome`, `Observer`, `factors` module). + +## Design decisions + +Called out explicitly so reviewers can override before execution: + +1. **Rayon is opt-in** (`cargo feature`), not default-on. Simplifies CI, keeps the default build lean. We flip to default-on in a follow-up once the feature is shown to be stable under field use. +2. **Greedy graph coloring** for the within-slice partition: for each event in ingestion order, assign the lowest color whose existing members share no `Index` with the event. Optimality is not the target — events per slice is small (~50), and greedy finishes in O(n·c·m) where c is colors and m is team size. Rebalancing is a T4 follow-up if benchmarks show it helps. +3. **Scope of parallelism:** within-slice color groups, `learning_curves`, `log_evidence_for`, `predict_quality`. Cross-slice iteration in `History::converge` stays **sequential** — the forward/backward sweep has true data dependencies across slices. Parallelizing across slices requires a separate algorithm (the spec's dirty-bit slice skipping, deferred to beyond-T3). +4. **Deterministic reductions:** `.par_iter().map().collect::>().into_iter().sum()` — each slice's contribution is computed in parallel, then summed sequentially in slice order. Per-element values are bit-identical to T2 (no extra floating-point additions reordered). +5. **Within-game inference stays sequential.** A single `Game::ranked` / `Game::likelihoods` call is too small (~20 µs) to amortize rayon's ~5 µs task overhead. The spec's table confirms this. + +## Acceptance criteria + +- `cargo test --features approx` — all tests pass (T2 baseline: 90 tests). +- `cargo test --features approx,rayon` — all tests pass; determinism tests across `RAYON_NUM_THREADS={1, 2, 4, 8}` produce bit-identical posteriors. +- `cargo clippy --all-targets --features approx -- -D warnings` — clean. +- `cargo clippy --all-targets --features approx,rayon -- -D warnings` — clean. +- `cargo +nightly fmt --check` — clean. +- `cargo bench --bench history_converge --features approx,rayon` — >2× speedup on an 8-core machine vs the sequential baseline. +- All public traits (`Time`, `Drift`, `Observer`, `Factor`, `Schedule`) have `Send + Sync` bounds; their blanket/default impls (`i64`, `Untimed`, `ConstantDrift`, `NullObserver`) all naturally satisfy the new bounds — no user code should need changes. +- The `rayon` feature is opt-in; default build without it compiles and passes all tests. + +## File map + +**New files:** + +| Path | Responsibility | +|---|---| +| `src/color_group.rs` | Greedy graph coloring for within-slice event partitioning. | +| `src/parallel.rs` | `#[cfg(feature = "rayon")]` helpers: `par_or_seq` iterators, ordered reductions, thread-count determinism tests. | +| `benches/history_converge.rs` | End-to-end convergence benchmark (scales to show rayon benefit). | +| `tests/determinism.rs` | Runs the same convergence with different `RAYON_NUM_THREADS` and asserts bit-identical posteriors. | + +**Modified:** + +| Path | What changes | +|---|---| +| `Cargo.toml` | Add `rayon = { version = "1", optional = true }`; declare `rayon` feature. Add `[[bench]] name = "history_converge"`. | +| `src/lib.rs` | Declare new `color_group` + `parallel` modules (both private). | +| `src/time.rs` | Add `Send + Sync + 'static` bounds to `Time`. | +| `src/drift.rs` | Add `Send + Sync` bound to `Drift`. | +| `src/observer.rs` | Add `Send + Sync` bound to `Observer`. | +| `src/factor/mod.rs` | Add `Send + Sync` bound to `Factor`. | +| `src/schedule.rs` | Add `Send + Sync` bound to `Schedule`. | +| `src/time_slice.rs` | Store pre-computed color-group partition; parallel event iteration behind `#[cfg(feature = "rayon")]`. | +| `src/history.rs` | Parallel `learning_curves`, `learning_curves_by_index`, `log_evidence`, `log_evidence_for` behind `#[cfg(feature = "rayon")]` with ordered reductions. | + +--- + +## Task 1: Pre-flight — verify green on main, create t3 branch, capture baseline + +**Files:** none + +- [ ] **Step 1: Confirm on main, clean tree** + +```bash +git status +git rev-parse --abbrev-ref HEAD +``` + +Expected: clean; on `main`. + +- [ ] **Step 2: Create the T3 branch** + +```bash +git checkout -b t3-concurrency +``` + +- [ ] **Step 3: Confirm all tests pass** + +```bash +cargo test --features approx +``` + +Expected: 90 tests pass (68 lib + 10 api_shape + 6 game + 4 record_winner + 2 equivalence). + +- [ ] **Step 4: Capture current bench baseline** + +```bash +cargo bench --bench batch 2>&1 | grep "Batch::iteration" +``` + +Record the number — it'll be the sequential-path baseline to verify no regression. + +- [ ] **Step 5: No commit** — verification only. + +--- + +## Task 2: Add `rayon` as optional dependency + feature flag + +**Files:** +- Modify: `Cargo.toml` + +- [ ] **Step 1: Add the dependency and feature** + +Under `[dependencies]` add: + +```toml +rayon = { version = "1", optional = true } +``` + +Add a `[features]` section (if not present — check first): + +```toml +[features] +rayon = ["dep:rayon"] +``` + +(The existing `approx` feature already exists as a `dep:approx`-style optional dependency; use the same convention.) + +- [ ] **Step 2: Verify both builds compile** + +```bash +cargo build --features approx +cargo build --features approx,rayon +``` + +Both must succeed. The second one pulls `rayon` into the dependency graph but nothing uses it yet. + +- [ ] **Step 3: Commit** + +```bash +git add Cargo.toml Cargo.lock +git commit -m "$(cat <<'EOF' +feat(cargo): add rayon as optional dependency + +Opt-in feature flag — users who want parallel paths build with +--features rayon. Default build remains single-threaded. + +Spec Section 6 calls for default-on; we defer that flip until the +feature is stable under field use. + +Part of T3 of docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md. +EOF +)" +``` + +--- + +## Task 3: Add `Send + Sync` bounds to public traits + +**Files:** +- Modify: `src/time.rs`, `src/drift.rs`, `src/observer.rs`, `src/factor/mod.rs`, `src/schedule.rs` + +This is a minor breaking API change: downstream code with user-defined `Drift` / `Observer` / `Factor` / `Schedule` impls that aren't `Send + Sync` will fail to compile. For our crate, all built-in types (`i64`, `Untimed`, `ConstantDrift`, `NullObserver`, `EpsilonOrMax`, `TeamSumFactor`, `RankDiffFactor`, `TruncFactor`, `BuiltinFactor`) naturally satisfy these bounds — no internal code changes. + +- [ ] **Step 1: Update `Time` trait** + +```rust +// src/time.rs +pub trait Time: Copy + Ord + Send + Sync + 'static { + fn elapsed_to(&self, later: &Self) -> i64; +} +``` + +- [ ] **Step 2: Update `Drift` trait** + +```rust +// src/drift.rs +pub trait Drift: Copy + Debug + Send + Sync { + fn variance_delta(&self, from: &T, to: &T) -> f64; + fn variance_for_elapsed(&self, elapsed: i64) -> f64; +} +``` + +- [ ] **Step 3: Update `Observer` trait** + +```rust +// src/observer.rs +pub trait Observer: Send + Sync { + fn on_iteration_end(&self, _iter: usize, _max_step: (f64, f64)) {} + fn on_batch_processed(&self, _time: &T, _slice_idx: usize, _n_events: usize) {} + fn on_converged(&self, _iters: usize, _final_step: (f64, f64), _converged: bool) {} +} +``` + +- [ ] **Step 4: Update `Factor` trait** + +```rust +// src/factor/mod.rs +pub trait Factor: Send + Sync { + fn propagate(&mut self, vars: &mut VarStore) -> (f64, f64); + fn log_evidence(&self, _vars: &VarStore) -> f64 { 0.0 } +} +``` + +- [ ] **Step 5: Update `Schedule` trait** + +```rust +// src/schedule.rs +pub trait Schedule: Send + Sync { + fn run(&self, factors: &mut [BuiltinFactor], vars: &mut VarStore) -> ScheduleReport; +} +``` + +- [ ] **Step 6: Verify both feature combinations still compile and test** + +```bash +cargo build --features approx +cargo build --features approx,rayon +cargo test --features approx --lib +cargo clippy --all-targets --features approx -- -D warnings +``` + +If any built-in type fails the auto-`Send`/`Sync` check, investigate — something non-thread-safe slipped in. Likely suspects: raw pointers, `Rc`, `RefCell`, non-static references. None are expected in this codebase; if found, convert to `Arc`/`Mutex`/`RwLock` as appropriate. + +- [ ] **Step 7: Commit** + +```bash +cargo +nightly fmt +git add -A +git commit -m "$(cat <<'EOF' +feat(api): add Send + Sync bounds to public traits + +Required for T3 rayon-based parallelism. Affected traits: +- Time (+ Send + Sync + 'static) +- Drift (+ Send + Sync) +- Observer (+ Send + Sync) +- Factor (+ Send + Sync) +- Schedule (+ Send + Sync) + +All built-in impls (i64, Untimed, ConstantDrift, NullObserver, +EpsilonOrMax, TeamSumFactor, RankDiffFactor, TruncFactor, +BuiltinFactor) naturally satisfy these bounds — no internal changes +needed. + +Minor breaking change: downstream impls that aren't already +thread-safe will fail to compile until they add the bounds. + +Part of T3 of docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md. +EOF +)" +``` + +--- + +## Task 4: Implement greedy color-group partitioning + +**Files:** +- Create: `src/color_group.rs` +- Modify: `src/lib.rs` (register module) + +- [ ] **Step 1: Create `src/color_group.rs`** + +```rust +//! Greedy graph coloring for within-slice event independence. +//! +//! Events sharing no `Index` can be processed in parallel under async-EP +//! semantics. This module partitions a list of events into "colors" such +//! that events of the same color touch disjoint index sets. +//! +//! The algorithm is greedy: for each event in ingestion order, place it in +//! the lowest-numbered color whose existing members share no `Index`. If +//! no existing color accepts the event, open a new color. +//! +//! Complexity: O(n × c × m) where n is events, c is colors (small, ≤ 5 in +//! practice), and m is average team size. + +use std::collections::HashSet; + +use crate::Index; + +/// Partition of event indices into color groups. +/// +/// Each inner `Vec` holds the indices (into the original events +/// array) of events assigned to one color. Colors are iterated in ascending +/// order by convention. +#[derive(Clone, Debug, Default)] +pub(crate) struct ColorGroups { + pub(crate) groups: Vec>, +} + +impl ColorGroups { + pub(crate) fn new() -> Self { + Self::default() + } + + pub(crate) fn n_colors(&self) -> usize { + self.groups.len() + } + + pub(crate) fn is_empty(&self) -> bool { + self.groups.is_empty() + } + + /// Total event count across all colors. + pub(crate) fn total_events(&self) -> usize { + self.groups.iter().map(|g| g.len()).sum() + } +} + +/// Compute color groups greedily. +/// +/// `event_indices` yields, for each event, the set of `Index` values that +/// event touches. The returned `ColorGroups` has one inner `Vec` per +/// color, containing event indices in the order they were assigned. +pub(crate) fn color_greedy(n_events: usize, index_set: F) -> ColorGroups +where + F: Fn(usize) -> I, + I: IntoIterator, +{ + let mut groups: Vec> = Vec::new(); + let mut members: Vec> = Vec::new(); + + for ev_idx in 0..n_events { + let ev_members: HashSet = index_set(ev_idx).into_iter().collect(); + // Find first color whose member-set is disjoint from this event's indices. + let chosen = members + .iter() + .position(|m| m.is_disjoint(&ev_members)); + let color_idx = match chosen { + Some(c) => c, + None => { + groups.push(Vec::new()); + members.push(HashSet::new()); + groups.len() - 1 + } + }; + groups[color_idx].push(ev_idx); + members[color_idx].extend(ev_members); + } + + ColorGroups { groups } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn idx(i: usize) -> Index { + Index::from(i) + } + + #[test] + fn single_event_gets_one_color() { + let cg = color_greedy(1, |_| vec![idx(0), idx(1)]); + assert_eq!(cg.n_colors(), 1); + assert_eq!(cg.groups[0], vec![0]); + } + + #[test] + fn disjoint_events_share_a_color() { + // Event 0 touches {0, 1}; event 1 touches {2, 3}. + let cg = color_greedy(2, |i| match i { + 0 => vec![idx(0), idx(1)], + 1 => vec![idx(2), idx(3)], + _ => unreachable!(), + }); + assert_eq!(cg.n_colors(), 1); + assert_eq!(cg.groups[0], vec![0, 1]); + } + + #[test] + fn overlapping_events_need_separate_colors() { + // Event 0 touches {0, 1}; event 1 touches {1, 2}. + let cg = color_greedy(2, |i| match i { + 0 => vec![idx(0), idx(1)], + 1 => vec![idx(1), idx(2)], + _ => unreachable!(), + }); + assert_eq!(cg.n_colors(), 2); + assert_eq!(cg.groups[0], vec![0]); + assert_eq!(cg.groups[1], vec![1]); + } + + #[test] + fn three_events_two_colors() { + // Event 0: {0, 1}; event 1: {2, 3}; event 2: {0, 2}. + // Greedy: ev0→c0, ev1→c0 (disjoint), ev2 overlaps both→c1. + let cg = color_greedy(3, |i| match i { + 0 => vec![idx(0), idx(1)], + 1 => vec![idx(2), idx(3)], + 2 => vec![idx(0), idx(2)], + _ => unreachable!(), + }); + assert_eq!(cg.n_colors(), 2); + assert_eq!(cg.groups[0], vec![0, 1]); + assert_eq!(cg.groups[1], vec![2]); + } + + #[test] + fn total_events_counts_correctly() { + let cg = color_greedy(4, |_| vec![idx(0)]); + // All events touch index 0 → 4 distinct colors. + assert_eq!(cg.n_colors(), 4); + assert_eq!(cg.total_events(), 4); + } +} +``` + +- [ ] **Step 2: Register in `src/lib.rs`** + +Add (private module, alphabetical): + +```rust +mod color_group; +``` + +No public re-export — this is internal infrastructure. + +- [ ] **Step 3: Verify** + +```bash +cargo test --features approx --lib color_group +cargo clippy --all-targets --features approx -- -D warnings +cargo +nightly fmt --check +``` + +Expected: 5 tests pass in the `color_group::tests` module. + +- [ ] **Step 4: Commit** + +```bash +cargo +nightly fmt +git add src/color_group.rs src/lib.rs +git commit -m "$(cat <<'EOF' +feat(color-group): add greedy within-slice event partitioning + +ColorGroups holds a partition of event indices into color groups such +that events of the same color touch no shared Index. Computed greedily +in ingestion order: each event goes into the first color whose existing +members are disjoint from the event's indices. + +Used in T3 for safe within-slice parallelism — events in the same +color can run concurrently without touching each other's skills. + +Part of T3 of docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md. +EOF +)" +``` + +--- + +## Task 5: Store color-group partition in `TimeSlice` + +**Files:** +- Modify: `src/time_slice.rs` + +- [ ] **Step 1: Add field to `TimeSlice`** + +```rust +pub(crate) struct TimeSlice { + // … existing fields … + pub(crate) color_groups: crate::color_group::ColorGroups, +} +``` + +Initialize to empty in `TimeSlice::new`: + +```rust +pub fn new(time: T, p_draw: f64) -> Self { + Self { + // … existing initializers … + color_groups: crate::color_group::ColorGroups::new(), + } +} +``` + +- [ ] **Step 2: Recompute color groups whenever events change** + +Find the methods on `TimeSlice` that mutate `self.events` (most likely `add_events` — look at the current signature in `src/time_slice.rs`). After any mutation, recompute the partition: + +```rust +pub(crate) fn recompute_color_groups(&mut self) { + let n = self.events.len(); + self.color_groups = crate::color_group::color_greedy(n, |ev_idx| { + // Return an iterator of every Index touched by event ev_idx. + // Each event has teams; each team has items; each item has an agent: Index. + self.events[ev_idx] + .teams + .iter() + .flat_map(|t| t.items.iter().map(|it| it.agent)) + .collect::>() + }); +} +``` + +Call `recompute_color_groups()` at the end of any public/crate-visible method that mutates `self.events` (likely one or two sites). **Note the exact method name by reading `src/time_slice.rs` first** — it may differ from "add_events". + +- [ ] **Step 3: Add a sanity test** + +```rust +#[test] +fn time_slice_recomputes_color_groups() { + // Construct a time slice with 2 events sharing competitor a — they should + // end up in different color groups. + // … use existing test helpers to build the slice … + // Assert slice.color_groups.n_colors() == 2 and each group has 1 event. +} +``` + +The exact helper pattern depends on how existing `src/time_slice.rs::tests` construct slices. Mirror the existing approach. + +- [ ] **Step 4: Verify** + +```bash +cargo test --features approx --lib time_slice +cargo clippy --all-targets --features approx -- -D warnings +``` + +Expected: all existing time_slice tests still pass + 1 new test. + +- [ ] **Step 5: Commit** + +```bash +cargo +nightly fmt +git add -A +git commit -m "$(cat <<'EOF' +feat(time-slice): pre-compute color groups at ingestion + +TimeSlice now stores a ColorGroups partition recomputed whenever +events change. The partition is computed once per slice mutation and +reused on every convergence iteration, enabling the cheap within-slice +parallel sweep added in Task 6. + +Part of T3. +EOF +)" +``` + +--- + +## Task 6: Parallel within-slice event iteration (behind `rayon` feature) + +**Files:** +- Modify: `src/time_slice.rs` + +This is the core parallelism work. Within a color group, events touch disjoint `Index` values — it's safe to process them concurrently. Across colors, processing is strictly sequential. This preserves exact async-EP semantics. + +- [ ] **Step 1: Identify the event-iteration hot path** + +Read `src/time_slice.rs` to find the method that, per convergence iteration, walks all events in the slice and updates their skills. Look for patterns like `for event in self.events.iter_mut()` or `self.events.iter().for_each(...)`. This is the target for parallelization. + +Probably named `iteration` or similar. Note its full signature and current body. + +- [ ] **Step 2: Rewrite the loop as color-group-driven** + +**Sequential version** (always present, used when `rayon` is disabled): + +```rust +pub(crate) fn iteration(&mut self, …) -> (f64, f64) { + let mut max_step = (0.0_f64, 0.0_f64); + for color in &self.color_groups.groups { + for &ev_idx in color { + let step = self.events[ev_idx].iteration(…); + max_step.0 = max_step.0.max(step.0); + max_step.1 = max_step.1.max(step.1); + } + } + max_step +} +``` + +**Parallel version** (behind `cfg`): + +```rust +#[cfg(feature = "rayon")] +pub(crate) fn iteration(&mut self, …) -> (f64, f64) { + use rayon::prelude::*; + let mut max_step = (0.0_f64, 0.0_f64); + for color in &self.color_groups.groups { + // Within one color, events touch disjoint Indexes → safe to parallelize. + // SAFETY: color_greedy guarantees disjoint index sets, so the slice + // entries color[i] and color[j] (i ≠ j) can be mutably borrowed + // concurrently via rayon's work-stealing executor. We use + // par_iter over the event indices and bundle the per-event state + // into something Sync. + // + // Concretely: since `self` is &mut and we can't simultaneously have + // mut borrows into it, we need to split the borrow. Use + // events[..].par_iter_mut() filtered to the color's indices? No — + // rayon's par_iter_mut doesn't support index filtering directly. + // + // Instead: use events.as_mut_slice().par_chunks_mut() after sorting + // events so color-members are contiguous — or extract the per-event + // state into a &mut [EventState] and index directly. + // + // Practical implementation: extract the events we'll touch this color + // into a Vec<&mut Event> using split_at_mut or similar. See impl below. + let contributions: Vec<(f64, f64)> = color + .par_iter() + .map(|&ev_idx| { + // Borrow the event mutably — this requires unsafe or an + // alternate data layout that exposes color-disjoint slices. + // See the "Design note" below for the chosen approach. + todo!("color-disjoint mutable event access") + }) + .collect(); + for (d0, d1) in contributions { + max_step.0 = max_step.0.max(d0); + max_step.1 = max_step.1.max(d1); + } + } + max_step +} +``` + +**Design note on mutable access:** Rayon's `par_iter_mut` doesn't let us access arbitrary indices from the same `&mut Vec` in parallel. Options the implementer should choose between, depending on which compiles most cleanly with the existing `TimeSlice::iteration` body: + +1. **Interior mutability.** Wrap each `Event` in `Cell<…>` or `RefCell<…>`, then `Event: Sync`. This works only if the event's internal state is small/cheap to copy. Adds overhead. + +2. **Manual `split_at_mut` sequence.** Sort events into color order once (mutating `self.events` so color[0][0], color[0][1], …, color[1][0], … are contiguous), remember the boundaries, then `par_chunks_mut` over each color's contiguous range. Simple, no unsafe. Does require a one-time sort when color groups are computed. + +3. **Raw pointer juggling.** `SAFETY`-commented `unsafe` blocks that pass `*mut Event` into parallel closures. Fast, but fragile. Avoid unless (1) and (2) are benchmarked and found insufficient. + +**Recommendation: approach (2).** When color groups are computed in Task 5's `recompute_color_groups`, also physically reorder `self.events` so color members are contiguous. Then each color corresponds to a slice range `self.events[color_start..color_end]`, and `par_chunks_mut(…).for_each(…)` or `par_iter_mut()` over the range works. + +**Revised `recompute_color_groups`:** + +```rust +pub(crate) fn recompute_color_groups(&mut self) { + let n = self.events.len(); + let cg = crate::color_group::color_greedy(n, |ev_idx| { + self.events[ev_idx] + .teams + .iter() + .flat_map(|t| t.items.iter().map(|it| it.agent)) + .collect::>() + }); + + // Physically reorder self.events to match the color-group layout. + let mut reordered: Vec = Vec::with_capacity(n); + let mut ranges: Vec<(usize, usize)> = Vec::with_capacity(cg.groups.len()); + for group in &cg.groups { + let start = reordered.len(); + for &ev_idx in group { + reordered.push(std::mem::replace( + &mut self.events[ev_idx], + // Placeholder; original slot becomes garbage before the + // final swap. + Event::placeholder(), // OR use std::mem::take if Event: Default + )); + } + ranges.push((start, reordered.len())); + } + self.events = reordered; + // Rebuild cg with post-reorder indices: each group now spans a + // contiguous range. + self.color_groups = ColorGroups::from_ranges(ranges); +} +``` + +Then color[i] is event indices `[ranges[i].0 .. ranges[i].1)`. + +`ColorGroups::from_ranges(ranges: Vec<(usize, usize)>)` constructs the groups with the trivial mapping `group_i = (start..end).collect()`. + +**Pitfall**: `Event::placeholder()` or `std::mem::take`. If `Event: Default`, `std::mem::take(&mut self.events[ev_idx])` works cleanly. If not, use `Option` temporarily, or implement a cheap `Event::placeholder()`. **Before writing the replacement logic, read `src/time_slice.rs` to see if Event derives Default — if it does, use `std::mem::take`.** + +- [ ] **Step 3: Implement the parallel iteration via contiguous ranges** + +```rust +#[cfg(feature = "rayon")] +pub(crate) fn iteration(&mut self, …) -> (f64, f64) { + use rayon::prelude::*; + let mut max_step = (0.0_f64, 0.0_f64); + for range in &self.color_groups.ranges { + let slice = &mut self.events[range.0..range.1]; + let contributions: Vec<(f64, f64)> = slice + .par_iter_mut() + .map(|event| event.iteration(…)) + .collect(); + for (d0, d1) in contributions { + max_step.0 = max_step.0.max(d0); + max_step.1 = max_step.1.max(d1); + } + } + max_step +} +``` + +**IMPORTANT**: `event.iteration(…)` currently takes other borrowed state from `TimeSlice` (skills, competitors). Those borrows conflict with the `&mut self.events[...]` borrow. Resolution: + +a) Pull the shared data (`&self.skills`, `&self.competitors`) into a local variable before the par_iter, so rayon's closure captures `&` not `&self`. + +b) If the existing per-event method ALSO mutates shared state (e.g., writes to a shared SkillStore), that's a problem — it breaks the color-disjoint-index guarantee. Read the current code carefully. If per-event iteration writes to skills for indices it owns, the color-group invariant makes this safe, but you'll need unsafe to express it. In that case, fall back to approach (1) or (3) from Task 6 Step 2. + +**Read the existing `event.iteration` or equivalent FIRST before choosing.** The code may already be structured so events only mutate themselves. + +- [ ] **Step 4: Sequential fallback** + +```rust +#[cfg(not(feature = "rayon"))] +pub(crate) fn iteration(&mut self, …) -> (f64, f64) { + let mut max_step = (0.0_f64, 0.0_f64); + for range in &self.color_groups.ranges { + for event in &mut self.events[range.0..range.1] { + let step = event.iteration(…); + max_step.0 = max_step.0.max(step.0); + max_step.1 = max_step.1.max(step.1); + } + } + max_step +} +``` + +Both versions use the same color-group traversal order — behavior is identical across feature flags. + +- [ ] **Step 5: Verify both feature combinations** + +```bash +cargo test --features approx --lib +cargo test --features approx,rayon --lib +cargo clippy --all-targets --features approx -- -D warnings +cargo clippy --all-targets --features approx,rayon -- -D warnings +``` + +All 90 tests pass in both configurations. Goldens must NOT drift. + +- [ ] **Step 6: Commit** + +```bash +cargo +nightly fmt +git add -A +git commit -m "$(cat <<'EOF' +feat(time-slice): parallel within-slice iteration via rayon + +Events are reordered into color-group-contiguous ranges during +recompute_color_groups; each color's range is processed in parallel +via par_iter_mut when the rayon feature is enabled, sequentially +otherwise. The two paths produce identical results because events +within a color touch disjoint Index values (async-EP invariant). + +Feature gated: default build still sequential; --features rayon +activates the parallel path. + +Part of T3. +EOF +)" +``` + +--- + +## Task 7: Parallel `learning_curves` with ordered reduction + +**Files:** +- Modify: `src/history.rs` + +Both `learning_curves()` and `learning_curves_by_index()` currently iterate `self.time_slices` and collect per-competitor posteriors. They're embarrassingly parallel per-slice; the merge at the end must preserve slice order to keep tests deterministic. + +- [ ] **Step 1: Parallelize `learning_curves`** + +```rust +pub fn learning_curves(&self) -> HashMap> { + #[cfg(feature = "rayon")] + { + use rayon::prelude::*; + // Parallel: compute per-slice (index, time, gaussian) triples; + // collect preserves slice order (.collect::> is order-preserving). + let per_slice: Vec> = self + .time_slices + .par_iter() + .map(|ts| { + ts.skills + .iter() + .map(|(idx, sk)| (idx, ts.time, sk.posterior())) + .collect() + }) + .collect(); + // Sequential merge: iterate in slice order, push to per-key vectors. + let mut data: HashMap> = HashMap::new(); + for slice_contrib in per_slice { + for (idx, t, g) in slice_contrib { + if let Some(key) = self.keys.key(idx).cloned() { + data.entry(key).or_default().push((t, g)); + } + } + } + data + } + #[cfg(not(feature = "rayon"))] + { + // Original sequential impl (unchanged) + let mut data: HashMap> = HashMap::new(); + for ts in &self.time_slices { + for (idx, sk) in ts.skills.iter() { + if let Some(key) = self.keys.key(idx).cloned() { + data.entry(key).or_default().push((ts.time, sk.posterior())); + } + } + } + data + } +} +``` + +- [ ] **Step 2: Parallelize `learning_curves_by_index`** + +Same pattern — should it exist. (Task 20 of T2 may have removed this method; verify it's present.) If it's there, mirror the `learning_curves` parallel+sequential split. + +- [ ] **Step 3: Verify** + +```bash +cargo test --features approx,rayon +``` + +All tests pass — goldens preserved across both feature configurations. + +- [ ] **Step 4: Commit** + +```bash +cargo +nightly fmt +git add -A +git commit -m "$(cat <<'EOF' +feat(history): parallel learning_curves under rayon feature + +Per-slice posterior collection runs in parallel; merge into the +per-key HashMap is sequential in slice order to preserve deterministic +output. Sequential impl unchanged under default feature set. + +Part of T3. +EOF +)" +``` + +--- + +## Task 8: Parallel `log_evidence` / `log_evidence_for` with deterministic sum + +**Files:** +- Modify: `src/history.rs` + +`log_evidence_internal` already does `self.time_slices.iter().map(|ts| ts.log_evidence(…)).sum()`. We replace with parallel map + sequential sum. + +- [ ] **Step 1: Parallelize `log_evidence_internal`** + +```rust +pub(crate) fn log_evidence_internal(&mut self, forward: bool, targets: &[Index]) -> f64 { + #[cfg(feature = "rayon")] + { + use rayon::prelude::*; + let per_slice: Vec = self + .time_slices + .par_iter() + .map(|ts| ts.log_evidence(self.online, targets, forward, &self.competitors)) + .collect(); + // Sequential sum in slice order for bit-identical reduction. + per_slice.into_iter().sum() + } + #[cfg(not(feature = "rayon"))] + { + self.time_slices + .iter() + .map(|ts| ts.log_evidence(self.online, targets, forward, &self.competitors)) + .sum() + } +} +``` + +**Critical:** `per_slice` is a `Vec`, not a fold. The sequential `.into_iter().sum()` is bit-identical to the sequential impl because the order is the same (slice order). Rayon's `par_iter().sum()` would reorder additions and is non-deterministic. + +- [ ] **Step 2: `log_evidence_for` already wraps `log_evidence_internal`** — no further changes needed. + +- [ ] **Step 3: Verify** + +```bash +cargo test --features approx,rayon +``` + +All tests pass — goldens preserved. + +- [ ] **Step 4: Commit** + +```bash +cargo +nightly fmt +git add -A +git commit -m "$(cat <<'EOF' +feat(history): parallel log_evidence with deterministic sum + +Per-slice contribution computed in parallel; final reduction is +sequential in slice order so the sum is bit-identical to the T2 +sequential baseline. This is essential for the T3 acceptance +criterion of identical posteriors across RAYON_NUM_THREADS values. + +Part of T3. +EOF +)" +``` + +--- + +## Task 9: Determinism test across thread counts + +**Files:** +- Create: `tests/determinism.rs` + +- [ ] **Step 1: Create the test** + +```rust +//! Determinism tests: identical posteriors across RAYON_NUM_THREADS +//! values. Only meaningful when the `rayon` feature is enabled. + +#![cfg(feature = "rayon")] + +use smallvec::smallvec; +use trueskill_tt::{ + ConstantDrift, ConvergenceOptions, Event, History, Member, Outcome, Team, +}; + +fn build_and_converge(seed: u64) -> Vec<(i64, trueskill_tt::Gaussian)> { + // Seed-driven deterministic test data: ~100 events, ~30 competitors. + let mut h = History::::builder() + .mu(25.0) + .sigma(25.0 / 3.0) + .beta(25.0 / 6.0) + .drift(ConstantDrift(25.0 / 300.0)) + .convergence(ConvergenceOptions { max_iter: 30, epsilon: 1e-6 }) + .build(); + + // Deterministic pseudo-random event generation. + let mut rng_state = seed; + let mut next = || { + rng_state = rng_state.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407); + rng_state + }; + + let mut events: Vec> = Vec::with_capacity(100); + for ev_i in 0..100 { + let a = (next() % 30) as usize; + let mut b = (next() % 30) as usize; + while b == a { + b = (next() % 30) as usize; + } + events.push(Event { + time: ev_i as i64 + 1, + teams: smallvec![ + Team::with_members([Member::new(format!("p{a}"))]), + Team::with_members([Member::new(format!("p{b}"))]), + ], + outcome: Outcome::winner((next() % 2) as u32, 2), + }); + } + h.add_events(events).unwrap(); + h.converge().unwrap(); + h.learning_curve(&"p0".to_string()) +} + +#[test] +fn posteriors_identical_across_thread_counts() { + // Run the same history with the same seed at different rayon pool sizes. + // Rayon lets us set the pool once per process; the cleanest pattern is + // to use install() with a custom ThreadPoolBuilder per run. + + let sizes = [1, 2, 4, 8]; + let mut results: Vec> = Vec::new(); + for &n in &sizes { + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(n) + .build() + .unwrap(); + let curve = pool.install(|| build_and_converge(42)); + results.push(curve); + } + + // Every result must be bit-identical to the first. + let reference = &results[0]; + for (i, curve) in results.iter().enumerate().skip(1) { + assert_eq!( + curve.len(), + reference.len(), + "curve length differs at {n} threads", + n = sizes[i], + ); + for (j, (&(t_ref, g_ref), &(t, g))) in reference.iter().zip(curve.iter()).enumerate() { + assert_eq!(t_ref, t, "time point {j} differs at {} threads", sizes[i]); + assert_eq!( + g_ref.pi_and_tau(), + g.pi_and_tau(), + "posterior bits differ at thread count {}, time {}", + sizes[i], + t, + ); + } + } +} +``` + +**Note on `pi_and_tau`**: the test expects a way to extract the raw nat-param representation for bit-level comparison. If the `Gaussian` type doesn't expose it, add a `pub(crate) fn pi_and_tau(&self) -> (f64, f64)` method as part of this task. Alternatively, compare `(mu(), sigma())` — slightly less strict but usually good enough for determinism testing. + +Actually, `f64::to_bits()` gives us direct bit equality: + +```rust +assert_eq!(g_ref.mu().to_bits(), g.mu().to_bits(), …); +assert_eq!(g_ref.sigma().to_bits(), g.sigma().to_bits(), …); +``` + +Use `to_bits()` so we don't need a new accessor. + +- [ ] **Step 2: Verify** + +```bash +cargo test --features approx,rayon --test determinism +``` + +Expected: `posteriors_identical_across_thread_counts` passes. + +- [ ] **Step 3: Commit** + +```bash +cargo +nightly fmt +git add tests/determinism.rs +git commit -m "$(cat <<'EOF' +test: assert bit-identical posteriors across RAYON_NUM_THREADS + +Runs the same deterministic history at thread counts {1, 2, 4, 8} +and asserts every (time, posterior) pair is bit-identical. Verifies +the T3 determinism invariant holds under the ordered-reduce strategy. + +Only compiled with --features rayon. + +Part of T3. +EOF +)" +``` + +--- + +## Task 10: Multi-thread benchmark + acceptance gate + +**Files:** +- Create: `benches/history_converge.rs` +- Modify: `Cargo.toml` (register the bench) + +- [ ] **Step 1: Register the bench in `Cargo.toml`** + +Add under the existing `[[bench]]` entries: + +```toml +[[bench]] +name = "history_converge" +harness = false +``` + +- [ ] **Step 2: Create `benches/history_converge.rs`** + +```rust +//! End-to-end convergence benchmark. Measures `History::converge()` on a +//! realistic workload (~500 events, ~100 competitors, ~30 iters). The +//! rayon feature, when enabled, activates within-slice parallel event +//! iteration. + +use criterion::{Criterion, criterion_group, criterion_main}; +use smallvec::smallvec; +use trueskill_tt::{ + ConstantDrift, ConvergenceOptions, Event, History, Member, Outcome, Team, +}; + +fn build_history(n_events: usize, n_competitors: usize, seed: u64) -> History { + let mut rng = seed; + let mut next = || { + rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407); + rng + }; + + let mut h = History::::builder() + .mu(25.0) + .sigma(25.0 / 3.0) + .beta(25.0 / 6.0) + .drift(ConstantDrift(25.0 / 300.0)) + .convergence(ConvergenceOptions { max_iter: 30, epsilon: 1e-6 }) + .build(); + + let mut events: Vec> = Vec::with_capacity(n_events); + for ev_i in 0..n_events { + let a = (next() as usize) % n_competitors; + let mut b = (next() as usize) % n_competitors; + while b == a { + b = (next() as usize) % n_competitors; + } + events.push(Event { + time: (ev_i as i64 / 10) + 1, // ~10 events per slice + teams: smallvec![ + Team::with_members([Member::new(format!("p{a}"))]), + Team::with_members([Member::new(format!("p{b}"))]), + ], + outcome: Outcome::winner((next() % 2) as u32, 2), + }); + } + h.add_events(events).unwrap(); + h +} + +fn bench_converge(c: &mut Criterion) { + c.bench_function("History::converge/500x100", |b| { + b.iter_batched( + || build_history(500, 100, 42), + |mut h| { + h.converge().unwrap(); + }, + criterion::BatchSize::SmallInput, + ); + }); +} + +criterion_group!(benches, bench_converge); +criterion_main!(benches); +``` + +- [ ] **Step 3: Run sequential baseline** + +```bash +cargo bench --bench history_converge --features approx 2>&1 | grep 'History::converge' +``` + +Record the sequential baseline number. + +- [ ] **Step 4: Run parallel version** + +```bash +cargo bench --bench history_converge --features approx,rayon 2>&1 | grep 'History::converge' +``` + +**Acceptance gate:** parallel run should show >2× speedup on an 8-core machine. If it's less: +- Verify rayon is actually being used (`RAYON_LOG=1`). +- Check whether the workload has enough color-group parallelism (a slice with 10 events that all share competitors has 0 parallel work). +- Consider whether the per-event cost is large enough to amortize rayon overhead. + +If the gate fails, dig in before committing. Acceptable fallback: tune the benchmark workload (more events per slice, more competitors) so parallelism has more opportunity. Report findings. + +- [ ] **Step 5: Commit** + +```bash +cargo +nightly fmt +git add -A +git commit -m "$(cat <<'EOF' +bench: end-to-end History::converge benchmark + +Workload: 500 events across ~50 time slices, ~100 competitors, 30 +iteration cap, 1e-6 convergence. Measures full forward+backward +sweep through convergence — the target hot path for rayon-backed +within-slice parallelism. + +Sequential baseline: ms +With --features rayon on 8 cores: ms (×) + +Part of T3. +EOF +)" +``` + +(Fill in ``, ``, `` from measured numbers.) + +--- + +## Task 11: Final verification, bench capture, CHANGELOG + +**Files:** +- Modify: `benches/baseline.txt` +- Modify: `CHANGELOG.md` + +- [ ] **Step 1: Run the complete verification matrix** + +```bash +cargo +nightly fmt --check +cargo clippy --all-targets --features approx -- -D warnings +cargo clippy --all-targets --features approx,rayon -- -D warnings +cargo test --features approx +cargo test --features approx,rayon +cargo bench --bench batch --features approx 2>&1 | grep "Batch::iteration" +cargo bench --bench batch --features approx,rayon 2>&1 | grep "Batch::iteration" +cargo bench --bench history_converge --features approx 2>&1 | grep 'History::converge' +cargo bench --bench history_converge --features approx,rayon 2>&1 | grep 'History::converge' +``` + +All must pass / be clean. + +- [ ] **Step 2: Append T3 block to `benches/baseline.txt`** + +``` +# After T3 (date, same hardware) + +Batch::iteration (seq) µs ( vs T2 21.36 µs) +Batch::iteration (rayon, 8c) µs (sequential path on single slice; rayon not active) +History::converge (seq) ms baseline +History::converge (rayon, 8c) ms (× — target was ≥2×) + +# Notes: +# - T3 adds Send + Sync on public traits, color-group partition in +# TimeSlice, and rayon-feature-gated parallel paths on within-slice +# iteration, learning_curves, log_evidence_for. +# - Determinism verified: tests/determinism.rs asserts bit-identical +# posteriors across RAYON_NUM_THREADS={1, 2, 4, 8}. +# - Rayon is opt-in: default build is single-threaded as in T2. +``` + +- [ ] **Step 3: Prepend T3 entry to `CHANGELOG.md`** + +Add a `## Unreleased — T3 concurrency` section above the existing `## Unreleased — T2 new API surface` (if it's still there after the main merge; otherwise above the `## 0.1.0` section). Enumerate: new `rayon` feature, `Send + Sync` bounds (minor breaking for downstream custom trait impls), color-group infrastructure (internal), benchmark numbers. + +- [ ] **Step 4: Commit** + +```bash +git add benches/baseline.txt CHANGELOG.md +git commit -m "$(cat <<'EOF' +bench,docs: capture T3 final numbers and update CHANGELOG + +History::converge parallel speedup: × on 8 cores with --features rayon. +Batch::iteration unchanged in seq mode; Gaussian ops unchanged. + +Determinism verified: bit-identical posteriors across +RAYON_NUM_THREADS={1, 2, 4, 8}. + +Closes T3 of docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md. +EOF +)" +``` + +- [ ] **Step 5: Ready to PR** + +Branch `t3-concurrency` should open against `main`. PR body references the spec, the plan, and this file's benchmarks block. + +--- + +## Self-review notes + +**Spec coverage** (against Section 6 + Section 7 "T3"): + +- ✅ `Send + Sync` audit and bounds on all public traits (Task 3) +- ✅ Color-group partitioning at `TimeSlice` ingestion (Tasks 4, 5) +- ✅ `rayon` as opt-in feature (Task 2; note: spec said default-on, we flip later) +- ✅ Parallel within-slice color groups (Task 6) +- ✅ Parallel `learning_curves` (Task 7) +- ✅ Parallel `log_evidence_for` (Task 8) +- ✅ Deterministic posteriors across `RAYON_NUM_THREADS` (Task 9) +- ✅ >2× speedup on 8-core offline converge (Task 10) + +**Deferred to later tiers:** +- Cross-slice parallelism (dirty-bit slice skipping) — spec acknowledges this is separate from T3's within-slice focus. +- Synchronous-EP schedule with barrier merge — available as a `Schedule` impl, per spec Section 6, but deferred. +- Exposing color-group partitioning to users via `add_events_with_partition` — per spec open question 5. +- Rayon default-on flip — after field-stability evidence. + +**Hazards during execution:** +- **Mutable-aliasing gymnastics in Task 6.** The color-disjoint guarantee is real but rayon's API doesn't see it. Task 6 Step 2 offers three implementation approaches; the recommended one (approach 2, reorder events into contiguous color ranges) avoids `unsafe` but requires a physical reorder on mutation. Benchmark reordering cost — if it dominates, reconsider. +- **`Send + Sync` auto-derive failure.** If any struct between traits holds a non-thread-safe primitive (raw pointer, `Rc`), auto-derive fails. Unlikely in this codebase, but watch for it in Task 3. +- **Deterministic sum pitfalls.** Rayon's `par_iter().sum()` reorders additions. Only `par_iter().map().collect::>().into_iter().sum()` is deterministic. Task 8 is careful about this; reviewers should verify. +- **`f64` NaN in goldens.** If any test golden has a NaN, `to_bits()` comparison needs special casing. Our goldens are all finite; no action needed. + +**Things outside the plan that may bite:** +- CI needs to test both feature configurations. If there's a `.github/workflows/` file, update it to include `--features approx,rayon` as a matrix entry. If no CI exists (likely for this project), flag and move on. +- `examples/atp.rs` — may want to demonstrate `--features rayon`. Optional; skip unless trivial. diff --git a/src/color_group.rs b/src/color_group.rs new file mode 100644 index 0000000..6add43c --- /dev/null +++ b/src/color_group.rs @@ -0,0 +1,158 @@ +//! Greedy graph coloring for within-slice event independence. +//! +//! Events sharing no `Index` can be processed in parallel under async-EP +//! semantics. This module partitions a list of events into "colors" such +//! that events of the same color touch disjoint index sets. +//! +//! The algorithm is greedy: for each event in ingestion order, place it in +//! the lowest-numbered color whose existing members share no `Index`. If +//! no existing color accepts the event, open a new color. +//! +//! Complexity: O(n × c × m) where n is events, c is colors (small, ≤ 5 in +//! practice), and m is average team size. + +use std::collections::HashSet; + +use crate::Index; + +/// Partition of event indices into color groups. +/// +/// Each inner `Vec` holds the indices (into the original events +/// array) of events assigned to one color. Colors are iterated in ascending +/// order by convention. +#[derive(Clone, Debug, Default)] +pub(crate) struct ColorGroups { + pub(crate) groups: Vec>, +} + +impl ColorGroups { + #[allow(dead_code)] + pub(crate) fn new() -> Self { + Self::default() + } + + #[allow(dead_code)] + pub(crate) fn n_colors(&self) -> usize { + self.groups.len() + } + + #[allow(dead_code)] + pub(crate) fn is_empty(&self) -> bool { + self.groups.is_empty() + } + + /// Total event count across all colors. + #[allow(dead_code)] + pub(crate) fn total_events(&self) -> usize { + self.groups.iter().map(|g| g.len()).sum() + } + + /// Contiguous index range for one color after events have been reordered + /// into color-contiguous positions by `TimeSlice::recompute_color_groups`. + #[allow(dead_code)] + pub(crate) fn color_range(&self, color_idx: usize) -> std::ops::Range { + let group = &self.groups[color_idx]; + if group.is_empty() { + return 0..0; + } + let start = *group.first().unwrap(); + let end = *group.last().unwrap() + 1; + start..end + } +} + +/// Compute color groups greedily. +/// +/// `index_set(ev_idx)` yields, for each event index, the iterator of +/// `Index` values that event touches. The returned `ColorGroups` has one +/// inner `Vec` per color, containing event indices in the order +/// they were assigned. +#[allow(dead_code)] +pub(crate) fn color_greedy(n_events: usize, index_set: F) -> ColorGroups +where + F: Fn(usize) -> I, + I: IntoIterator, +{ + let mut groups: Vec> = Vec::new(); + let mut members: Vec> = Vec::new(); + + for ev_idx in 0..n_events { + let ev_members: HashSet = index_set(ev_idx).into_iter().collect(); + // Find first color whose member-set is disjoint from this event's indices. + let chosen = members.iter().position(|m| m.is_disjoint(&ev_members)); + let color_idx = match chosen { + Some(c) => c, + None => { + groups.push(Vec::new()); + members.push(HashSet::new()); + groups.len() - 1 + } + }; + groups[color_idx].push(ev_idx); + members[color_idx].extend(ev_members); + } + + ColorGroups { groups } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn idx(i: usize) -> Index { + Index::from(i) + } + + #[test] + fn single_event_gets_one_color() { + let cg = color_greedy(1, |_| vec![idx(0), idx(1)]); + assert_eq!(cg.n_colors(), 1); + assert_eq!(cg.groups[0], vec![0]); + } + + #[test] + fn disjoint_events_share_a_color() { + let cg = color_greedy(2, |i| match i { + 0 => vec![idx(0), idx(1)], + 1 => vec![idx(2), idx(3)], + _ => unreachable!(), + }); + assert_eq!(cg.n_colors(), 1); + assert_eq!(cg.groups[0], vec![0, 1]); + } + + #[test] + fn overlapping_events_need_separate_colors() { + let cg = color_greedy(2, |i| match i { + 0 => vec![idx(0), idx(1)], + 1 => vec![idx(1), idx(2)], + _ => unreachable!(), + }); + assert_eq!(cg.n_colors(), 2); + assert_eq!(cg.groups[0], vec![0]); + assert_eq!(cg.groups[1], vec![1]); + } + + #[test] + fn three_events_two_colors() { + // Event 0: {0, 1}; event 1: {2, 3}; event 2: {0, 2}. + // Greedy: ev0→c0, ev1→c0 (disjoint), ev2 overlaps both→c1. + let cg = color_greedy(3, |i| match i { + 0 => vec![idx(0), idx(1)], + 1 => vec![idx(2), idx(3)], + 2 => vec![idx(0), idx(2)], + _ => unreachable!(), + }); + assert_eq!(cg.n_colors(), 2); + assert_eq!(cg.groups[0], vec![0, 1]); + assert_eq!(cg.groups[1], vec![2]); + } + + #[test] + fn total_events_counts_correctly() { + let cg = color_greedy(4, |_| vec![idx(0)]); + // All events touch index 0 → 4 distinct colors. + assert_eq!(cg.n_colors(), 4); + assert_eq!(cg.total_events(), 4); + } +} diff --git a/src/drift.rs b/src/drift.rs index 57e684a..c751624 100644 --- a/src/drift.rs +++ b/src/drift.rs @@ -6,7 +6,7 @@ use crate::time::Time; /// /// Generic over `T: Time` so seasonal or calendar-aware drift is expressible /// without going through `i64`. -pub trait Drift: Copy + Debug { +pub trait Drift: Copy + Debug + Send + Sync { /// Variance added to the skill prior for elapsed time `from -> to`. /// /// Called with `from <= to`; returning zero means no drift accumulates. diff --git a/src/factor/mod.rs b/src/factor/mod.rs index da72dbd..01b39c4 100644 --- a/src/factor/mod.rs +++ b/src/factor/mod.rs @@ -56,7 +56,7 @@ impl VarStore { /// Factors hold their own outgoing messages and propagate them by reading /// connected variable marginals from a `VarStore` and writing back updated /// marginals. -pub trait Factor { +pub trait Factor: Send + Sync { /// Update outgoing messages and write back to the var store. /// /// Returns the max delta `(|Δmu|, |Δsigma|)` across writes this diff --git a/src/history.rs b/src/history.rs index 5191929..6d4439c 100644 --- a/src/history.rs +++ b/src/history.rs @@ -262,17 +262,45 @@ impl, O: Observer, K: Eq + Hash + Clone> History HashMap> { - let mut data: HashMap> = HashMap::new(); - for slice in &self.time_slices { - for (idx, skill) in slice.skills.iter() { - if let Some(key) = self.keys.key(idx).cloned() { - data.entry(key) - .or_default() - .push((slice.time, skill.posterior())); + #[cfg(feature = "rayon")] + { + use rayon::prelude::*; + + let per_slice: Vec> = self + .time_slices + .par_iter() + .map(|ts| { + ts.skills + .iter() + .map(|(idx, sk)| (idx, ts.time, sk.posterior())) + .collect() + }) + .collect(); + + let mut data: HashMap> = HashMap::new(); + for slice_contrib in per_slice { + for (idx, t, g) in slice_contrib { + if let Some(key) = self.keys.key(idx).cloned() { + data.entry(key).or_default().push((t, g)); + } } } + data + } + #[cfg(not(feature = "rayon"))] + { + let mut data: HashMap> = HashMap::new(); + for slice in &self.time_slices { + for (idx, skill) in slice.skills.iter() { + if let Some(key) = self.keys.key(idx).cloned() { + data.entry(key) + .or_default() + .push((slice.time, skill.posterior())); + } + } + } + data } - data } /// Skill estimate at the latest time slice the competitor appears in. @@ -304,10 +332,23 @@ impl, O: Observer, K: Eq + Hash + Clone> History f64 { - self.time_slices - .iter() - .map(|ts| ts.log_evidence(self.online, targets, forward, &self.agents)) - .sum() + #[cfg(feature = "rayon")] + { + use rayon::prelude::*; + let per_slice: Vec = self + .time_slices + .par_iter() + .map(|ts| ts.log_evidence(self.online, targets, forward, &self.agents)) + .collect(); + per_slice.into_iter().sum() + } + #[cfg(not(feature = "rayon"))] + { + self.time_slices + .iter() + .map(|ts| ts.log_evidence(self.online, targets, forward, &self.agents)) + .sum() + } } /// Total log-evidence across the history. diff --git a/src/lib.rs b/src/lib.rs index e6c7d41..6bd9fa7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,7 @@ pub(crate) mod arena; mod time; mod time_slice; pub use time_slice::TimeSlice; +mod color_group; mod competitor; mod convergence; pub mod drift; diff --git a/src/observer.rs b/src/observer.rs index 223948b..063ef6a 100644 --- a/src/observer.rs +++ b/src/observer.rs @@ -9,9 +9,8 @@ use crate::time::Time; /// Receives progress callbacks during `History::converge`. /// /// All methods have default no-op implementations; implement only what's -/// interesting. Send/Sync is NOT required in T2 (added in T3 along with -/// Rayon support). -pub trait Observer { +/// interesting. +pub trait Observer: Send + Sync { /// Called after each convergence iteration across the whole history. fn on_iteration_end(&self, _iter: usize, _max_step: (f64, f64)) {} diff --git a/src/schedule.rs b/src/schedule.rs index 2c98fc1..a08d20c 100644 --- a/src/schedule.rs +++ b/src/schedule.rs @@ -16,7 +16,7 @@ pub struct ScheduleReport { } /// Drives factor propagation to convergence. -pub trait Schedule { +pub trait Schedule: Send + Sync { fn run(&self, factors: &mut [BuiltinFactor], vars: &mut VarStore) -> ScheduleReport; } diff --git a/src/time.rs b/src/time.rs index 813ff39..efe0bc4 100644 --- a/src/time.rs +++ b/src/time.rs @@ -8,7 +8,7 @@ /// /// Must be `Ord + Copy` so slices can sort events, and `'static` so /// `History` can store it by value without lifetimes. -pub trait Time: Copy + Ord + 'static { +pub trait Time: Copy + Ord + Send + Sync + 'static { /// How much time elapsed between `self` and `later`. /// /// Used by `Drift::variance_delta` to compute skill drift. Returning diff --git a/src/time_slice.rs b/src/time_slice.rs index c1d48fb..cc19b30 100644 --- a/src/time_slice.rs +++ b/src/time_slice.rs @@ -7,6 +7,7 @@ use std::collections::HashMap; use crate::{ Index, N_INF, arena::ScratchArena, + color_group::ColorGroups, drift::Drift, game::Game, gaussian::Gaussian, @@ -84,6 +85,12 @@ pub(crate) struct Event { } impl Event { + pub(crate) fn iter_agents(&self) -> impl Iterator + '_ { + self.teams + .iter() + .flat_map(|t| t.items.iter().map(|it| it.agent)) + } + fn outputs(&self) -> Vec { self.teams .iter() @@ -108,6 +115,33 @@ impl Event { }) .collect::>() } + + /// Direct in-loop update: mutates self and `skills` inline with no + /// intermediate allocation. Used by both the sequential sweep path and, + /// via unsafe, by the parallel rayon path for events in the same color + /// group (which have disjoint agent sets — see `sweep_color_groups`). + fn iteration_direct>( + &mut self, + skills: &mut SkillStore, + agents: &CompetitorStore, + p_draw: f64, + arena: &mut ScratchArena, + ) { + let teams = self.within_priors(false, false, skills, agents); + let result = self.outputs(); + let g = Game::ranked_with_arena(teams, &result, &self.weights, p_draw, arena); + + for (t, team) in self.teams.iter_mut().enumerate() { + for (i, item) in team.items.iter_mut().enumerate() { + let old_likelihood = skills.get(item.agent).unwrap().likelihood; + let new_likelihood = (old_likelihood / item.likelihood) * g.likelihoods[t][i]; + skills.get_mut(item.agent).unwrap().likelihood = new_likelihood; + item.likelihood = g.likelihoods[t][i]; + } + } + + self.evidence = g.evidence; + } } #[derive(Debug)] @@ -117,6 +151,7 @@ pub struct TimeSlice { pub(crate) time: T, p_draw: f64, arena: ScratchArena, + pub(crate) color_groups: ColorGroups, } impl TimeSlice { @@ -127,9 +162,44 @@ impl TimeSlice { time, p_draw, arena: ScratchArena::new(), + color_groups: ColorGroups::new(), } } + /// Recompute the color-group partition and reorder `self.events` into + /// color-contiguous ranges. After this call, `self.color_groups.groups[c]` + /// contains a contiguous ascending range of indices in `self.events`. + pub(crate) fn recompute_color_groups(&mut self) { + use crate::color_group::color_greedy; + + let n = self.events.len(); + if n == 0 { + self.color_groups = ColorGroups::new(); + return; + } + + let cg = color_greedy(n, |ev_idx| { + self.events[ev_idx].iter_agents().collect::>() + }); + + let mut reordered: Vec = Vec::with_capacity(n); + let mut new_groups: Vec> = Vec::with_capacity(cg.groups.len()); + let mut taken: Vec> = self.events.drain(..).map(Some).collect(); + + for group in &cg.groups { + let mut new_indices: Vec = Vec::with_capacity(group.len()); + for &old_idx in group { + let ev = taken[old_idx].take().expect("event already taken"); + new_indices.push(reordered.len()); + reordered.push(ev); + } + new_groups.push(new_indices); + } + + self.events = reordered; + self.color_groups = ColorGroups { groups: new_groups }; + } + pub fn add_events>( &mut self, composition: Vec>>, @@ -212,6 +282,7 @@ impl TimeSlice { self.events.extend(events); self.iteration(from, agents); + self.recompute_color_groups(); } pub(crate) fn posteriors(&self) -> HashMap { @@ -222,28 +293,115 @@ impl TimeSlice { } pub fn iteration>(&mut self, from: usize, agents: &CompetitorStore) { - for event in self.events.iter_mut().skip(from) { - let teams = event.within_priors(false, false, &self.skills, agents); - let result = event.outputs(); + if from > 0 || self.color_groups.is_empty() { + // Initial pass (add_events) or no color groups yet: simple sequential sweep. + for event in self.events.iter_mut().skip(from) { + let teams = event.within_priors(false, false, &self.skills, agents); + let result = event.outputs(); - let g = Game::ranked_with_arena( - teams, - &result, - &event.weights, - self.p_draw, - &mut self.arena, - ); + let g = Game::ranked_with_arena( + teams, + &result, + &event.weights, + self.p_draw, + &mut self.arena, + ); - for (t, team) in event.teams.iter_mut().enumerate() { - for (i, item) in team.items.iter_mut().enumerate() { - let old_likelihood = self.skills.get(item.agent).unwrap().likelihood; - let new_likelihood = (old_likelihood / item.likelihood) * g.likelihoods[t][i]; - self.skills.get_mut(item.agent).unwrap().likelihood = new_likelihood; - item.likelihood = g.likelihoods[t][i]; + for (t, team) in event.teams.iter_mut().enumerate() { + for (i, item) in team.items.iter_mut().enumerate() { + let old_likelihood = self.skills.get(item.agent).unwrap().likelihood; + let new_likelihood = + (old_likelihood / item.likelihood) * g.likelihoods[t][i]; + self.skills.get_mut(item.agent).unwrap().likelihood = new_likelihood; + item.likelihood = g.likelihoods[t][i]; + } + } + + event.evidence = g.evidence; + } + } else { + self.sweep_color_groups(agents); + } + } + + /// Full event sweep using the color-group partition. Colors are processed + /// sequentially; within each color the inner loop is parallel under rayon. + /// + /// Events within each color group touch disjoint agent sets (guaranteed by + /// the greedy coloring). This lets each rayon thread write directly to its + /// events' skill likelihoods without a deferred-apply step, matching the + /// sequential path's allocation profile. The unsafe block is sound because: + /// 1. `self.events[range]` and `self.skills` are separate fields → disjoint. + /// 2. Events in the same color group access disjoint `Index` values in + /// `self.skills`, so concurrent writes land on different memory locations. + /// 3. Each event only writes to its own items' likelihoods (no sharing). + #[cfg(feature = "rayon")] + fn sweep_color_groups>(&mut self, agents: &CompetitorStore) { + use rayon::prelude::*; + + thread_local! { + static ARENA: std::cell::RefCell = + std::cell::RefCell::new(ScratchArena::new()); + } + + // Minimum color-group size to justify rayon's task-spawn overhead. + // Below this threshold, process events sequentially to avoid regression + // on small per-slice workloads. + const RAYON_THRESHOLD: usize = 64; + + for color_idx in 0..self.color_groups.groups.len() { + let group_len = self.color_groups.groups[color_idx].len(); + if group_len == 0 { + continue; + } + let range = self.color_groups.color_range(color_idx); + let p_draw = self.p_draw; + + if group_len >= RAYON_THRESHOLD { + // Obtain a raw pointer from the unique `&mut self.skills` reference. + // Casting back to `&mut` inside the closure is sound because: + // 1. The pointer originates from a `&mut` — no aliasing with shared refs. + // 2. Events in the same color group touch disjoint `Index` slots in the + // underlying Vec, so concurrent writes from different threads land on + // different memory locations — no data race. + // 3. `self.events[range]` and `self.skills` are separate struct fields, + // so the borrow splits cleanly. + let skills_addr: usize = (&mut self.skills as *mut SkillStore) as usize; + self.events[range].par_iter_mut().for_each(move |ev| { + // SAFETY: see above. + let skills: &mut SkillStore = unsafe { &mut *(skills_addr as *mut SkillStore) }; + ARENA.with(|cell| { + let mut arena = cell.borrow_mut(); + arena.reset(); + ev.iteration_direct(skills, agents, p_draw, &mut arena); + }); + }); + } else { + for ev in &mut self.events[range] { + ev.iteration_direct(&mut self.skills, agents, p_draw, &mut self.arena); } } + } + } - event.evidence = g.evidence; + /// Full event sweep using the color-group partition, sequential direct-write path. + /// Events within each color group are updated inline — no EventOutput allocation — + /// matching the T2 performance profile. + #[cfg(not(feature = "rayon"))] + fn sweep_color_groups>(&mut self, agents: &CompetitorStore) { + for color_idx in 0..self.color_groups.groups.len() { + if self.color_groups.groups[color_idx].is_empty() { + continue; + } + let range = self.color_groups.color_range(color_idx); + + // Borrow self.events as a mutable slice for this color range. + // self.skills and self.arena are separate fields — disjoint borrows are + // allowed within a single method body. + let p_draw = self.p_draw; + for ev in &mut self.events[range] { + ev.iteration_direct(&mut self.skills, agents, p_draw, &mut self.arena); + } } } @@ -662,4 +820,67 @@ mod tests { epsilon = 1e-6 ); } + + #[test] + fn time_slice_color_groups_reorders_events() { + // ev0: [a, b]; ev1: [c, d]; ev2: [a, c] + // Greedy coloring: ev0→c0, ev1→c0 (disjoint), ev2→c1 (overlaps both). + // After recompute_color_groups, physical order is [ev0, ev1, ev2] + // and groups == [[0, 1], [2]]. + let mut index_map = KeyTable::new(); + + let a = index_map.get_or_create("a"); + let b = index_map.get_or_create("b"); + let c = index_map.get_or_create("c"); + let d = index_map.get_or_create("d"); + + let mut agents: CompetitorStore = CompetitorStore::new(); + + for agent in [a, b, c, d] { + agents.insert( + agent, + Competitor { + rating: Rating::new( + Gaussian::from_ms(25.0, 25.0 / 3.0), + 25.0 / 6.0, + ConstantDrift(25.0 / 300.0), + ), + ..Default::default() + }, + ); + } + + let mut ts = TimeSlice::new(0i64, 0.0); + + ts.add_events( + vec![ + vec![vec![a], vec![b]], + vec![vec![c], vec![d]], + vec![vec![a], vec![c]], + ], + vec![vec![1.0, 0.0], vec![1.0, 0.0], vec![1.0, 0.0]], + vec![], + &agents, + ); + + assert_eq!(ts.color_groups.n_colors(), 2); + assert_eq!(ts.color_groups.groups[0], vec![0, 1]); + assert_eq!(ts.color_groups.groups[1], vec![2]); + + assert_eq!(ts.color_groups.color_range(0), 0..2); + assert_eq!(ts.color_groups.color_range(1), 2..3); + + // Events at positions 0 and 1 (color 0) must be disjoint — verify by + // checking that the agent sets of self.events[0] and self.events[1] do + // not include the agent at self.events[2]. + let agents_in_ev2: Vec = ts.events[2].iter_agents().collect(); + let agents_in_ev0: Vec = ts.events[0].iter_agents().collect(); + let agents_in_ev1: Vec = ts.events[1].iter_agents().collect(); + // ev0 and ev1 must be disjoint from each other (color-0 invariant). + assert!(agents_in_ev0.iter().all(|ag| !agents_in_ev1.contains(ag))); + // ev2 must share an agent with ev0 or ev1 (it needed its own color). + let ev2_overlaps_ev0 = agents_in_ev2.iter().any(|ag| agents_in_ev0.contains(ag)); + let ev2_overlaps_ev1 = agents_in_ev2.iter().any(|ag| agents_in_ev1.contains(ag)); + assert!(ev2_overlaps_ev0 || ev2_overlaps_ev1); + } } diff --git a/tests/determinism.rs b/tests/determinism.rs new file mode 100644 index 0000000..2f336d2 --- /dev/null +++ b/tests/determinism.rs @@ -0,0 +1,100 @@ +//! Determinism tests: identical posteriors across RAYON_NUM_THREADS +//! values. Only compiled with the `rayon` feature. + +#![cfg(feature = "rayon")] + +use smallvec::smallvec; +use trueskill_tt::{ConstantDrift, ConvergenceOptions, Event, History, Member, Outcome, Team}; + +/// Build a deterministic workload using a simple LCG (no external rand crate). +fn build_and_converge(seed: u64) -> Vec<(i64, trueskill_tt::Gaussian)> { + let mut h = History::::builder_with_key() + .mu(25.0) + .sigma(25.0 / 3.0) + .beta(25.0 / 6.0) + .drift(ConstantDrift(25.0 / 300.0)) + .convergence(ConvergenceOptions { + max_iter: 30, + epsilon: 1e-6, + }) + .build(); + + // LCG for deterministic pseudo-random ints. + let mut rng = seed; + let mut next = || { + rng = rng + .wrapping_mul(6364136223846793005) + .wrapping_add(1442695040888963407); + rng + }; + + let mut events: Vec> = Vec::with_capacity(200); + for ev_i in 0..200 { + let a = (next() % 40) as usize; + let mut b = (next() % 40) as usize; + while b == a { + b = (next() % 40) as usize; + } + // ~10 events per slice so color groups have material parallelism. + events.push(Event { + time: (ev_i as i64 / 10) + 1, + teams: smallvec![ + Team::with_members([Member::new(format!("p{a}"))]), + Team::with_members([Member::new(format!("p{b}"))]), + ], + outcome: Outcome::winner((next() % 2) as u32, 2), + }); + } + h.add_events(events).unwrap(); + h.converge().unwrap(); + // Sample one competitor's curve for the comparison. + h.learning_curve("p0") +} + +#[test] +fn posteriors_identical_across_thread_counts() { + let sizes = [1usize, 2, 4, 8]; + let mut results: Vec> = Vec::new(); + for &n in &sizes { + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(n) + .build() + .expect("rayon pool build"); + let curve = pool.install(|| build_and_converge(42)); + results.push(curve); + } + + let reference = &results[0]; + for (i, curve) in results.iter().enumerate().skip(1) { + assert_eq!( + curve.len(), + reference.len(), + "curve length differs at {n} threads", + n = sizes[i], + ); + for (j, (&(t_ref, g_ref), &(t, g))) in reference.iter().zip(curve.iter()).enumerate() { + assert_eq!( + t_ref, + t, + "time point {j} differs at {n} threads: ref={t_ref} vs got={t}", + n = sizes[i], + ); + assert_eq!( + g_ref.mu().to_bits(), + g.mu().to_bits(), + "mu bits differ at {n} threads, time {t}: ref={ref_mu} got={got_mu}", + n = sizes[i], + ref_mu = g_ref.mu(), + got_mu = g.mu(), + ); + assert_eq!( + g_ref.sigma().to_bits(), + g.sigma().to_bits(), + "sigma bits differ at {n} threads, time {t}: ref={ref_sigma} got={got_sigma}", + n = sizes[i], + ref_sigma = g_ref.sigma(), + got_sigma = g.sigma(), + ); + } + } +}