T3: rayon-backed concurrency (opt-in) #2
@@ -84,6 +84,17 @@ pub(crate) struct Event {
|
||||
weights: Vec<Vec<f64>>,
|
||||
}
|
||||
|
||||
/// Output of a single event's inference pass — ready to apply back to shared state.
|
||||
struct EventOutput {
|
||||
/// New per-team/per-item likelihoods (same shape as `event.teams`).
|
||||
likelihoods: Vec<Vec<Gaussian>>,
|
||||
evidence: f64,
|
||||
/// (agent index, new skill likelihood) pairs for the sequential apply step
|
||||
/// that updates `SkillStore`. Computed while holding `&SkillStore` so the
|
||||
/// caller only needs `&mut SkillStore` when writing back.
|
||||
skill_updates: Vec<(Index, Gaussian)>,
|
||||
}
|
||||
|
||||
impl Event {
|
||||
pub(crate) fn iter_agents(&self) -> impl Iterator<Item = Index> + '_ {
|
||||
self.teams
|
||||
@@ -115,6 +126,51 @@ impl Event {
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
/// Compute the inference update for this event, returning an `EventOutput`
|
||||
/// that describes the mutations to apply. Takes only shared references so
|
||||
/// it can run inside a parallel closure.
|
||||
fn compute<T: Time, D: Drift<T>>(
|
||||
&self,
|
||||
skills: &SkillStore,
|
||||
agents: &CompetitorStore<T, D>,
|
||||
p_draw: f64,
|
||||
) -> EventOutput {
|
||||
let mut arena = ScratchArena::new();
|
||||
let teams = self.within_priors(false, false, skills, agents);
|
||||
let result = self.outputs();
|
||||
let g = Game::ranked_with_arena(teams, &result, &self.weights, p_draw, &mut arena);
|
||||
|
||||
// Pre-compute new skill likelihoods while we still hold &skills.
|
||||
let mut skill_updates: Vec<(Index, Gaussian)> = Vec::new();
|
||||
for (t, team) in self.teams.iter().enumerate() {
|
||||
for (i, item) in team.items.iter().enumerate() {
|
||||
let old_skill_likelihood = skills.get(item.agent).unwrap().likelihood;
|
||||
let new_item_likelihood = g.likelihoods[t][i];
|
||||
let new_skill_likelihood =
|
||||
(old_skill_likelihood / item.likelihood) * new_item_likelihood;
|
||||
skill_updates.push((item.agent, new_skill_likelihood));
|
||||
}
|
||||
}
|
||||
|
||||
EventOutput {
|
||||
likelihoods: g.likelihoods,
|
||||
evidence: g.evidence,
|
||||
skill_updates,
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply an `EventOutput` back onto this event's mutable item likelihoods
|
||||
/// and evidence. The `SkillStore` updates are applied separately by the
|
||||
/// caller to avoid conflicting borrows.
|
||||
fn apply_output(&mut self, output: &EventOutput) {
|
||||
self.evidence = output.evidence;
|
||||
for (t, team) in self.teams.iter_mut().enumerate() {
|
||||
for (i, item) in team.items.iter_mut().enumerate() {
|
||||
item.likelihood = output.likelihoods[t][i];
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -266,28 +322,79 @@ impl<T: Time> TimeSlice<T> {
|
||||
}
|
||||
|
||||
pub fn iteration<D: Drift<T>>(&mut self, from: usize, agents: &CompetitorStore<T, D>) {
|
||||
for event in self.events.iter_mut().skip(from) {
|
||||
let teams = event.within_priors(false, false, &self.skills, agents);
|
||||
let result = event.outputs();
|
||||
if from > 0 || self.color_groups.is_empty() {
|
||||
// Initial pass (add_events) or no color groups yet: simple sequential sweep.
|
||||
for event in self.events.iter_mut().skip(from) {
|
||||
let teams = event.within_priors(false, false, &self.skills, agents);
|
||||
let result = event.outputs();
|
||||
|
||||
let g = Game::ranked_with_arena(
|
||||
teams,
|
||||
&result,
|
||||
&event.weights,
|
||||
self.p_draw,
|
||||
&mut self.arena,
|
||||
);
|
||||
let g = Game::ranked_with_arena(
|
||||
teams,
|
||||
&result,
|
||||
&event.weights,
|
||||
self.p_draw,
|
||||
&mut self.arena,
|
||||
);
|
||||
|
||||
for (t, team) in event.teams.iter_mut().enumerate() {
|
||||
for (i, item) in team.items.iter_mut().enumerate() {
|
||||
let old_likelihood = self.skills.get(item.agent).unwrap().likelihood;
|
||||
let new_likelihood = (old_likelihood / item.likelihood) * g.likelihoods[t][i];
|
||||
self.skills.get_mut(item.agent).unwrap().likelihood = new_likelihood;
|
||||
item.likelihood = g.likelihoods[t][i];
|
||||
for (t, team) in event.teams.iter_mut().enumerate() {
|
||||
for (i, item) in team.items.iter_mut().enumerate() {
|
||||
let old_likelihood = self.skills.get(item.agent).unwrap().likelihood;
|
||||
let new_likelihood =
|
||||
(old_likelihood / item.likelihood) * g.likelihoods[t][i];
|
||||
self.skills.get_mut(item.agent).unwrap().likelihood = new_likelihood;
|
||||
item.likelihood = g.likelihoods[t][i];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
event.evidence = g.evidence;
|
||||
event.evidence = g.evidence;
|
||||
}
|
||||
} else {
|
||||
self.sweep_color_groups(agents);
|
||||
}
|
||||
}
|
||||
|
||||
/// Full event sweep using the color-group partition. Colors are processed
|
||||
/// sequentially; within each color the inner loop is parallel under rayon.
|
||||
fn sweep_color_groups<D: Drift<T>>(&mut self, agents: &CompetitorStore<T, D>) {
|
||||
// We need &self.skills (immutable) and &mut self.events (mutable) at the
|
||||
// same time. Rust allows this because they are distinct struct fields.
|
||||
// The parallel closure captures &self.skills and &self.p_draw by shared
|
||||
// ref; it returns owned EventOutput values that we apply sequentially.
|
||||
for color_idx in 0..self.color_groups.groups.len() {
|
||||
if self.color_groups.groups[color_idx].is_empty() {
|
||||
continue;
|
||||
}
|
||||
let range = self.color_groups.color_range(color_idx);
|
||||
|
||||
// Compute phase — parallel under rayon, sequential otherwise.
|
||||
// Borrows: &self.skills and &agents are shared refs captured by the closure;
|
||||
// &mut self.events[range] is the mutable slice for par_iter_mut.
|
||||
let p_draw = self.p_draw;
|
||||
let skills: &SkillStore = &self.skills;
|
||||
|
||||
#[cfg(feature = "rayon")]
|
||||
let outputs: Vec<EventOutput> = {
|
||||
use rayon::prelude::*;
|
||||
self.events[range.clone()]
|
||||
.par_iter()
|
||||
.map(|ev| ev.compute(skills, agents, p_draw))
|
||||
.collect()
|
||||
};
|
||||
|
||||
#[cfg(not(feature = "rayon"))]
|
||||
let outputs: Vec<EventOutput> = self.events[range.clone()]
|
||||
.iter()
|
||||
.map(|ev| ev.compute(skills, agents, p_draw))
|
||||
.collect();
|
||||
|
||||
// Apply phase — sequential: write skill likelihoods back to self.skills,
|
||||
// then update per-event item likelihoods and evidence.
|
||||
for (ev, output) in self.events[range].iter_mut().zip(outputs.iter()) {
|
||||
for &(agent, new_skill_lhood) in &output.skill_updates {
|
||||
self.skills.get_mut(agent).unwrap().likelihood = new_skill_lhood;
|
||||
}
|
||||
ev.apply_output(output);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user