//! A single time step's worth of events. //! //! Renamed from `Batch` in T2. use std::collections::HashMap; use crate::{ Index, N_INF, arena::ScratchArena, color_group::ColorGroups, drift::Drift, game::Game, gaussian::Gaussian, rating::Rating, storage::{CompetitorStore, SkillStore}, time::Time, tuple_gt, tuple_max, }; #[derive(Debug)] pub(crate) struct Skill { pub(crate) forward: Gaussian, backward: Gaussian, likelihood: Gaussian, pub(crate) elapsed: i64, pub(crate) online: Gaussian, } impl Skill { pub(crate) fn posterior(&self) -> Gaussian { self.likelihood * self.backward * self.forward } } impl Default for Skill { fn default() -> Self { Self { forward: N_INF, backward: N_INF, likelihood: N_INF, elapsed: 0, online: N_INF, } } } #[derive(Debug)] struct Item { agent: Index, likelihood: Gaussian, } impl Item { fn within_prior>( &self, online: bool, forward: bool, skills: &SkillStore, agents: &CompetitorStore, ) -> Rating { let r = &agents[self.agent].rating; let skill = skills.get(self.agent).unwrap(); if online { Rating::new(skill.online, r.beta, r.drift) } else if forward { Rating::new(skill.forward, r.beta, r.drift) } else { Rating::new(skill.posterior() / self.likelihood, r.beta, r.drift) } } } #[derive(Debug)] struct Team { items: Vec, output: f64, } #[derive(Debug)] pub(crate) struct Event { teams: Vec, evidence: f64, weights: Vec>, } impl Event { pub(crate) fn iter_agents(&self) -> impl Iterator + '_ { self.teams .iter() .flat_map(|t| t.items.iter().map(|it| it.agent)) } fn outputs(&self) -> Vec { self.teams .iter() .map(|team| team.output) .collect::>() } pub(crate) fn within_priors>( &self, online: bool, forward: bool, skills: &SkillStore, agents: &CompetitorStore, ) -> Vec>> { self.teams .iter() .map(|team| { team.items .iter() .map(|item| item.within_prior(online, forward, skills, agents)) .collect::>() }) .collect::>() } /// Direct in-loop update: mutates self and `skills` inline with no /// intermediate allocation. Used by both the sequential sweep path and, /// via unsafe, by the parallel rayon path for events in the same color /// group (which have disjoint agent sets — see `sweep_color_groups`). fn iteration_direct>( &mut self, skills: &mut SkillStore, agents: &CompetitorStore, p_draw: f64, arena: &mut ScratchArena, ) { let teams = self.within_priors(false, false, skills, agents); let result = self.outputs(); let g = Game::ranked_with_arena(teams, &result, &self.weights, p_draw, arena); for (t, team) in self.teams.iter_mut().enumerate() { for (i, item) in team.items.iter_mut().enumerate() { let old_likelihood = skills.get(item.agent).unwrap().likelihood; let new_likelihood = (old_likelihood / item.likelihood) * g.likelihoods[t][i]; skills.get_mut(item.agent).unwrap().likelihood = new_likelihood; item.likelihood = g.likelihoods[t][i]; } } self.evidence = g.evidence; } } #[derive(Debug)] pub struct TimeSlice { pub(crate) events: Vec, pub(crate) skills: SkillStore, pub(crate) time: T, p_draw: f64, arena: ScratchArena, pub(crate) color_groups: ColorGroups, } impl TimeSlice { pub fn new(time: T, p_draw: f64) -> Self { Self { events: Vec::new(), skills: SkillStore::new(), time, p_draw, arena: ScratchArena::new(), color_groups: ColorGroups::new(), } } /// Recompute the color-group partition and reorder `self.events` into /// color-contiguous ranges. After this call, `self.color_groups.groups[c]` /// contains a contiguous ascending range of indices in `self.events`. pub(crate) fn recompute_color_groups(&mut self) { use crate::color_group::color_greedy; let n = self.events.len(); if n == 0 { self.color_groups = ColorGroups::new(); return; } let cg = color_greedy(n, |ev_idx| { self.events[ev_idx].iter_agents().collect::>() }); let mut reordered: Vec = Vec::with_capacity(n); let mut new_groups: Vec> = Vec::with_capacity(cg.groups.len()); let mut taken: Vec> = self.events.drain(..).map(Some).collect(); for group in &cg.groups { let mut new_indices: Vec = Vec::with_capacity(group.len()); for &old_idx in group { let ev = taken[old_idx].take().expect("event already taken"); new_indices.push(reordered.len()); reordered.push(ev); } new_groups.push(new_indices); } self.events = reordered; self.color_groups = ColorGroups { groups: new_groups }; } pub fn add_events>( &mut self, composition: Vec>>, results: Vec>, weights: Vec>>, agents: &CompetitorStore, ) { let mut unique = Vec::with_capacity(10); let this_agent = composition.iter().flatten().flatten().filter(|idx| { if !unique.contains(idx) { unique.push(*idx); return true; } false }); for idx in this_agent { let elapsed = compute_elapsed(agents[*idx].last_time.as_ref(), &self.time); if let Some(skill) = self.skills.get_mut(*idx) { skill.elapsed = elapsed; skill.forward = agents[*idx].receive(&self.time); } else { self.skills.insert( *idx, Skill { forward: agents[*idx].receive(&self.time), elapsed, ..Default::default() }, ); } } let events = composition.iter().enumerate().map(|(e, event)| { let teams = event .iter() .enumerate() .map(|(t, team)| { let items = team .iter() .map(|&agent| Item { agent, likelihood: N_INF, }) .collect::>(); Team { items, output: if results.is_empty() { (event.len() - (t + 1)) as f64 } else { results[e][t] }, } }) .collect::>(); let weights = if weights.is_empty() { teams .iter() .map(|team| vec![1.0; team.items.len()]) .collect::>() } else { weights[e].clone() }; Event { teams, evidence: 0.0, weights, } }); let from = self.events.len(); self.events.extend(events); self.iteration(from, agents); self.recompute_color_groups(); } pub(crate) fn posteriors(&self) -> HashMap { self.skills .iter() .map(|(idx, skill)| (idx, skill.posterior())) .collect::>() } pub fn iteration>(&mut self, from: usize, agents: &CompetitorStore) { if from > 0 || self.color_groups.is_empty() { // Initial pass (add_events) or no color groups yet: simple sequential sweep. for event in self.events.iter_mut().skip(from) { let teams = event.within_priors(false, false, &self.skills, agents); let result = event.outputs(); let g = Game::ranked_with_arena( teams, &result, &event.weights, self.p_draw, &mut self.arena, ); for (t, team) in event.teams.iter_mut().enumerate() { for (i, item) in team.items.iter_mut().enumerate() { let old_likelihood = self.skills.get(item.agent).unwrap().likelihood; let new_likelihood = (old_likelihood / item.likelihood) * g.likelihoods[t][i]; self.skills.get_mut(item.agent).unwrap().likelihood = new_likelihood; item.likelihood = g.likelihoods[t][i]; } } event.evidence = g.evidence; } } else { self.sweep_color_groups(agents); } } /// Full event sweep using the color-group partition. Colors are processed /// sequentially; within each color the inner loop is parallel under rayon. /// /// Events within each color group touch disjoint agent sets (guaranteed by /// the greedy coloring). This lets each rayon thread write directly to its /// events' skill likelihoods without a deferred-apply step, matching the /// sequential path's allocation profile. The unsafe block is sound because: /// 1. `self.events[range]` and `self.skills` are separate fields → disjoint. /// 2. Events in the same color group access disjoint `Index` values in /// `self.skills`, so concurrent writes land on different memory locations. /// 3. Each event only writes to its own items' likelihoods (no sharing). #[cfg(feature = "rayon")] fn sweep_color_groups>(&mut self, agents: &CompetitorStore) { use rayon::prelude::*; thread_local! { static ARENA: std::cell::RefCell = std::cell::RefCell::new(ScratchArena::new()); } // Minimum color-group size to justify rayon's task-spawn overhead. // Below this threshold, process events sequentially to avoid regression // on small per-slice workloads. const RAYON_THRESHOLD: usize = 64; for color_idx in 0..self.color_groups.groups.len() { let group_len = self.color_groups.groups[color_idx].len(); if group_len == 0 { continue; } let range = self.color_groups.color_range(color_idx); let p_draw = self.p_draw; if group_len >= RAYON_THRESHOLD { // Obtain a raw pointer from the unique `&mut self.skills` reference. // Casting back to `&mut` inside the closure is sound because: // 1. The pointer originates from a `&mut` — no aliasing with shared refs. // 2. Events in the same color group touch disjoint `Index` slots in the // underlying Vec, so concurrent writes from different threads land on // different memory locations — no data race. // 3. `self.events[range]` and `self.skills` are separate struct fields, // so the borrow splits cleanly. let skills_addr: usize = (&mut self.skills as *mut SkillStore) as usize; self.events[range].par_iter_mut().for_each(move |ev| { // SAFETY: see above. let skills: &mut SkillStore = unsafe { &mut *(skills_addr as *mut SkillStore) }; ARENA.with(|cell| { let mut arena = cell.borrow_mut(); arena.reset(); ev.iteration_direct(skills, agents, p_draw, &mut arena); }); }); } else { for ev in &mut self.events[range] { ev.iteration_direct(&mut self.skills, agents, p_draw, &mut self.arena); } } } } /// Full event sweep using the color-group partition, sequential direct-write path. /// Events within each color group are updated inline — no EventOutput allocation — /// matching the T2 performance profile. #[cfg(not(feature = "rayon"))] fn sweep_color_groups>(&mut self, agents: &CompetitorStore) { for color_idx in 0..self.color_groups.groups.len() { if self.color_groups.groups[color_idx].is_empty() { continue; } let range = self.color_groups.color_range(color_idx); // Borrow self.events as a mutable slice for this color range. // self.skills and self.arena are separate fields — disjoint borrows are // allowed within a single method body. let p_draw = self.p_draw; for ev in &mut self.events[range] { ev.iteration_direct(&mut self.skills, agents, p_draw, &mut self.arena); } } } #[allow(dead_code)] pub(crate) fn convergence>(&mut self, agents: &CompetitorStore) -> usize { let epsilon = 1e-6; let iterations = 20; let mut step = (f64::INFINITY, f64::INFINITY); let mut i = 0; while tuple_gt(step, epsilon) && i < iterations { let old = self.posteriors(); self.iteration(0, agents); let new = self.posteriors(); step = old.iter().fold((0.0, 0.0), |step, (a, old)| { tuple_max(step, old.delta(new[a])) }); i += 1; } i } pub(crate) fn forward_prior_out(&self, agent: &Index) -> Gaussian { let skill = self.skills.get(*agent).unwrap(); skill.forward * skill.likelihood } pub(crate) fn backward_prior_out>( &self, agent: &Index, agents: &CompetitorStore, ) -> Gaussian { let skill = self.skills.get(*agent).unwrap(); let n = skill.likelihood * skill.backward; n.forget( agents[*agent] .rating .drift .variance_for_elapsed(skill.elapsed), ) } pub(crate) fn new_backward_info>(&mut self, agents: &CompetitorStore) { for (agent, skill) in self.skills.iter_mut() { skill.backward = agents[agent].message; } self.iteration(0, agents); } pub(crate) fn new_forward_info>(&mut self, agents: &CompetitorStore) { for (agent, skill) in self.skills.iter_mut() { skill.forward = agents[agent].receive_for_elapsed(skill.elapsed); } self.iteration(0, agents); } pub(crate) fn log_evidence>( &self, online: bool, targets: &[Index], forward: bool, agents: &CompetitorStore, ) -> f64 { // log_evidence is infrequent; a local arena avoids needing &mut self. let mut arena = ScratchArena::new(); if targets.is_empty() { if online || forward { self.events .iter() .map(|event| { Game::ranked_with_arena( event.within_priors(online, forward, &self.skills, agents), &event.outputs(), &event.weights, self.p_draw, &mut arena, ) .evidence .ln() }) .sum() } else { self.events.iter().map(|event| event.evidence.ln()).sum() } } else if online || forward { self.events .iter() .enumerate() .filter(|(_, event)| { event .teams .iter() .flat_map(|team| &team.items) .any(|item| targets.contains(&item.agent)) }) .map(|(_, event)| { Game::ranked_with_arena( event.within_priors(online, forward, &self.skills, agents), &event.outputs(), &event.weights, self.p_draw, &mut arena, ) .evidence .ln() }) .sum() } else { self.events .iter() .filter(|event| { event .teams .iter() .flat_map(|team| &team.items) .any(|item| targets.contains(&item.agent)) }) .map(|event| event.evidence.ln()) .sum() } } pub fn get_composition(&self) -> Vec>> { self.events .iter() .map(|event| { event .teams .iter() .map(|team| team.items.iter().map(|item| item.agent).collect::>()) .collect::>() }) .collect::>() } pub fn get_results(&self) -> Vec> { self.events .iter() .map(|event| { event .teams .iter() .map(|team| team.output) .collect::>() }) .collect::>() } } pub(crate) fn compute_elapsed(last: Option<&T>, current: &T) -> i64 { last.map(|l| l.elapsed_to(current).max(0)).unwrap_or(0) } #[cfg(test)] mod tests { use approx::assert_ulps_eq; use super::*; use crate::{ KeyTable, competitor::Competitor, drift::ConstantDrift, rating::Rating, storage::CompetitorStore, }; #[test] fn test_one_event_each() { let mut index_map = KeyTable::new(); let a = index_map.get_or_create("a"); let b = index_map.get_or_create("b"); let c = index_map.get_or_create("c"); let d = index_map.get_or_create("d"); let e = index_map.get_or_create("e"); let f = index_map.get_or_create("f"); let mut agents: CompetitorStore = CompetitorStore::new(); for agent in [a, b, c, d, e, f] { agents.insert( agent, Competitor { rating: Rating::new( Gaussian::from_ms(25.0, 25.0 / 3.0), 25.0 / 6.0, ConstantDrift(25.0 / 300.0), ), ..Default::default() }, ); } let mut time_slice = TimeSlice::new(0i64, 0.0); time_slice.add_events( vec![ vec![vec![a], vec![b]], vec![vec![c], vec![d]], vec![vec![e], vec![f]], ], vec![vec![1.0, 0.0], vec![0.0, 1.0], vec![1.0, 0.0]], vec![], &agents, ); let post = time_slice.posteriors(); assert_ulps_eq!( post[&a], Gaussian::from_ms(29.205220, 7.194481), epsilon = 1e-6 ); assert_ulps_eq!( post[&b], Gaussian::from_ms(20.794779, 7.194481), epsilon = 1e-6 ); assert_ulps_eq!( post[&c], Gaussian::from_ms(20.794779, 7.194481), epsilon = 1e-6 ); assert_ulps_eq!( post[&d], Gaussian::from_ms(29.205220, 7.194481), epsilon = 1e-6 ); assert_ulps_eq!( post[&e], Gaussian::from_ms(29.205220, 7.194481), epsilon = 1e-6 ); assert_ulps_eq!( post[&f], Gaussian::from_ms(20.794779, 7.194481), epsilon = 1e-6 ); assert_eq!(time_slice.convergence(&agents), 1); } #[test] fn test_same_strength() { let mut index_map = KeyTable::new(); let a = index_map.get_or_create("a"); let b = index_map.get_or_create("b"); let c = index_map.get_or_create("c"); let d = index_map.get_or_create("d"); let e = index_map.get_or_create("e"); let f = index_map.get_or_create("f"); let mut agents: CompetitorStore = CompetitorStore::new(); for agent in [a, b, c, d, e, f] { agents.insert( agent, Competitor { rating: Rating::new( Gaussian::from_ms(25.0, 25.0 / 3.0), 25.0 / 6.0, ConstantDrift(25.0 / 300.0), ), ..Default::default() }, ); } let mut time_slice = TimeSlice::new(0i64, 0.0); time_slice.add_events( vec![ vec![vec![a], vec![b]], vec![vec![a], vec![c]], vec![vec![b], vec![c]], ], vec![vec![1.0, 0.0], vec![0.0, 1.0], vec![1.0, 0.0]], vec![], &agents, ); let post = time_slice.posteriors(); assert_ulps_eq!( post[&a], Gaussian::from_ms(24.960978, 6.298544), epsilon = 1e-6 ); assert_ulps_eq!( post[&b], Gaussian::from_ms(27.095590, 6.010330), epsilon = 1e-6 ); assert_ulps_eq!( post[&c], Gaussian::from_ms(24.889681, 5.866311), epsilon = 1e-6 ); assert!(time_slice.convergence(&agents) > 1); let post = time_slice.posteriors(); assert_ulps_eq!( post[&a], Gaussian::from_ms(25.000000, 5.419212), epsilon = 1e-6 ); assert_ulps_eq!( post[&b], Gaussian::from_ms(25.000000, 5.419212), epsilon = 1e-6 ); assert_ulps_eq!( post[&c], Gaussian::from_ms(25.000000, 5.419212), epsilon = 1e-6 ); } #[test] fn test_add_events() { let mut index_map = KeyTable::new(); let a = index_map.get_or_create("a"); let b = index_map.get_or_create("b"); let c = index_map.get_or_create("c"); let d = index_map.get_or_create("d"); let e = index_map.get_or_create("e"); let f = index_map.get_or_create("f"); let mut agents: CompetitorStore = CompetitorStore::new(); for agent in [a, b, c, d, e, f] { agents.insert( agent, Competitor { rating: Rating::new( Gaussian::from_ms(25.0, 25.0 / 3.0), 25.0 / 6.0, ConstantDrift(25.0 / 300.0), ), ..Default::default() }, ); } let mut time_slice = TimeSlice::new(0i64, 0.0); time_slice.add_events( vec![ vec![vec![a], vec![b]], vec![vec![a], vec![c]], vec![vec![b], vec![c]], ], vec![vec![1.0, 0.0], vec![0.0, 1.0], vec![1.0, 0.0]], vec![], &agents, ); time_slice.convergence(&agents); let post = time_slice.posteriors(); assert_ulps_eq!( post[&a], Gaussian::from_ms(25.000000, 5.419212), epsilon = 1e-6 ); assert_ulps_eq!( post[&b], Gaussian::from_ms(25.000000, 5.419212), epsilon = 1e-6 ); assert_ulps_eq!( post[&c], Gaussian::from_ms(25.000000, 5.419212), epsilon = 1e-6 ); time_slice.add_events( vec![ vec![vec![a], vec![b]], vec![vec![a], vec![c]], vec![vec![b], vec![c]], ], vec![vec![1.0, 0.0], vec![0.0, 1.0], vec![1.0, 0.0]], vec![], &agents, ); assert_eq!(time_slice.events.len(), 6); time_slice.convergence(&agents); let post = time_slice.posteriors(); assert_ulps_eq!( post[&a], Gaussian::from_ms(25.000003, 3.880150), epsilon = 1e-6 ); assert_ulps_eq!( post[&b], Gaussian::from_ms(25.000003, 3.880150), epsilon = 1e-6 ); assert_ulps_eq!( post[&c], Gaussian::from_ms(25.000003, 3.880150), epsilon = 1e-6 ); } #[test] fn time_slice_color_groups_reorders_events() { // ev0: [a, b]; ev1: [c, d]; ev2: [a, c] // Greedy coloring: ev0→c0, ev1→c0 (disjoint), ev2→c1 (overlaps both). // After recompute_color_groups, physical order is [ev0, ev1, ev2] // and groups == [[0, 1], [2]]. let mut index_map = KeyTable::new(); let a = index_map.get_or_create("a"); let b = index_map.get_or_create("b"); let c = index_map.get_or_create("c"); let d = index_map.get_or_create("d"); let mut agents: CompetitorStore = CompetitorStore::new(); for agent in [a, b, c, d] { agents.insert( agent, Competitor { rating: Rating::new( Gaussian::from_ms(25.0, 25.0 / 3.0), 25.0 / 6.0, ConstantDrift(25.0 / 300.0), ), ..Default::default() }, ); } let mut ts = TimeSlice::new(0i64, 0.0); ts.add_events( vec![ vec![vec![a], vec![b]], vec![vec![c], vec![d]], vec![vec![a], vec![c]], ], vec![vec![1.0, 0.0], vec![1.0, 0.0], vec![1.0, 0.0]], vec![], &agents, ); assert_eq!(ts.color_groups.n_colors(), 2); assert_eq!(ts.color_groups.groups[0], vec![0, 1]); assert_eq!(ts.color_groups.groups[1], vec![2]); assert_eq!(ts.color_groups.color_range(0), 0..2); assert_eq!(ts.color_groups.color_range(1), 2..3); // Events at positions 0 and 1 (color 0) must be disjoint — verify by // checking that the agent sets of self.events[0] and self.events[1] do // not include the agent at self.events[2]. let agents_in_ev2: Vec = ts.events[2].iter_agents().collect(); let agents_in_ev0: Vec = ts.events[0].iter_agents().collect(); let agents_in_ev1: Vec = ts.events[1].iter_agents().collect(); // ev0 and ev1 must be disjoint from each other (color-0 invariant). assert!(agents_in_ev0.iter().all(|ag| !agents_in_ev1.contains(ag))); // ev2 must share an agent with ev0 or ev1 (it needed its own color). let ev2_overlaps_ev0 = agents_in_ev2.iter().any(|ag| agents_in_ev0.contains(ag)); let ev2_overlaps_ev1 = agents_in_ev2.iter().any(|ag| agents_in_ev1.contains(ag)); assert!(ev2_overlaps_ev0 || ev2_overlaps_ev1); } }