diff --git a/src/time_slice.rs b/src/time_slice.rs index bf6008d..cb1eed2 100644 --- a/src/time_slice.rs +++ b/src/time_slice.rs @@ -84,6 +84,17 @@ pub(crate) struct Event { weights: Vec>, } +/// Output of a single event's inference pass — ready to apply back to shared state. +struct EventOutput { + /// New per-team/per-item likelihoods (same shape as `event.teams`). + likelihoods: Vec>, + evidence: f64, + /// (agent index, new skill likelihood) pairs for the sequential apply step + /// that updates `SkillStore`. Computed while holding `&SkillStore` so the + /// caller only needs `&mut SkillStore` when writing back. + skill_updates: Vec<(Index, Gaussian)>, +} + impl Event { pub(crate) fn iter_agents(&self) -> impl Iterator + '_ { self.teams @@ -115,6 +126,51 @@ impl Event { }) .collect::>() } + + /// Compute the inference update for this event, returning an `EventOutput` + /// that describes the mutations to apply. Takes only shared references so + /// it can run inside a parallel closure. + fn compute>( + &self, + skills: &SkillStore, + agents: &CompetitorStore, + p_draw: f64, + ) -> EventOutput { + let mut arena = ScratchArena::new(); + let teams = self.within_priors(false, false, skills, agents); + let result = self.outputs(); + let g = Game::ranked_with_arena(teams, &result, &self.weights, p_draw, &mut arena); + + // Pre-compute new skill likelihoods while we still hold &skills. + let mut skill_updates: Vec<(Index, Gaussian)> = Vec::new(); + for (t, team) in self.teams.iter().enumerate() { + for (i, item) in team.items.iter().enumerate() { + let old_skill_likelihood = skills.get(item.agent).unwrap().likelihood; + let new_item_likelihood = g.likelihoods[t][i]; + let new_skill_likelihood = + (old_skill_likelihood / item.likelihood) * new_item_likelihood; + skill_updates.push((item.agent, new_skill_likelihood)); + } + } + + EventOutput { + likelihoods: g.likelihoods, + evidence: g.evidence, + skill_updates, + } + } + + /// Apply an `EventOutput` back onto this event's mutable item likelihoods + /// and evidence. The `SkillStore` updates are applied separately by the + /// caller to avoid conflicting borrows. + fn apply_output(&mut self, output: &EventOutput) { + self.evidence = output.evidence; + for (t, team) in self.teams.iter_mut().enumerate() { + for (i, item) in team.items.iter_mut().enumerate() { + item.likelihood = output.likelihoods[t][i]; + } + } + } } #[derive(Debug)] @@ -266,28 +322,79 @@ impl TimeSlice { } pub fn iteration>(&mut self, from: usize, agents: &CompetitorStore) { - for event in self.events.iter_mut().skip(from) { - let teams = event.within_priors(false, false, &self.skills, agents); - let result = event.outputs(); + if from > 0 || self.color_groups.is_empty() { + // Initial pass (add_events) or no color groups yet: simple sequential sweep. + for event in self.events.iter_mut().skip(from) { + let teams = event.within_priors(false, false, &self.skills, agents); + let result = event.outputs(); - let g = Game::ranked_with_arena( - teams, - &result, - &event.weights, - self.p_draw, - &mut self.arena, - ); + let g = Game::ranked_with_arena( + teams, + &result, + &event.weights, + self.p_draw, + &mut self.arena, + ); - for (t, team) in event.teams.iter_mut().enumerate() { - for (i, item) in team.items.iter_mut().enumerate() { - let old_likelihood = self.skills.get(item.agent).unwrap().likelihood; - let new_likelihood = (old_likelihood / item.likelihood) * g.likelihoods[t][i]; - self.skills.get_mut(item.agent).unwrap().likelihood = new_likelihood; - item.likelihood = g.likelihoods[t][i]; + for (t, team) in event.teams.iter_mut().enumerate() { + for (i, item) in team.items.iter_mut().enumerate() { + let old_likelihood = self.skills.get(item.agent).unwrap().likelihood; + let new_likelihood = + (old_likelihood / item.likelihood) * g.likelihoods[t][i]; + self.skills.get_mut(item.agent).unwrap().likelihood = new_likelihood; + item.likelihood = g.likelihoods[t][i]; + } } - } - event.evidence = g.evidence; + event.evidence = g.evidence; + } + } else { + self.sweep_color_groups(agents); + } + } + + /// Full event sweep using the color-group partition. Colors are processed + /// sequentially; within each color the inner loop is parallel under rayon. + fn sweep_color_groups>(&mut self, agents: &CompetitorStore) { + // We need &self.skills (immutable) and &mut self.events (mutable) at the + // same time. Rust allows this because they are distinct struct fields. + // The parallel closure captures &self.skills and &self.p_draw by shared + // ref; it returns owned EventOutput values that we apply sequentially. + for color_idx in 0..self.color_groups.groups.len() { + if self.color_groups.groups[color_idx].is_empty() { + continue; + } + let range = self.color_groups.color_range(color_idx); + + // Compute phase — parallel under rayon, sequential otherwise. + // Borrows: &self.skills and &agents are shared refs captured by the closure; + // &mut self.events[range] is the mutable slice for par_iter_mut. + let p_draw = self.p_draw; + let skills: &SkillStore = &self.skills; + + #[cfg(feature = "rayon")] + let outputs: Vec = { + use rayon::prelude::*; + self.events[range.clone()] + .par_iter() + .map(|ev| ev.compute(skills, agents, p_draw)) + .collect() + }; + + #[cfg(not(feature = "rayon"))] + let outputs: Vec = self.events[range.clone()] + .iter() + .map(|ev| ev.compute(skills, agents, p_draw)) + .collect(); + + // Apply phase — sequential: write skill likelihoods back to self.skills, + // then update per-event item likelihoods and evidence. + for (ev, output) in self.events[range].iter_mut().zip(outputs.iter()) { + for &(agent, new_skill_lhood) in &output.skill_updates { + self.skills.get_mut(agent).unwrap().likelihood = new_skill_lhood; + } + ev.apply_output(output); + } } }