# T3: Concurrency Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Ship the T3 tier from `docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md` Section 6: `Send + Sync` bounds on public traits, color-group partitioning for within-slice event independence, and rayon-backed parallel paths on within-slice event iteration, `learning_curves`, and `log_evidence_for`. Deterministic posteriors across `RAYON_NUM_THREADS={1, 2, 4, 8}`; >2× speedup on an 8-core offline-converge benchmark.

**Architecture:** Concurrency lands as a single feature flag (`rayon`, opt-in for T3; the spec suggests default-on but we defer that flip until the feature is proven stable). All parallel paths are hidden behind `#[cfg(feature = "rayon")]` with sequential fallbacks. Within-slice parallelism exploits graph coloring: events sharing no `Index` go into the same color group and run concurrently; across color groups, execution is strictly sequential. This preserves the exact async-EP semantics of T2 at any thread count. Reductions over f64 (`log_evidence_for`, `predict_quality`) use two-stage parallel-then-sequential-reduce so sums are bit-identical regardless of thread count.

**Tech Stack:** Rust 2024 edition. New optional dependency: `rayon = "1"`. Builds on T2 (`History`, `Event`, `Outcome`, `Observer`, `factors` module).

## Design decisions

Called out explicitly so reviewers can override before execution:

1. **Rayon is opt-in** (a Cargo feature), not default-on. Simplifies CI, keeps the default build lean. We flip to default-on in a follow-up once the feature is shown to be stable under field use.
2. **Greedy graph coloring** for the within-slice partition: for each event in ingestion order, assign the lowest color whose existing members share no `Index` with the event.
   Optimality is not the target: the number of events per slice is small (~50), and greedy finishes in O(n·c·m) where c is the number of colors and m is team size. Rebalancing is a T4 follow-up if benchmarks show it helps.
3. **Scope of parallelism:** within-slice color groups, `learning_curves`, `log_evidence_for`, `predict_quality`. Cross-slice iteration in `History::converge` stays **sequential**: the forward/backward sweep has true data dependencies across slices. Parallelizing across slices requires a separate algorithm (the spec's dirty-bit slice skipping, deferred to beyond-T3).
4. **Deterministic reductions:** `.par_iter().map(…).collect::<Vec<_>>().into_iter().sum()`. Each slice's contribution is computed in parallel, then summed sequentially in slice order. Per-element values are bit-identical to T2 (no floating-point additions are reordered).
5. **Within-game inference stays sequential.** A single `Game::ranked` / `Game::likelihoods` call is too small (~20 µs) to amortize rayon's ~5 µs task overhead. The spec's table confirms this.

## Acceptance criteria

- `cargo test --features approx`: all tests pass (T2 baseline: 90 tests).
- `cargo test --features approx,rayon`: all tests pass; determinism tests across `RAYON_NUM_THREADS={1, 2, 4, 8}` produce bit-identical posteriors.
- `cargo clippy --all-targets --features approx -- -D warnings`: clean.
- `cargo clippy --all-targets --features approx,rayon -- -D warnings`: clean.
- `cargo +nightly fmt --check`: clean.
- `cargo bench --bench history_converge --features approx,rayon`: >2× speedup on an 8-core machine vs the sequential baseline.
- All public traits (`Time`, `Drift`, `Observer`, `Factor`, `Schedule`) have `Send + Sync` bounds; the built-in impls (`i64`, `Untimed`, `ConstantDrift`, `NullObserver`) all naturally satisfy the new bounds, so user code needs changes only where it defines impls that are not already thread-safe.
- The `rayon` feature is opt-in; the default build without it compiles and passes all tests.
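The two-stage reduction from design decision 4 can be illustrated without rayon. The sketch below is a minimal stand-in, not the crate's code: `slice_contribution` and `ordered_sum` are hypothetical names, and `std::thread::scope` replaces rayon's pool so the example needs only the standard library. Stage 1 fills a per-slice buffer in parallel; stage 2 adds the buffer up in slice order, so the additions happen in the same sequence at any thread count.

```rust
use std::thread;

/// Hypothetical stand-in for one slice's log-evidence contribution.
fn slice_contribution(x: f64) -> f64 {
    (1.0 + x).ln()
}

/// Stage 1: compute every per-slice value on up to `n_threads` scoped threads.
/// Stage 2: sum the values sequentially, in slice order.
fn ordered_sum(inputs: &[f64], n_threads: usize) -> f64 {
    let mut per_slice = vec![0.0_f64; inputs.len()];
    // Chunk length is at least 1 so `chunks_mut` never receives 0.
    let chunk = inputs.len().div_ceil(n_threads.max(1)).max(1);
    thread::scope(|s| {
        for (out, inp) in per_slice.chunks_mut(chunk).zip(inputs.chunks(chunk)) {
            // Each thread writes only to its own disjoint output chunk.
            s.spawn(move || {
                for (o, &x) in out.iter_mut().zip(inp) {
                    *o = slice_contribution(x);
                }
            });
        }
    });
    // The addition order is fixed by the buffer layout, not by scheduling.
    per_slice.iter().sum()
}
```

Calling `ordered_sum(&inputs, 1)` and `ordered_sum(&inputs, 8)` should yield the same bit pattern, because floating-point addition is not associative and only the second stage performs additions.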
## File map

**New files:**

| Path | Responsibility |
|---|---|
| `src/color_group.rs` | Greedy graph coloring for within-slice event partitioning. |
| `src/parallel.rs` | `#[cfg(feature = "rayon")]` helpers: `par_or_seq` iterators, ordered reductions, thread-count determinism tests. |
| `benches/history_converge.rs` | End-to-end convergence benchmark (scales to show rayon benefit). |
| `tests/determinism.rs` | Runs the same convergence with different `RAYON_NUM_THREADS` and asserts bit-identical posteriors. |

**Modified:**

| Path | What changes |
|---|---|
| `Cargo.toml` | Add `rayon = { version = "1", optional = true }`; declare `rayon` feature. Add `[[bench]] name = "history_converge"`. |
| `src/lib.rs` | Declare new `color_group` + `parallel` modules (both private). |
| `src/time.rs` | Add `Send + Sync + 'static` bounds to `Time`. |
| `src/drift.rs` | Add `Send + Sync` bound to `Drift`. |
| `src/observer.rs` | Add `Send + Sync` bound to `Observer`. |
| `src/factor/mod.rs` | Add `Send + Sync` bound to `Factor`. |
| `src/schedule.rs` | Add `Send + Sync` bound to `Schedule`. |
| `src/time_slice.rs` | Store pre-computed color-group partition; parallel event iteration behind `#[cfg(feature = "rayon")]`. |
| `src/history.rs` | Parallel `learning_curves`, `learning_curves_by_index`, `log_evidence`, `log_evidence_for` behind `#[cfg(feature = "rayon")]` with ordered reductions. |

---

## Task 1: Pre-flight (verify green on main, create t3 branch, capture baseline)

**Files:** none

- [ ] **Step 1: Confirm on main, clean tree**

```bash
git status
git rev-parse --abbrev-ref HEAD
```

Expected: clean; on `main`.

- [ ] **Step 2: Create the T3 branch**

```bash
git checkout -b t3-concurrency
```

- [ ] **Step 3: Confirm all tests pass**

```bash
cargo test --features approx
```

Expected: 90 tests pass (68 lib + 10 api_shape + 6 game + 4 record_winner + 2 equivalence).

- [ ] **Step 4: Capture current bench baseline**

```bash
cargo bench --bench batch 2>&1 | grep "Batch::iteration"
```

Record the number; it is the sequential-path baseline used to verify no regression.

- [ ] **Step 5: No commit** (verification only).

---

## Task 2: Add `rayon` as optional dependency + feature flag

**Files:**
- Modify: `Cargo.toml`

- [ ] **Step 1: Add the dependency and feature**

Under `[dependencies]` add:

```toml
rayon = { version = "1", optional = true }
```

Add a `[features]` section (if not present; check first):

```toml
[features]
rayon = ["dep:rayon"]
```

(The existing `approx` feature already exists as a `dep:approx`-style optional dependency; use the same convention.)

- [ ] **Step 2: Verify both builds compile**

```bash
cargo build --features approx
cargo build --features approx,rayon
```

Both must succeed. The second one pulls `rayon` into the dependency graph but nothing uses it yet.

- [ ] **Step 3: Commit**

```bash
git add Cargo.toml Cargo.lock
git commit -m "$(cat <<'EOF'
feat(cargo): add rayon as optional dependency

Opt-in feature flag: users who want parallel paths build with
--features rayon. Default build remains single-threaded. Spec Section 6
calls for default-on; we defer that flip until the feature is stable
under field use.

Part of T3 of docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md.
EOF
)"
```

---

## Task 3: Add `Send + Sync` bounds to public traits

**Files:**
- Modify: `src/time.rs`, `src/drift.rs`, `src/observer.rs`, `src/factor/mod.rs`, `src/schedule.rs`

This is a minor breaking API change: downstream code with user-defined `Drift` / `Observer` / `Factor` / `Schedule` impls that aren't `Send + Sync` will fail to compile. For our crate, all built-in types (`i64`, `Untimed`, `ConstantDrift`, `NullObserver`, `EpsilonOrMax`, `TeamSumFactor`, `RankDiffFactor`, `TruncFactor`, `BuiltinFactor`) naturally satisfy these bounds, so no internal code changes are needed.
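
A compile-time check is one way to confirm that a type meets the new bounds before the traits change. The sketch below is illustrative only: `assert_send_sync`, `DemoDrift`, and `CountingObserver` are hypothetical names, not items from the crate. It shows a plain-data type passing automatically, and a stateful observer passing because its shared state sits behind a `Mutex`.

```rust
use std::sync::Mutex;

/// Compiles only when `T` is both `Send` and `Sync`.
fn assert_send_sync<T: Send + Sync>() {}

/// Hypothetical plain-data drift type; the auto traits apply on their own.
#[derive(Clone, Copy, Debug)]
struct DemoDrift(f64);

/// Hypothetical observer with shared mutable state. `Mutex<usize>` is
/// `Send + Sync`; a `RefCell<usize>` in its place would fail the check.
#[derive(Debug, Default)]
struct CountingObserver {
    iterations: Mutex<usize>,
}

impl CountingObserver {
    /// Record one finished iteration through a shared reference.
    fn record(&self) {
        *self.iterations.lock().unwrap() += 1;
    }

    fn count(&self) -> usize {
        *self.iterations.lock().unwrap()
    }
}

/// Each line is checked at compile time; nothing happens at run time.
fn check_bounds() {
    assert_send_sync::<i64>();
    assert_send_sync::<DemoDrift>();
    assert_send_sync::<CountingObserver>();
}
```

Downstream crates could apply the same pattern to their own `Drift` or `Observer` impls to find out ahead of the upgrade whether they are affected.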

- [ ] **Step 1: Update `Time` trait**

```rust
// src/time.rs
pub trait Time: Copy + Ord + Send + Sync + 'static {
    fn elapsed_to(&self, later: &Self) -> i64;
}
```

- [ ] **Step 2: Update `Drift` trait**

```rust
// src/drift.rs
pub trait Drift<T: Time>: Copy + Debug + Send + Sync {
    fn variance_delta(&self, from: &T, to: &T) -> f64;
    fn variance_for_elapsed(&self, elapsed: i64) -> f64;
}
```

- [ ] **Step 3: Update `Observer` trait**

```rust
// src/observer.rs
pub trait Observer<T: Time>: Send + Sync {
    fn on_iteration_end(&self, _iter: usize, _max_step: (f64, f64)) {}
    fn on_batch_processed(&self, _time: &T, _slice_idx: usize, _n_events: usize) {}
    fn on_converged(&self, _iters: usize, _final_step: (f64, f64), _converged: bool) {}
}
```

- [ ] **Step 4: Update `Factor` trait**

```rust
// src/factor/mod.rs
pub trait Factor: Send + Sync {
    fn propagate(&mut self, vars: &mut VarStore) -> (f64, f64);
    fn log_evidence(&self, _vars: &VarStore) -> f64 {
        0.0
    }
}
```

- [ ] **Step 5: Update `Schedule` trait**

```rust
// src/schedule.rs
pub trait Schedule: Send + Sync {
    fn run(&self, factors: &mut [BuiltinFactor], vars: &mut VarStore) -> ScheduleReport;
}
```

- [ ] **Step 6: Verify both feature combinations still compile and test**

```bash
cargo build --features approx
cargo build --features approx,rayon
cargo test --features approx --lib
cargo clippy --all-targets --features approx -- -D warnings
```

If any built-in type fails the auto-`Send`/`Sync` check, investigate: something non-thread-safe slipped in. Likely suspects: raw pointers, `Rc`, `RefCell`, non-static references. None are expected in this codebase; if found, convert to `Arc`/`Mutex`/`RwLock` as appropriate.

- [ ] **Step 7: Commit**

```bash
cargo +nightly fmt
git add -A
git commit -m "$(cat <<'EOF'
feat(api): add Send + Sync bounds to public traits

Required for T3 rayon-based parallelism.
Affected traits:
- Time (+ Send + Sync + 'static)
- Drift (+ Send + Sync)
- Observer (+ Send + Sync)
- Factor (+ Send + Sync)
- Schedule (+ Send + Sync)

All built-in impls (i64, Untimed, ConstantDrift, NullObserver,
EpsilonOrMax, TeamSumFactor, RankDiffFactor, TruncFactor, BuiltinFactor)
naturally satisfy these bounds; no internal changes needed.

Minor breaking change: downstream impls that aren't already thread-safe
will fail to compile until their types are made Send + Sync.

Part of T3 of docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md.
EOF
)"
```

---

## Task 4: Implement greedy color-group partitioning

**Files:**
- Create: `src/color_group.rs`
- Modify: `src/lib.rs` (register module)

- [ ] **Step 1: Create `src/color_group.rs`**

```rust
//! Greedy graph coloring for within-slice event independence.
//!
//! Events sharing no `Index` can be processed in parallel under async-EP
//! semantics. This module partitions a list of events into "colors" such
//! that events of the same color touch disjoint index sets.
//!
//! The algorithm is greedy: for each event in ingestion order, place it in
//! the lowest-numbered color whose existing members share no `Index`. If
//! no existing color accepts the event, open a new color.
//!
//! Complexity: O(n × c × m) where n is events, c is colors (small, ≤ 5 in
//! practice), and m is average team size.

use std::collections::HashSet;

use crate::Index;

/// Partition of event indices into color groups.
///
/// Each inner `Vec<usize>` holds the indices (into the original events
/// array) of events assigned to one color. Colors are iterated in ascending
/// order by convention.
#[derive(Clone, Debug, Default)]
pub(crate) struct ColorGroups {
    pub(crate) groups: Vec<Vec<usize>>,
}

impl ColorGroups {
    pub(crate) fn new() -> Self {
        Self::default()
    }

    pub(crate) fn n_colors(&self) -> usize {
        self.groups.len()
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.groups.is_empty()
    }

    /// Total event count across all colors.
    pub(crate) fn total_events(&self) -> usize {
        self.groups.iter().map(|g| g.len()).sum()
    }
}

/// Compute color groups greedily.
///
/// `index_set(ev_idx)` yields the set of `Index` values that event
/// `ev_idx` touches. The returned `ColorGroups` has one inner `Vec<usize>`
/// per color, containing event indices in the order they were assigned.
pub(crate) fn color_greedy<F, I>(n_events: usize, index_set: F) -> ColorGroups
where
    F: Fn(usize) -> I,
    I: IntoIterator<Item = Index>,
{
    let mut groups: Vec<Vec<usize>> = Vec::new();
    let mut members: Vec<HashSet<Index>> = Vec::new();

    for ev_idx in 0..n_events {
        let ev_members: HashSet<Index> = index_set(ev_idx).into_iter().collect();

        // Find first color whose member-set is disjoint from this event's indices.
        let chosen = members.iter().position(|m| m.is_disjoint(&ev_members));

        let color_idx = match chosen {
            Some(c) => c,
            None => {
                groups.push(Vec::new());
                members.push(HashSet::new());
                groups.len() - 1
            }
        };

        groups[color_idx].push(ev_idx);
        members[color_idx].extend(ev_members);
    }

    ColorGroups { groups }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn idx(i: usize) -> Index {
        Index::from(i)
    }

    #[test]
    fn single_event_gets_one_color() {
        let cg = color_greedy(1, |_| vec![idx(0), idx(1)]);
        assert_eq!(cg.n_colors(), 1);
        assert_eq!(cg.groups[0], vec![0]);
    }

    #[test]
    fn disjoint_events_share_a_color() {
        // Event 0 touches {0, 1}; event 1 touches {2, 3}.
        let cg = color_greedy(2, |i| match i {
            0 => vec![idx(0), idx(1)],
            1 => vec![idx(2), idx(3)],
            _ => unreachable!(),
        });
        assert_eq!(cg.n_colors(), 1);
        assert_eq!(cg.groups[0], vec![0, 1]);
    }

    #[test]
    fn overlapping_events_need_separate_colors() {
        // Event 0 touches {0, 1}; event 1 touches {1, 2}.
        let cg = color_greedy(2, |i| match i {
            0 => vec![idx(0), idx(1)],
            1 => vec![idx(1), idx(2)],
            _ => unreachable!(),
        });
        assert_eq!(cg.n_colors(), 2);
        assert_eq!(cg.groups[0], vec![0]);
        assert_eq!(cg.groups[1], vec![1]);
    }

    #[test]
    fn three_events_two_colors() {
        // Event 0: {0, 1}; event 1: {2, 3}; event 2: {0, 2}.
        // Greedy: ev0→c0, ev1→c0 (disjoint), ev2 overlaps both→c1.
        let cg = color_greedy(3, |i| match i {
            0 => vec![idx(0), idx(1)],
            1 => vec![idx(2), idx(3)],
            2 => vec![idx(0), idx(2)],
            _ => unreachable!(),
        });
        assert_eq!(cg.n_colors(), 2);
        assert_eq!(cg.groups[0], vec![0, 1]);
        assert_eq!(cg.groups[1], vec![2]);
    }

    #[test]
    fn total_events_counts_correctly() {
        let cg = color_greedy(4, |_| vec![idx(0)]);
        // All events touch index 0 → 4 distinct colors.
        assert_eq!(cg.n_colors(), 4);
        assert_eq!(cg.total_events(), 4);
    }
}
```

- [ ] **Step 2: Register in `src/lib.rs`**

Add (private module, alphabetical):

```rust
mod color_group;
```

No public re-export; this is internal infrastructure.

- [ ] **Step 3: Verify**

```bash
cargo test --features approx --lib color_group
cargo clippy --all-targets --features approx -- -D warnings
cargo +nightly fmt --check
```

Expected: 5 tests pass in the `color_group::tests` module.

- [ ] **Step 4: Commit**

```bash
cargo +nightly fmt
git add src/color_group.rs src/lib.rs
git commit -m "$(cat <<'EOF'
feat(color-group): add greedy within-slice event partitioning

ColorGroups holds a partition of event indices into color groups such
that events of the same color touch no shared Index. Computed greedily
in ingestion order: each event goes into the first color whose existing
members are disjoint from the event's indices.

Used in T3 for safe within-slice parallelism: events in the same color
can run concurrently without touching each other's skills.

Part of T3 of docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md.
EOF
)"
```

---

## Task 5: Store color-group partition in `TimeSlice`

**Files:**
- Modify: `src/time_slice.rs`

- [ ] **Step 1: Add field to `TimeSlice`**

```rust
pub(crate) struct TimeSlice {
    // … existing fields …
    pub(crate) color_groups: crate::color_group::ColorGroups,
}
```

Initialize to empty in `TimeSlice::new`:

```rust
pub fn new(time: T, p_draw: f64) -> Self {
    Self {
        // … existing initializers …
        color_groups: crate::color_group::ColorGroups::new(),
    }
}
```

- [ ] **Step 2: Recompute color groups whenever events change**

Find the methods on `TimeSlice` that mutate `self.events` (most likely `add_events`; check the current signature in `src/time_slice.rs`). After any mutation, recompute the partition:

```rust
pub(crate) fn recompute_color_groups(&mut self) {
    let n = self.events.len();
    self.color_groups = crate::color_group::color_greedy(n, |ev_idx| {
        // Return every Index touched by event ev_idx.
        // Each event has teams; each team has items; each item has an agent: Index.
        self.events[ev_idx]
            .teams
            .iter()
            .flat_map(|t| t.items.iter().map(|it| it.agent))
            .collect::<Vec<_>>()
    });
}
```

Call `recompute_color_groups()` at the end of any public/crate-visible method that mutates `self.events` (likely one or two sites). **Note the exact method name by reading `src/time_slice.rs` first**; it may differ from "add_events".

- [ ] **Step 3: Add a sanity test**

```rust
#[test]
fn time_slice_recomputes_color_groups() {
    // Construct a time slice with 2 events sharing competitor a; they should
    // end up in different color groups.
    // … use existing test helpers to build the slice …
    // Assert slice.color_groups.n_colors() == 2 and each group has 1 event.
}
```

The exact helper pattern depends on how existing `src/time_slice.rs::tests` construct slices. Mirror the existing approach.
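
The expected behaviour of the sanity test can be previewed with simplified stand-in types. This sketch does not use the crate's `TimeSlice` or `Event`: `DemoItem`, `DemoTeam`, `DemoEvent`, `touched`, `color_events`, and `one_vs_one` are all hypothetical, and plain `usize` ids replace `Index`. It applies the same greedy rule, so two events that share a competitor land in different colors while an unrelated event reuses the first color.

```rust
use std::collections::HashSet;

// Simplified stand-ins for the event shape described above (hypothetical).
struct DemoItem {
    agent: usize,
}
struct DemoTeam {
    items: Vec<DemoItem>,
}
struct DemoEvent {
    teams: Vec<DemoTeam>,
}

/// Every agent id touched by one event.
fn touched(ev: &DemoEvent) -> HashSet<usize> {
    ev.teams
        .iter()
        .flat_map(|t| t.items.iter().map(|it| it.agent))
        .collect()
}

/// Greedy partition: each event joins the lowest color it is disjoint from.
fn color_events(events: &[DemoEvent]) -> Vec<Vec<usize>> {
    let mut groups: Vec<Vec<usize>> = Vec::new();
    let mut members: Vec<HashSet<usize>> = Vec::new();
    for (ev_idx, ev) in events.iter().enumerate() {
        let ids = touched(ev);
        let color = match members.iter().position(|m| m.is_disjoint(&ids)) {
            Some(c) => c,
            None => {
                // No existing color accepts the event: open a new one.
                groups.push(Vec::new());
                members.push(HashSet::new());
                groups.len() - 1
            }
        };
        groups[color].push(ev_idx);
        members[color].extend(ids);
    }
    groups
}

/// Convenience constructor for a two-player event.
fn one_vs_one(a: usize, b: usize) -> DemoEvent {
    DemoEvent {
        teams: vec![
            DemoTeam { items: vec![DemoItem { agent: a }] },
            DemoTeam { items: vec![DemoItem { agent: b }] },
        ],
    }
}
```

For the events `0 vs 1`, `0 vs 2`, `3 vs 4`, the partition is `[[0, 2], [1]]`: the second event conflicts with the first over competitor 0, and the third fits back into the first color.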

- [ ] **Step 4: Verify**

```bash
cargo test --features approx --lib time_slice
cargo clippy --all-targets --features approx -- -D warnings
```

Expected: all existing time_slice tests still pass, plus the 1 new test.

- [ ] **Step 5: Commit**

```bash
cargo +nightly fmt
git add -A
git commit -m "$(cat <<'EOF'
feat(time-slice): pre-compute color groups at ingestion

TimeSlice now stores a ColorGroups partition recomputed whenever events
change. The partition is computed once per slice mutation and reused on
every convergence iteration, enabling the cheap within-slice parallel
sweep added in Task 6.

Part of T3.
EOF
)"
```

---

## Task 6: Parallel within-slice event iteration (behind `rayon` feature)

**Files:**
- Modify: `src/time_slice.rs`

This is the core parallelism work. Within a color group, events touch disjoint `Index` values, so it is safe to process them concurrently. Across colors, processing is strictly sequential. This preserves exact async-EP semantics.

- [ ] **Step 1: Identify the event-iteration hot path**

Read `src/time_slice.rs` to find the method that, per convergence iteration, walks all events in the slice and updates their skills. Look for patterns like `for event in self.events.iter_mut()` or `self.events.iter().for_each(...)`. This is the target for parallelization. Probably named `iteration` or similar. Note its full signature and current body.
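
The "parallel inside a color, sequential across colors" rule can be sketched with the standard library alone. The code below is an illustration under stated assumptions, not the crate's implementation: `DemoEvent`, `update`, and `sweep` are hypothetical, each color is assumed to occupy a contiguous `(start, end)` range of the event slice, and `std::thread::scope` stands in for rayon. Because `iter_mut` hands every thread a distinct `&mut DemoEvent`, no `unsafe` is needed.

```rust
use std::thread;

/// Hypothetical per-event state; `update` returns the size of the step taken.
struct DemoEvent {
    value: f64,
}

impl DemoEvent {
    fn update(&mut self) -> f64 {
        let old = self.value;
        self.value = 0.5 * old + 1.0;
        (self.value - old).abs()
    }
}

/// Colors run one after another; events inside a color run on separate
/// scoped threads, each holding an exclusive borrow of its own event.
fn sweep(events: &mut [DemoEvent], ranges: &[(usize, usize)]) -> f64 {
    let mut max_step = 0.0_f64;
    for &(start, end) in ranges {
        let steps: Vec<f64> = thread::scope(|s| {
            // Spawn every thread of this color before joining any of them.
            let handles: Vec<_> = events[start..end]
                .iter_mut()
                .map(|ev| s.spawn(move || ev.update()))
                .collect();
            // Join in event order so the result vector is ordered too.
            handles
                .into_iter()
                .map(|h| h.join().unwrap())
                .collect::<Vec<f64>>()
        });
        // `max` is order-independent, but the fold stays sequential anyway.
        for step in steps {
            max_step = max_step.max(step);
        }
    }
    max_step
}
```

The scope ends before the next color starts, which gives the strict barrier between colors that the async-EP argument relies on.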

- [ ] **Step 2: Rewrite the loop as color-group-driven**

**Sequential version** (compiled when `rayon` is disabled):

```rust
#[cfg(not(feature = "rayon"))]
pub(crate) fn iteration(&mut self, …) -> (f64, f64) {
    let mut max_step = (0.0_f64, 0.0_f64);
    for color in &self.color_groups.groups {
        for &ev_idx in color {
            let step = self.events[ev_idx].iteration(…);
            max_step.0 = max_step.0.max(step.0);
            max_step.1 = max_step.1.max(step.1);
        }
    }
    max_step
}
```

**Parallel version** (behind `cfg`):

```rust
#[cfg(feature = "rayon")]
pub(crate) fn iteration(&mut self, …) -> (f64, f64) {
    use rayon::prelude::*;

    let mut max_step = (0.0_f64, 0.0_f64);
    for color in &self.color_groups.groups {
        // Within one color, events touch disjoint Indexes → safe to parallelize.
        // SAFETY: color_greedy guarantees disjoint index sets, so the slice
        // entries color[i] and color[j] (i ≠ j) can be mutably borrowed
        // concurrently via rayon's work-stealing executor. We use
        // par_iter over the event indices and bundle the per-event state
        // into something Sync.
        //
        // Concretely: since `self` is &mut and we can't simultaneously have
        // mut borrows into it, we need to split the borrow. Use
        // events[..].par_iter_mut() filtered to the color's indices? No:
        // rayon's par_iter_mut doesn't support index filtering directly.
        //
        // Instead: use events.as_mut_slice().par_chunks_mut() after sorting
        // events so color-members are contiguous, or extract the per-event
        // state into a &mut [EventState] and index directly.
        //
        // Practical implementation: extract the events we'll touch this color
        // into a Vec<&mut Event> using split_at_mut or similar. See impl below.
        let contributions: Vec<(f64, f64)> = color
            .par_iter()
            .map(|&ev_idx| {
                // Borrow the event mutably; this requires unsafe or an
                // alternate data layout that exposes color-disjoint slices.
                // See the "Design note" below for the chosen approach.
                todo!("color-disjoint mutable event access")
            })
            .collect();
        for (d0, d1) in contributions {
            max_step.0 = max_step.0.max(d0);
            max_step.1 = max_step.1.max(d1);
        }
    }
    max_step
}
```

**Design note on mutable access:** Rayon's `par_iter_mut` doesn't let us access arbitrary indices from the same `&mut Vec<Event>` in parallel. Options the implementer should choose between, depending on which compiles most cleanly with the existing `TimeSlice::iteration` body:

1. **Interior mutability.** Wrap each `Event` in a `Mutex<…>` (or `RwLock<…>`) so it can be mutated through a shared reference. `Cell` and `RefCell` are not an option here: neither is `Sync`, so they cannot be shared across rayon worker threads. Adds locking overhead to every event update.
2. **Manual `split_at_mut` sequence.** Sort events into color order once (mutating `self.events` so color[0][0], color[0][1], …, color[1][0], … are contiguous), remember the boundaries, then `par_chunks_mut` over each color's contiguous range. Simple, no unsafe. Does require a one-time sort when color groups are computed.
3. **Raw pointer juggling.** `SAFETY`-commented `unsafe` blocks that pass `*mut Event` into parallel closures. Fast, but fragile. Avoid unless (1) and (2) are benchmarked and found insufficient.

**Recommendation: approach (2).** When color groups are computed in Task 5's `recompute_color_groups`, also physically reorder `self.events` so color members are contiguous. Then each color corresponds to a slice range `self.events[color_start..color_end]`, and `par_chunks_mut(…).for_each(…)` or `par_iter_mut()` over the range works.

**Revised `recompute_color_groups`:**

```rust
pub(crate) fn recompute_color_groups(&mut self) {
    let n = self.events.len();
    let cg = crate::color_group::color_greedy(n, |ev_idx| {
        self.events[ev_idx]
            .teams
            .iter()
            .flat_map(|t| t.items.iter().map(|it| it.agent))
            .collect::<Vec<_>>()
    });

    // Physically reorder self.events to match the color-group layout.
    let mut reordered: Vec<_> = Vec::with_capacity(n);
    let mut ranges: Vec<(usize, usize)> = Vec::with_capacity(cg.groups.len());
    for group in &cg.groups {
        let start = reordered.len();
        for &ev_idx in group {
            reordered.push(std::mem::replace(
                &mut self.events[ev_idx],
                // Placeholder; the original slot holds garbage until
                // `self.events` is replaced below.
                Event::placeholder(), // OR use std::mem::take if Event: Default
            ));
        }
        ranges.push((start, reordered.len()));
    }
    self.events = reordered;
    // Rebuild cg with post-reorder indices: each group now spans a
    // contiguous range.
    self.color_groups = ColorGroups::from_ranges(ranges);
}
```

Then color[i] is event indices `[ranges[i].0 .. ranges[i].1)`. `ColorGroups::from_ranges(ranges: Vec<(usize, usize)>)` constructs the groups with the trivial mapping `group_i = (start..end).collect()`. It must also keep `ranges` as a `pub(crate)` field on `ColorGroups`, because Steps 3 and 4 iterate `self.color_groups.ranges`.

**Pitfall**: `Event::placeholder()` or `std::mem::take`. If `Event: Default`, `std::mem::take(&mut self.events[ev_idx])` works cleanly. If not, use `Option<Event>` temporarily, or implement a cheap `Event::placeholder()`. **Before writing the replacement logic, read `src/time_slice.rs` to see if Event derives Default; if it does, use `std::mem::take`.**

- [ ] **Step 3: Implement the parallel iteration via contiguous ranges**

```rust
#[cfg(feature = "rayon")]
pub(crate) fn iteration(&mut self, …) -> (f64, f64) {
    use rayon::prelude::*;

    let mut max_step = (0.0_f64, 0.0_f64);
    for range in &self.color_groups.ranges {
        let slice = &mut self.events[range.0..range.1];
        let contributions: Vec<(f64, f64)> = slice
            .par_iter_mut()
            .map(|event| event.iteration(…))
            .collect();
        for (d0, d1) in contributions {
            max_step.0 = max_step.0.max(d0);
            max_step.1 = max_step.1.max(d1);
        }
    }
    max_step
}
```

**IMPORTANT**: `event.iteration(…)` currently takes other borrowed state from `TimeSlice` (skills, competitors). Those borrows conflict with the `&mut self.events[...]` borrow.
Resolution:

a) Pull the shared data (`&self.skills`, `&self.competitors`) into local variables before the `par_iter_mut`, so rayon's closure captures those individual `&` references rather than `&self`.

b) If the existing per-event method ALSO mutates shared state (e.g., writes to a shared SkillStore), that's a problem: the borrow checker cannot see the color-disjoint-index guarantee. Read the current code carefully. If per-event iteration writes to skills for indices it owns, the color-group invariant makes this safe, but you'll need unsafe to express it. In that case, fall back to approach (1) or (3) from Task 6 Step 2.

**Read the existing `event.iteration` or equivalent FIRST before choosing.** The code may already be structured so events only mutate themselves.

- [ ] **Step 4: Sequential fallback**

```rust
#[cfg(not(feature = "rayon"))]
pub(crate) fn iteration(&mut self, …) -> (f64, f64) {
    let mut max_step = (0.0_f64, 0.0_f64);
    for range in &self.color_groups.ranges {
        for event in &mut self.events[range.0..range.1] {
            let step = event.iteration(…);
            max_step.0 = max_step.0.max(step.0);
            max_step.1 = max_step.1.max(step.1);
        }
    }
    max_step
}
```

Both versions use the same color-group traversal order, so behavior is identical across feature flags.

- [ ] **Step 5: Verify both feature combinations**

```bash
cargo test --features approx --lib
cargo test --features approx,rayon --lib
cargo clippy --all-targets --features approx -- -D warnings
cargo clippy --all-targets --features approx,rayon -- -D warnings
```

All tests pass in both configurations (the T2 baseline plus the tests added in Tasks 4 and 5). Goldens must NOT drift.

- [ ] **Step 6: Commit**

```bash
cargo +nightly fmt
git add -A
git commit -m "$(cat <<'EOF'
feat(time-slice): parallel within-slice iteration via rayon

Events are reordered into color-group-contiguous ranges during
recompute_color_groups; each color's range is processed in parallel via
par_iter_mut when the rayon feature is enabled, sequentially otherwise.
The two paths produce identical results because events within a color
touch disjoint Index values (async-EP invariant).

Feature gated: default build still sequential; --features rayon
activates the parallel path.

Part of T3.
EOF
)"
```

---

## Task 7: Parallel `learning_curves` with ordered reduction

**Files:**
- Modify: `src/history.rs`

Both `learning_curves()` and `learning_curves_by_index()` currently iterate `self.time_slices` and collect per-competitor posteriors. They're embarrassingly parallel per-slice; the merge at the end must preserve slice order to keep tests deterministic.

- [ ] **Step 1: Parallelize `learning_curves`**

```rust
// K and T stand for History's key and time type parameters.
pub fn learning_curves(&self) -> HashMap<K, Vec<(T, Gaussian)>> {
    #[cfg(feature = "rayon")]
    {
        use rayon::prelude::*;

        // Parallel: compute per-slice (index, time, gaussian) triples;
        // collect preserves slice order (`.collect::<Vec<_>>()` is order-preserving).
        let per_slice: Vec<Vec<(Index, T, Gaussian)>> = self
            .time_slices
            .par_iter()
            .map(|ts| {
                ts.skills
                    .iter()
                    .map(|(idx, sk)| (idx, ts.time, sk.posterior()))
                    .collect()
            })
            .collect();

        // Sequential merge: iterate in slice order, push to per-key vectors.
        let mut data: HashMap<K, Vec<(T, Gaussian)>> = HashMap::new();
        for slice_contrib in per_slice {
            for (idx, t, g) in slice_contrib {
                if let Some(key) = self.keys.key(idx).cloned() {
                    data.entry(key).or_default().push((t, g));
                }
            }
        }
        data
    }
    #[cfg(not(feature = "rayon"))]
    {
        // Original sequential impl (unchanged)
        let mut data: HashMap<K, Vec<(T, Gaussian)>> = HashMap::new();
        for ts in &self.time_slices {
            for (idx, sk) in ts.skills.iter() {
                if let Some(key) = self.keys.key(idx).cloned() {
                    data.entry(key).or_default().push((ts.time, sk.posterior()));
                }
            }
        }
        data
    }
}
```

- [ ] **Step 2: Parallelize `learning_curves_by_index`**

Same pattern, if the method still exists. (Task 20 of T2 may have removed it; verify it's present.) If it's there, mirror the `learning_curves` parallel+sequential split.

- [ ] **Step 3: Verify**

```bash
cargo test --features approx,rayon
```

All tests pass; goldens are preserved across both feature configurations.
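
The ordered merge can be tried out in isolation. The sketch below makes several assumptions and is not the crate's code: `DemoSlice` and `curves` are hypothetical, competitor ids are plain `usize`, a posterior is reduced to a single `f64` mean, and one scoped thread per slice stands in for rayon's `par_iter`. Joining the handles in spawn order reproduces the order-preserving `collect`, and the merge into the map then runs sequentially.

```rust
use std::collections::HashMap;
use std::thread;

/// Hypothetical slice: a time stamp plus (competitor id, posterior mean) pairs.
struct DemoSlice {
    time: i64,
    skills: Vec<(usize, f64)>,
}

/// Build one curve per competitor, with points in slice order.
fn curves(slices: &[DemoSlice]) -> HashMap<usize, Vec<(i64, f64)>> {
    // Stage 1: extract each slice's triples on its own scoped thread.
    let per_slice: Vec<Vec<(usize, i64, f64)>> = thread::scope(|s| {
        let handles: Vec<_> = slices
            .iter()
            .map(|ts| {
                s.spawn(move || {
                    ts.skills
                        .iter()
                        .map(|&(id, mu)| (id, ts.time, mu))
                        .collect::<Vec<_>>()
                })
            })
            .collect();
        // Joining in spawn order keeps the results in slice order.
        handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .collect::<Vec<_>>()
    });

    // Stage 2: sequential merge, so every per-key vector is time-ordered.
    let mut data: HashMap<usize, Vec<(i64, f64)>> = HashMap::new();
    for contrib in per_slice {
        for (id, t, mu) in contrib {
            data.entry(id).or_default().push((t, mu));
        }
    }
    data
}
```

Only the per-key vectors carry an ordering guarantee; iteration order over the `HashMap` itself remains unspecified, exactly as in the sequential path.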

- [ ] **Step 4: Commit**

```bash
cargo +nightly fmt
git add -A
git commit -m "$(cat <<'EOF'
feat(history): parallel learning_curves under rayon feature

Per-slice posterior collection runs in parallel; merge into the
per-key HashMap is sequential in slice order to preserve deterministic
output. Sequential impl unchanged under default feature set.

Part of T3.
EOF
)"
```

---

## Task 8: Parallel `log_evidence` / `log_evidence_for` with deterministic sum

**Files:**
- Modify: `src/history.rs`

`log_evidence_internal` already does `self.time_slices.iter().map(|ts| ts.log_evidence(…)).sum()`. We replace it with a parallel map followed by a sequential sum.

- [ ] **Step 1: Parallelize `log_evidence_internal`**

```rust
pub(crate) fn log_evidence_internal(&mut self, forward: bool, targets: &[Index]) -> f64 {
    #[cfg(feature = "rayon")]
    {
        use rayon::prelude::*;

        let per_slice: Vec<f64> = self
            .time_slices
            .par_iter()
            .map(|ts| ts.log_evidence(self.online, targets, forward, &self.competitors))
            .collect();
        // Sequential sum in slice order for bit-identical reduction.
        per_slice.into_iter().sum()
    }
    #[cfg(not(feature = "rayon"))]
    {
        self.time_slices
            .iter()
            .map(|ts| ts.log_evidence(self.online, targets, forward, &self.competitors))
            .sum()
    }
}
```

**Critical:** `per_slice` is a `Vec<f64>`, not a fold. The sequential `.into_iter().sum()` is bit-identical to the sequential impl because the order is the same (slice order). Rayon's `par_iter().sum()` may group the additions differently from run to run, so its result is not guaranteed to be bit-identical.

- [ ] **Step 2: `log_evidence_for` already wraps `log_evidence_internal`**; no further changes needed.

- [ ] **Step 3: Verify**

```bash
cargo test --features approx,rayon
```

All tests pass; goldens preserved.

- [ ] **Step 4: Commit**

```bash
cargo +nightly fmt
git add -A
git commit -m "$(cat <<'EOF'
feat(history): parallel log_evidence with deterministic sum

Per-slice contribution computed in parallel; final reduction is
sequential in slice order so the sum is bit-identical to the T2
sequential baseline.
This is essential for the T3 acceptance criterion of identical
posteriors across RAYON_NUM_THREADS values.

Part of T3.
EOF
)"
```

---

## Task 9: Determinism test across thread counts

**Files:**
- Create: `tests/determinism.rs`

- [ ] **Step 1: Create the test**

```rust
//! Determinism tests: identical posteriors across RAYON_NUM_THREADS
//! values. Only meaningful when the `rayon` feature is enabled.

#![cfg(feature = "rayon")]

use smallvec::smallvec;
use trueskill_tt::{
    ConstantDrift, ConvergenceOptions, Event, History, Member, Outcome, Team,
};

fn build_and_converge(seed: u64) -> Vec<(i64, trueskill_tt::Gaussian)> {
    // Seed-driven deterministic test data: ~100 events, ~30 competitors.
    let mut h = History::::builder()
        .mu(25.0)
        .sigma(25.0 / 3.0)
        .beta(25.0 / 6.0)
        .drift(ConstantDrift(25.0 / 300.0))
        .convergence(ConvergenceOptions { max_iter: 30, epsilon: 1e-6 })
        .build();

    // Deterministic pseudo-random event generation.
    let mut rng_state = seed;
    let mut next = || {
        rng_state = rng_state
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        rng_state
    };

    let mut events: Vec<_> = Vec::with_capacity(100);
    for ev_i in 0..100 {
        let a = (next() % 30) as usize;
        let mut b = (next() % 30) as usize;
        while b == a {
            b = (next() % 30) as usize;
        }
        events.push(Event {
            // ~10 events per slice (same layout as the benchmark), so color
            // groups hold several events and the parallel path is exercised.
            time: (ev_i as i64 / 10) + 1,
            teams: smallvec![
                Team::with_members([Member::new(format!("p{a}"))]),
                Team::with_members([Member::new(format!("p{b}"))]),
            ],
            outcome: Outcome::winner((next() % 2) as u32, 2),
        });
    }
    h.add_events(events).unwrap();
    h.converge().unwrap();
    h.learning_curve(&"p0".to_string())
}

#[test]
fn posteriors_identical_across_thread_counts() {
    // Run the same history with the same seed at different rayon pool sizes.
    // Rayon's global pool can only be configured once per process; the
    // cleanest pattern is to build a pool with ThreadPoolBuilder per run
    // and execute the case inside install().
    let sizes = [1, 2, 4, 8];
    let mut results: Vec<Vec<(i64, trueskill_tt::Gaussian)>> = Vec::new();
    for &n in &sizes {
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(n)
            .build()
            .unwrap();
        let curve = pool.install(|| build_and_converge(42));
        results.push(curve);
    }

    // Every result must be bit-identical to the first.
    let reference = &results[0];
    for (i, curve) in results.iter().enumerate().skip(1) {
        assert_eq!(
            curve.len(),
            reference.len(),
            "curve length differs at {n} threads",
            n = sizes[i],
        );
        for (j, (&(t_ref, g_ref), &(t, g))) in reference.iter().zip(curve.iter()).enumerate() {
            assert_eq!(t_ref, t, "time point {j} differs at {} threads", sizes[i]);
            assert_eq!(
                g_ref.mu().to_bits(),
                g.mu().to_bits(),
                "posterior mean bits differ at thread count {}, time {}",
                sizes[i],
                t,
            );
            assert_eq!(
                g_ref.sigma().to_bits(),
                g.sigma().to_bits(),
                "posterior sigma bits differ at thread count {}, time {}",
                sizes[i],
                t,
            );
        }
    }
}
```

**Note on bit-level comparison**: the assertions compare `f64::to_bits()` of `mu()` and `sigma()`, which gives direct bit equality without adding an accessor to `Gaussian`:

```rust
assert_eq!(g_ref.mu().to_bits(), g.mu().to_bits(), …);
assert_eq!(g_ref.sigma().to_bits(), g.sigma().to_bits(), …);
```

Comparing the raw nat-param representation would be slightly stricter, but it needs a new `pub(crate) fn pi_and_tau(&self) -> (f64, f64)` method; `to_bits()` on `(mu(), sigma())` is usually good enough for determinism testing.

- [ ] **Step 2: Verify**

```bash
cargo test --features approx,rayon --test determinism
```

Expected: `posteriors_identical_across_thread_counts` passes.

- [ ] **Step 3: Commit**

```bash
cargo +nightly fmt
git add tests/determinism.rs
git commit -m "$(cat <<'EOF'
test: assert bit-identical posteriors across RAYON_NUM_THREADS

Runs the same deterministic history at thread counts {1, 2, 4, 8} and
asserts every (time, posterior) pair is bit-identical. Verifies the T3
determinism invariant holds under the ordered-reduce strategy.

Only compiled with --features rayon.

Part of T3.
EOF
)"
```

---

## Task 10: Multi-thread benchmark + acceptance gate

**Files:**
- Create: `benches/history_converge.rs`
- Modify: `Cargo.toml` (register the bench)

- [ ] **Step 1: Register the bench in `Cargo.toml`**

Add under the existing `[[bench]]` entries:

```toml
[[bench]]
name = "history_converge"
harness = false
```

- [ ] **Step 2: Create `benches/history_converge.rs`**

```rust
//! End-to-end convergence benchmark. Measures `History::converge()` on a
//! realistic workload (~500 events, ~100 competitors, ~30 iters). The
//! rayon feature, when enabled, activates within-slice parallel event
//! iteration.

use criterion::{Criterion, criterion_group, criterion_main};
use smallvec::smallvec;
use trueskill_tt::{
    ConstantDrift, ConvergenceOptions, Event, History, Member, Outcome, Team,
};

fn build_history(n_events: usize, n_competitors: usize, seed: u64) -> History {
    // Deterministic pseudo-random generator (64-bit LCG) so every run
    // benchmarks the same workload.
    let mut rng = seed;
    let mut next = || {
        rng = rng
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        rng
    };

    let mut h = History::builder()
        .mu(25.0)
        .sigma(25.0 / 3.0)
        .beta(25.0 / 6.0)
        .drift(ConstantDrift(25.0 / 300.0))
        .convergence(ConvergenceOptions { max_iter: 30, epsilon: 1e-6 })
        .build();

    // Element type is inferred from the pushed `Event` values.
    let mut events = Vec::with_capacity(n_events);
    for ev_i in 0..n_events {
        let a = (next() as usize) % n_competitors;
        let mut b = (next() as usize) % n_competitors;
        // Re-draw until the two competitors differ.
        while b == a {
            b = (next() as usize) % n_competitors;
        }
        events.push(Event {
            time: (ev_i as i64 / 10) + 1, // 10 events per slice
            teams: smallvec![
                Team::with_members([Member::new(format!("p{a}"))]),
                Team::with_members([Member::new(format!("p{b}"))]),
            ],
            outcome: Outcome::winner((next() % 2) as u32, 2),
        });
    }
    h.add_events(events).unwrap();
    h
}

fn bench_converge(c: &mut Criterion) {
    c.bench_function("History::converge/500x100", |b| {
        // Rebuild the history for every batch so setup time is excluded
        // and each measurement starts from an unconverged state.
        b.iter_batched(
            || build_history(500, 100, 42),
            |mut h| {
                h.converge().unwrap();
            },
            criterion::BatchSize::SmallInput,
        );
    });
}

criterion_group!(benches, bench_converge);
criterion_main!(benches);
```

- [ ] **Step 3: Run sequential baseline**

```bash
cargo bench --bench history_converge --features approx 2>&1 | grep 'History::converge'
```

Record the sequential baseline number.

- [ ] **Step 4: Run parallel version**

```bash
cargo bench --bench history_converge --features approx,rayon 2>&1 | grep 'History::converge'
```

**Acceptance gate:** the parallel run should show >2× speedup on an 8-core machine. If it shows less:

- Verify rayon is actually being used (`RAYON_LOG=1` if the installed rayon version still honors it; otherwise compare against a run with `RAYON_NUM_THREADS=1`).
- Check whether the workload has enough color-group parallelism (a slice whose 10 events all share a competitor ends up with 10 single-event color groups and no parallel work).
- Consider whether the per-event cost is large enough to amortize rayon overhead.

If the gate fails, investigate before committing. Acceptable fallback: tune the benchmark workload (more events per slice, more competitors) so parallelism has more opportunity. Report findings.

- [ ] **Step 5: Commit**

```bash
cargo +nightly fmt
git add -A
git commit -m "$(cat <<'EOF'
bench: end-to-end History::converge benchmark

Workload: 500 events across 50 time slices, ~100 competitors, 30
iteration cap, 1e-6 convergence. Measures the full forward+backward
sweep through convergence, the target hot path for rayon-backed
within-slice parallelism.

Sequential baseline: <seq> ms
With --features rayon on 8 cores: <par> ms (<speedup>×)

Part of T3.
EOF
)"
```

(Fill in `<seq>`, `<par>`, `<speedup>` from measured numbers.)
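
When diagnosing a failed gate, the color-group check in Step 4 can be made concrete by counting color groups per slice. The sketch below is a minimal standalone model of the greedy rule from the design decisions. It uses plain `usize` competitor ids rather than the crate's `Index`, and `greedy_colors` and `color_count` are hypothetical helper names, not part of `trueskill_tt`.

```rust
use std::collections::HashSet;

/// Greedy coloring over one slice. Each event is a list of competitor ids
/// (an assumption standing in for the crate's `Index`). An event gets the
/// lowest color whose current members share no competitor with it.
/// Returns the color of each event, in input order.
fn greedy_colors(events: &[Vec<usize>]) -> Vec<usize> {
    // One set of already-claimed competitor ids per color group.
    let mut claimed: Vec<HashSet<usize>> = Vec::new();
    let mut colors = Vec::with_capacity(events.len());
    for event in events {
        // Lowest existing color with no overlapping competitor, if any.
        let found = claimed
            .iter()
            .position(|ids| event.iter().all(|id| !ids.contains(id)));
        let color = match found {
            Some(c) => c,
            None => {
                // Every existing group conflicts: open a new color.
                claimed.push(HashSet::new());
                claimed.len() - 1
            }
        };
        claimed[color].extend(event.iter().copied());
        colors.push(color);
    }
    colors
}

/// Number of color groups in a slice. Groups run one after another, so a
/// count close to the number of events means little parallel work.
fn color_count(colors: &[usize]) -> usize {
    colors.iter().max().map_or(0, |&m| m + 1)
}
```

For a 10-event slice, a count near 10 means the events run almost entirely in sequence, while a count of 1 or 2 leaves most of the slice available to rayon.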
---

## Task 11: Final verification, bench capture, CHANGELOG

**Files:**
- Modify: `benches/baseline.txt`
- Modify: `CHANGELOG.md`

- [ ] **Step 1: Run the complete verification matrix**

```bash
cargo +nightly fmt --check
cargo clippy --all-targets --features approx -- -D warnings
cargo clippy --all-targets --features approx,rayon -- -D warnings
cargo test --features approx
cargo test --features approx,rayon
cargo bench --bench batch --features approx 2>&1 | grep "Batch::iteration"
cargo bench --bench batch --features approx,rayon 2>&1 | grep "Batch::iteration"
cargo bench --bench history_converge --features approx 2>&1 | grep 'History::converge'
cargo bench --bench history_converge --features approx,rayon 2>&1 | grep 'History::converge'
```

All must pass / be clean.

- [ ] **Step 2: Append T3 block to `benches/baseline.txt`**

```
# After T3 (date, same hardware)
Batch::iteration (seq)          <batch_seq> µs  (<delta> vs T2 21.36 µs)
Batch::iteration (rayon, 8c)    <batch_par> µs  (sequential path on single slice; rayon not active)
History::converge (seq)         <seq> ms  baseline
History::converge (rayon, 8c)   <par> ms  (<speedup>×; target was >2×)

# Notes:
# - T3 adds Send + Sync on public traits, color-group partition in
#   TimeSlice, and rayon-feature-gated parallel paths on within-slice
#   iteration, learning_curves, log_evidence_for.
# - Determinism verified: tests/determinism.rs asserts bit-identical
#   posteriors across RAYON_NUM_THREADS={1, 2, 4, 8}.
# - Rayon is opt-in: default build is single-threaded as in T2.
```

Replace each angle-bracket placeholder with the measured value.

- [ ] **Step 3: Prepend T3 entry to `CHANGELOG.md`**

Add a `## Unreleased — T3 concurrency` section above the existing `## Unreleased — T2 new API surface` (if it's still there after the main merge; otherwise above the `## 0.1.0` section). Enumerate: the new `rayon` feature, the `Send + Sync` bounds (minor breaking change for downstream custom trait impls), the color-group infrastructure (internal), and the benchmark numbers.

- [ ] **Step 4: Commit**

```bash
git add benches/baseline.txt CHANGELOG.md
git commit -m "$(cat <<'EOF'
bench,docs: capture T3 final numbers and update CHANGELOG

History::converge parallel speedup: <speedup>× on 8 cores with --features rayon.
Batch::iteration unchanged in seq mode; Gaussian ops unchanged.

Determinism verified: bit-identical posteriors across
RAYON_NUM_THREADS={1, 2, 4, 8}.

Closes T3 of docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md.
EOF
)"
```

(Fill in `<speedup>` from the measured numbers.)

- [ ] **Step 5: Ready to PR**

Branch `t3-concurrency` should open against `main`. PR body references the spec, the plan, and this file's benchmarks block.

---

## Self-review notes

**Spec coverage** (against Section 6 + Section 7 "T3"):

- ✅ `Send + Sync` audit and bounds on all public traits (Task 3)
- ✅ Color-group partitioning at `TimeSlice` ingestion (Tasks 4, 5)
- ✅ `rayon` as opt-in feature (Task 2; note: spec said default-on, we flip later)
- ✅ Parallel within-slice color groups (Task 6)
- ✅ Parallel `learning_curves` (Task 7)
- ✅ Parallel `log_evidence_for` (Task 8)
- ✅ Deterministic posteriors across `RAYON_NUM_THREADS` (Task 9)
- ✅ >2× speedup on 8-core offline converge (Task 10)

**Deferred to later tiers:**

- Cross-slice parallelism (dirty-bit slice skipping): the spec acknowledges this is separate from T3's within-slice focus.
- Synchronous-EP schedule with barrier merge: expressible as a `Schedule` impl per spec Section 6, but deferred.
- Exposing color-group partitioning to users via `add_events_with_partition`: per spec open question 5.
- Rayon default-on flip: after field-stability evidence.

**Hazards during execution:**

- **Mutable-aliasing gymnastics in Task 6.** The color-disjoint guarantee is real, but rayon's API cannot see it. Task 6 Step 2 offers three implementation approaches; the recommended one (approach 2, reorder events into contiguous color ranges) avoids `unsafe` but requires a physical reorder on mutation. Benchmark the reordering cost; if it dominates, reconsider.
- **`Send + Sync` auto-derive failure.** `Send` and `Sync` are auto traits: if any struct implementing the public traits holds a non-thread-safe primitive (raw pointer, `Rc`), it will not be `Send + Sync` and the new bounds fail to compile. Unlikely in this codebase, but watch for it in Task 3.
- **Deterministic sum pitfalls.** Rayon's `par_iter().sum()` may regroup additions depending on how work is split. Only `par_iter().map().collect::<Vec<_>>().into_iter().sum()` is deterministic, because the final additions run sequentially in slice order. Task 8 is careful about this; reviewers should verify.
- **`f64` NaN in goldens.** If any test golden has a NaN, `to_bits()` comparison needs special casing. Our goldens are all finite; no action needed.

**Things outside the plan that may bite:**

- CI needs to test both feature configurations. If there's a `.github/workflows/` file, update it to include `--features approx,rayon` as a matrix entry. If no CI exists (likely for this project), flag and move on.
- `examples/atp.rs` may want to demonstrate `--features rayon`. Optional; skip unless trivial.
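
The "Deterministic sum pitfalls" hazard comes down to floating-point addition not being associative. The following is a minimal std-only sketch that assumes nothing about the crate: `ordered_sum` and `chunked_sum` are hypothetical names, and `chunked_sum` only models the regrouping a parallel reduction may perform. It does not call rayon.

```rust
/// Adds the terms strictly left to right. This models the final
/// sequential `.into_iter().sum()` over values collected in slice order.
fn ordered_sum(terms: &[f64]) -> f64 {
    terms.iter().sum()
}

/// Models a split reduction: each chunk is summed on its own and the
/// partial sums are then combined. `chunk` stands in for however a
/// scheduler happens to divide the work; it must be non-zero because
/// `slice::chunks` panics on a chunk size of 0.
fn chunked_sum(terms: &[f64], chunk: usize) -> f64 {
    terms
        .chunks(chunk)
        .map(|part| part.iter().sum::<f64>())
        .sum()
}
```

With `[1e16, 1.0, -1e16, 1.0]`, `ordered_sum` and `chunked_sum(.., 2)` return different values, because each `1.0` is rounded away when it is added to a term of magnitude `1e16` inside a chunk. Fixing the order of the final additions, as the plan does, removes that dependence on how the work was split.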