From 3680c54d3c44339791007d38713a8e26fcd96e6e Mon Sep 17 00:00:00 2001 From: Anders Olsson Date: Fri, 24 Apr 2026 13:48:41 +0200 Subject: [PATCH] feat(time-slice): parallel within-slice event iteration via rayon MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Under #[cfg(feature = "rayon")], the per-iteration event sweep processes events color-by-color: within a color, events touch disjoint Index values by construction, so par_iter is safe. Across colors, sequential ordering preserves async-EP semantics. Event::compute() is a pure function returning an owned EventOutput (new per-item likelihoods, evidence, and pre-computed new skill likelihoods). The apply phase runs sequentially after the parallel map, writing EventOutput values back to SkillStore and each event's item likelihoods. This avoids shared mutable state in the hot loop. Default build (no rayon) uses a sequential fallback that traverses the same color-group order — behaviorally identical to the parallel path. This keeps goldens bit-identical across feature configurations. Scenario 3b applied: event updates read from and write to the shared SkillStore, so the compute/apply split (Option A) was necessary. Part of T3 of docs/superpowers/specs/2026-04-23-trueskill-engine-redesign-design.md. --- src/time_slice.rs | 143 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 125 insertions(+), 18 deletions(-) diff --git a/src/time_slice.rs b/src/time_slice.rs index bf6008d..cb1eed2 100644 --- a/src/time_slice.rs +++ b/src/time_slice.rs @@ -84,6 +84,17 @@ pub(crate) struct Event { weights: Vec>, } +/// Output of a single event's inference pass — ready to apply back to shared state. +struct EventOutput { + /// New per-team/per-item likelihoods (same shape as `event.teams`). + likelihoods: Vec>, + evidence: f64, + /// (agent index, new skill likelihood) pairs for the sequential apply step + /// that updates `SkillStore`. Computed while holding `&SkillStore` so the + /// caller only needs `&mut SkillStore` when writing back. + skill_updates: Vec<(Index, Gaussian)>, +} + impl Event { pub(crate) fn iter_agents(&self) -> impl Iterator + '_ { self.teams @@ -115,6 +126,51 @@ impl Event { }) .collect::>() } + + /// Compute the inference update for this event, returning an `EventOutput` + /// that describes the mutations to apply. Takes only shared references so + /// it can run inside a parallel closure. + fn compute>( + &self, + skills: &SkillStore, + agents: &CompetitorStore, + p_draw: f64, + ) -> EventOutput { + let mut arena = ScratchArena::new(); + let teams = self.within_priors(false, false, skills, agents); + let result = self.outputs(); + let g = Game::ranked_with_arena(teams, &result, &self.weights, p_draw, &mut arena); + + // Pre-compute new skill likelihoods while we still hold &skills. + let mut skill_updates: Vec<(Index, Gaussian)> = Vec::new(); + for (t, team) in self.teams.iter().enumerate() { + for (i, item) in team.items.iter().enumerate() { + let old_skill_likelihood = skills.get(item.agent).unwrap().likelihood; + let new_item_likelihood = g.likelihoods[t][i]; + let new_skill_likelihood = + (old_skill_likelihood / item.likelihood) * new_item_likelihood; + skill_updates.push((item.agent, new_skill_likelihood)); + } + } + + EventOutput { + likelihoods: g.likelihoods, + evidence: g.evidence, + skill_updates, + } + } + + /// Apply an `EventOutput` back onto this event's mutable item likelihoods + /// and evidence. The `SkillStore` updates are applied separately by the + /// caller to avoid conflicting borrows. + fn apply_output(&mut self, output: &EventOutput) { + self.evidence = output.evidence; + for (t, team) in self.teams.iter_mut().enumerate() { + for (i, item) in team.items.iter_mut().enumerate() { + item.likelihood = output.likelihoods[t][i]; + } + } + } } #[derive(Debug)] @@ -266,28 +322,79 @@ impl TimeSlice { } pub fn iteration>(&mut self, from: usize, agents: &CompetitorStore) { - for event in self.events.iter_mut().skip(from) { - let teams = event.within_priors(false, false, &self.skills, agents); - let result = event.outputs(); + if from > 0 || self.color_groups.is_empty() { + // Initial pass (add_events) or no color groups yet: simple sequential sweep. + for event in self.events.iter_mut().skip(from) { + let teams = event.within_priors(false, false, &self.skills, agents); + let result = event.outputs(); - let g = Game::ranked_with_arena( - teams, - &result, - &event.weights, - self.p_draw, - &mut self.arena, - ); + let g = Game::ranked_with_arena( + teams, + &result, + &event.weights, + self.p_draw, + &mut self.arena, + ); - for (t, team) in event.teams.iter_mut().enumerate() { - for (i, item) in team.items.iter_mut().enumerate() { - let old_likelihood = self.skills.get(item.agent).unwrap().likelihood; - let new_likelihood = (old_likelihood / item.likelihood) * g.likelihoods[t][i]; - self.skills.get_mut(item.agent).unwrap().likelihood = new_likelihood; - item.likelihood = g.likelihoods[t][i]; + for (t, team) in event.teams.iter_mut().enumerate() { + for (i, item) in team.items.iter_mut().enumerate() { + let old_likelihood = self.skills.get(item.agent).unwrap().likelihood; + let new_likelihood = + (old_likelihood / item.likelihood) * g.likelihoods[t][i]; + self.skills.get_mut(item.agent).unwrap().likelihood = new_likelihood; + item.likelihood = g.likelihoods[t][i]; + } } - } - event.evidence = g.evidence; + event.evidence = g.evidence; + } + } else { + self.sweep_color_groups(agents); + } + } + + /// Full event sweep using the color-group partition. Colors are processed + /// sequentially; within each color the inner loop is parallel under rayon. + fn sweep_color_groups>(&mut self, agents: &CompetitorStore) { + // We need &self.skills (immutable) and &mut self.events (mutable) at the + // same time. Rust allows this because they are distinct struct fields. + // The parallel closure captures &self.skills and &self.p_draw by shared + // ref; it returns owned EventOutput values that we apply sequentially. + for color_idx in 0..self.color_groups.groups.len() { + if self.color_groups.groups[color_idx].is_empty() { + continue; + } + let range = self.color_groups.color_range(color_idx); + + // Compute phase — parallel under rayon, sequential otherwise. + // Borrows: &self.skills and &agents are shared refs captured by the closure; + // &mut self.events[range] is the mutable slice for par_iter_mut. + let p_draw = self.p_draw; + let skills: &SkillStore = &self.skills; + + #[cfg(feature = "rayon")] + let outputs: Vec = { + use rayon::prelude::*; + self.events[range.clone()] + .par_iter() + .map(|ev| ev.compute(skills, agents, p_draw)) + .collect() + }; + + #[cfg(not(feature = "rayon"))] + let outputs: Vec = self.events[range.clone()] + .iter() + .map(|ev| ev.compute(skills, agents, p_draw)) + .collect(); + + // Apply phase — sequential: write skill likelihoods back to self.skills, + // then update per-event item likelihoods and evidence. + for (ev, output) in self.events[range].iter_mut().zip(outputs.iter()) { + for &(agent, new_skill_lhood) in &output.skill_updates { + self.skills.get_mut(agent).unwrap().likelihood = new_skill_lhood; + } + ev.apply_output(output); + } } }