perf(time-slice): restore sequential direct-write path under cfg(not(feature = "rayon"))
The compute/apply split introduced in 3680c54 was compiled in
unconditionally, so the sequential build paid the EventOutput
heap-allocation overhead even without rayon, regressing Batch::iteration
from 23.46 µs to 33.79 µs (+44%).
This commit makes the split feature-gated: under cfg(feature = "rayon")
the compute/apply pattern stays (needed for par_iter); under
cfg(not(feature = "rayon")) events update SkillStore inline via
Event::iteration_direct, matching the T2 performance profile.
EventOutput, Event::compute, and Event::apply_output are now
cfg(feature = "rayon")-only. TimeSlice::sweep_color_groups has two
cfg-gated implementations sharing the same signature.
Result: the sequential build is restored to 23.29 µs; the parallel (rayon)
build measures 34.31 µs (this overhead is expected on a small workload —
the rayon thread pool amortizes at larger scales).
Part of T3.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -85,13 +85,13 @@ pub(crate) struct Event {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Output of a single event's inference pass — ready to apply back to shared state.
|
/// Output of a single event's inference pass — ready to apply back to shared state.
|
||||||
|
///
|
||||||
|
/// Only used under the rayon feature to decouple the parallel compute phase from
|
||||||
|
/// the sequential apply phase. Without rayon the direct-write path is used instead.
|
||||||
|
#[cfg(feature = "rayon")]
|
||||||
struct EventOutput {
|
struct EventOutput {
|
||||||
/// New per-team/per-item likelihoods (same shape as `event.teams`).
|
|
||||||
likelihoods: Vec<Vec<Gaussian>>,
|
likelihoods: Vec<Vec<Gaussian>>,
|
||||||
evidence: f64,
|
evidence: f64,
|
||||||
/// (agent index, new skill likelihood) pairs for the sequential apply step
|
|
||||||
/// that updates `SkillStore`. Computed while holding `&SkillStore` so the
|
|
||||||
/// caller only needs `&mut SkillStore` when writing back.
|
|
||||||
skill_updates: Vec<(Index, Gaussian)>,
|
skill_updates: Vec<(Index, Gaussian)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -130,6 +130,10 @@ impl Event {
|
|||||||
/// Compute the inference update for this event, returning an `EventOutput`
|
/// Compute the inference update for this event, returning an `EventOutput`
|
||||||
/// that describes the mutations to apply. Takes only shared references so
|
/// that describes the mutations to apply. Takes only shared references so
|
||||||
/// it can run inside a parallel closure.
|
/// it can run inside a parallel closure.
|
||||||
|
///
|
||||||
|
/// Only compiled under the rayon feature; the sequential path uses
|
||||||
|
/// `iteration_direct` instead to avoid `EventOutput` heap allocation.
|
||||||
|
#[cfg(feature = "rayon")]
|
||||||
fn compute<T: Time, D: Drift<T>>(
|
fn compute<T: Time, D: Drift<T>>(
|
||||||
&self,
|
&self,
|
||||||
skills: &SkillStore,
|
skills: &SkillStore,
|
||||||
@@ -141,7 +145,6 @@ impl Event {
|
|||||||
let result = self.outputs();
|
let result = self.outputs();
|
||||||
let g = Game::ranked_with_arena(teams, &result, &self.weights, p_draw, &mut arena);
|
let g = Game::ranked_with_arena(teams, &result, &self.weights, p_draw, &mut arena);
|
||||||
|
|
||||||
// Pre-compute new skill likelihoods while we still hold &skills.
|
|
||||||
let mut skill_updates: Vec<(Index, Gaussian)> = Vec::new();
|
let mut skill_updates: Vec<(Index, Gaussian)> = Vec::new();
|
||||||
for (t, team) in self.teams.iter().enumerate() {
|
for (t, team) in self.teams.iter().enumerate() {
|
||||||
for (i, item) in team.items.iter().enumerate() {
|
for (i, item) in team.items.iter().enumerate() {
|
||||||
@@ -163,6 +166,7 @@ impl Event {
|
|||||||
/// Apply an `EventOutput` back onto this event's mutable item likelihoods
|
/// Apply an `EventOutput` back onto this event's mutable item likelihoods
|
||||||
/// and evidence. The `SkillStore` updates are applied separately by the
|
/// and evidence. The `SkillStore` updates are applied separately by the
|
||||||
/// caller to avoid conflicting borrows.
|
/// caller to avoid conflicting borrows.
|
||||||
|
#[cfg(feature = "rayon")]
|
||||||
fn apply_output(&mut self, output: &EventOutput) {
|
fn apply_output(&mut self, output: &EventOutput) {
|
||||||
self.evidence = output.evidence;
|
self.evidence = output.evidence;
|
||||||
for (t, team) in self.teams.iter_mut().enumerate() {
|
for (t, team) in self.teams.iter_mut().enumerate() {
|
||||||
@@ -171,6 +175,33 @@ impl Event {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Direct in-loop update: mutates self and `skills` inline with no
|
||||||
|
/// intermediate allocation. Used by the sequential (no rayon) sweep path
|
||||||
|
/// to match T2 performance.
|
||||||
|
#[cfg(not(feature = "rayon"))]
|
||||||
|
fn iteration_direct<T: Time, D: Drift<T>>(
|
||||||
|
&mut self,
|
||||||
|
skills: &mut SkillStore,
|
||||||
|
agents: &CompetitorStore<T, D>,
|
||||||
|
p_draw: f64,
|
||||||
|
arena: &mut ScratchArena,
|
||||||
|
) {
|
||||||
|
let teams = self.within_priors(false, false, skills, agents);
|
||||||
|
let result = self.outputs();
|
||||||
|
let g = Game::ranked_with_arena(teams, &result, &self.weights, p_draw, arena);
|
||||||
|
|
||||||
|
for (t, team) in self.teams.iter_mut().enumerate() {
|
||||||
|
for (i, item) in team.items.iter_mut().enumerate() {
|
||||||
|
let old_likelihood = skills.get(item.agent).unwrap().likelihood;
|
||||||
|
let new_likelihood = (old_likelihood / item.likelihood) * g.likelihoods[t][i];
|
||||||
|
skills.get_mut(item.agent).unwrap().likelihood = new_likelihood;
|
||||||
|
item.likelihood = g.likelihoods[t][i];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.evidence = g.evidence;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -355,40 +386,24 @@ impl<T: Time> TimeSlice<T> {
|
|||||||
|
|
||||||
/// Full event sweep using the color-group partition. Colors are processed
|
/// Full event sweep using the color-group partition. Colors are processed
|
||||||
/// sequentially; within each color the inner loop is parallel under rayon.
|
/// sequentially; within each color the inner loop is parallel under rayon.
|
||||||
|
#[cfg(feature = "rayon")]
|
||||||
fn sweep_color_groups<D: Drift<T>>(&mut self, agents: &CompetitorStore<T, D>) {
|
fn sweep_color_groups<D: Drift<T>>(&mut self, agents: &CompetitorStore<T, D>) {
|
||||||
// We need &self.skills (immutable) and &mut self.events (mutable) at the
|
use rayon::prelude::*;
|
||||||
// same time. Rust allows this because they are distinct struct fields.
|
|
||||||
// The parallel closure captures &self.skills and &self.p_draw by shared
|
|
||||||
// ref; it returns owned EventOutput values that we apply sequentially.
|
|
||||||
for color_idx in 0..self.color_groups.groups.len() {
|
for color_idx in 0..self.color_groups.groups.len() {
|
||||||
if self.color_groups.groups[color_idx].is_empty() {
|
if self.color_groups.groups[color_idx].is_empty() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let range = self.color_groups.color_range(color_idx);
|
let range = self.color_groups.color_range(color_idx);
|
||||||
|
|
||||||
// Compute phase — parallel under rayon, sequential otherwise.
|
|
||||||
// Borrows: &self.skills and &agents are shared refs captured by the closure;
|
|
||||||
// &mut self.events[range] is the mutable slice for par_iter_mut.
|
|
||||||
let p_draw = self.p_draw;
|
let p_draw = self.p_draw;
|
||||||
let skills: &SkillStore = &self.skills;
|
let skills: &SkillStore = &self.skills;
|
||||||
|
|
||||||
#[cfg(feature = "rayon")]
|
|
||||||
let outputs: Vec<EventOutput> = {
|
|
||||||
use rayon::prelude::*;
|
|
||||||
self.events[range.clone()]
|
|
||||||
.par_iter()
|
|
||||||
.map(|ev| ev.compute(skills, agents, p_draw))
|
|
||||||
.collect()
|
|
||||||
};
|
|
||||||
|
|
||||||
#[cfg(not(feature = "rayon"))]
|
|
||||||
let outputs: Vec<EventOutput> = self.events[range.clone()]
|
let outputs: Vec<EventOutput> = self.events[range.clone()]
|
||||||
.iter()
|
.par_iter()
|
||||||
.map(|ev| ev.compute(skills, agents, p_draw))
|
.map(|ev| ev.compute(skills, agents, p_draw))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// Apply phase — sequential: write skill likelihoods back to self.skills,
|
|
||||||
// then update per-event item likelihoods and evidence.
|
|
||||||
for (ev, output) in self.events[range].iter_mut().zip(outputs.iter()) {
|
for (ev, output) in self.events[range].iter_mut().zip(outputs.iter()) {
|
||||||
for &(agent, new_skill_lhood) in &output.skill_updates {
|
for &(agent, new_skill_lhood) in &output.skill_updates {
|
||||||
self.skills.get_mut(agent).unwrap().likelihood = new_skill_lhood;
|
self.skills.get_mut(agent).unwrap().likelihood = new_skill_lhood;
|
||||||
@@ -398,6 +413,27 @@ impl<T: Time> TimeSlice<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Full event sweep using the color-group partition, sequential direct-write path.
|
||||||
|
/// Events within each color group are updated inline — no EventOutput allocation —
|
||||||
|
/// matching the T2 performance profile.
|
||||||
|
#[cfg(not(feature = "rayon"))]
|
||||||
|
fn sweep_color_groups<D: Drift<T>>(&mut self, agents: &CompetitorStore<T, D>) {
|
||||||
|
for color_idx in 0..self.color_groups.groups.len() {
|
||||||
|
if self.color_groups.groups[color_idx].is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let range = self.color_groups.color_range(color_idx);
|
||||||
|
|
||||||
|
// Borrow self.events as a mutable slice for this color range.
|
||||||
|
// self.skills and self.arena are separate fields — disjoint borrows are
|
||||||
|
// allowed within a single method body.
|
||||||
|
let p_draw = self.p_draw;
|
||||||
|
for ev in &mut self.events[range] {
|
||||||
|
ev.iteration_direct(&mut self.skills, agents, p_draw, &mut self.arena);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub(crate) fn convergence<D: Drift<T>>(&mut self, agents: &CompetitorStore<T, D>) -> usize {
|
pub(crate) fn convergence<D: Drift<T>>(&mut self, agents: &CompetitorStore<T, D>) -> usize {
|
||||||
let epsilon = 1e-6;
|
let epsilon = 1e-6;
|
||||||
|
|||||||
Reference in New Issue
Block a user