diff --git a/crates/xy-supervisor/src/lib.rs b/crates/xy-supervisor/src/lib.rs index 1aa7124..e8f7fb1 100644 --- a/crates/xy-supervisor/src/lib.rs +++ b/crates/xy-supervisor/src/lib.rs @@ -8,6 +8,6 @@ pub mod retry_window; pub use backoff::Backoff; pub use child::{ChildHandle, MockChild, MockChildController}; -pub use logs::RotatingLogWriter; +pub use logs::{RecordedLine, RingBuffer, RotatingLogWriter}; pub use policy::{RestartDecision, decide}; pub use retry_window::RetryWindow; diff --git a/crates/xy-supervisor/src/logs.rs b/crates/xy-supervisor/src/logs.rs index 0c66204..a2c80f7 100644 --- a/crates/xy-supervisor/src/logs.rs +++ b/crates/xy-supervisor/src/logs.rs @@ -1,6 +1,8 @@ +use std::collections::VecDeque; use std::fs::{File, OpenOptions}; use std::io::Write; use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; pub struct RotatingLogWriter { base: PathBuf, @@ -64,6 +66,61 @@ impl RotatingLogWriter { } } +#[derive(Clone)] +pub struct RingBuffer { + inner: Arc>, + capacity_bytes: usize, +} + +struct RingBufferInner { + lines: VecDeque, + bytes: usize, +} + +#[derive(Debug, Clone)] +pub struct RecordedLine { + pub stream: xy_protocol::rpc::LogStream, + pub line: String, + pub ts_unix_ms: u64, +} + +impl RingBuffer { + pub fn new(capacity_bytes: usize) -> Self { + Self { + inner: Arc::new(Mutex::new(RingBufferInner { + lines: VecDeque::new(), + bytes: 0, + })), + capacity_bytes, + } + } + + pub fn push(&self, line: RecordedLine) { + let mut g = self.inner.lock().unwrap(); + g.bytes += line.line.len(); + g.lines.push_back(line); + while g.bytes > self.capacity_bytes { + if let Some(removed) = g.lines.pop_front() { + g.bytes -= removed.line.len(); + } else { + break; + } + } + } + + pub fn snapshot_tail(&self, n: Option) -> Vec { + let g = self.inner.lock().unwrap(); + match n { + None => g.lines.iter().cloned().collect(), + Some(n) => { + let take = (n as usize).min(g.lines.len()); + let start = g.lines.len() - take; + g.lines.iter().skip(start).cloned().collect() + } + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -101,4 +158,38 @@ mod tests { rotated.display() ); } + + use xy_protocol::rpc::LogStream; + + fn recorded(s: &str) -> RecordedLine { + RecordedLine { + stream: LogStream::Stdout, + line: s.to_string(), + ts_unix_ms: 0, + } + } + + #[test] + fn ring_buffer_drops_oldest_when_full() { + let rb = RingBuffer::new(10); + rb.push(recorded("aaaaa")); + rb.push(recorded("bbbbb")); + rb.push(recorded("ccc")); + let snap = rb.snapshot_tail(None); + assert_eq!(snap.len(), 2); + assert_eq!(snap[0].line, "bbbbb"); + assert_eq!(snap[1].line, "ccc"); + } + + #[test] + fn ring_buffer_tail_n() { + let rb = RingBuffer::new(1024); + for i in 0..5 { + rb.push(recorded(&format!("line{i}"))); + } + let snap = rb.snapshot_tail(Some(2)); + assert_eq!(snap.len(), 2); + assert_eq!(snap[0].line, "line3"); + assert_eq!(snap[1].line, "line4"); + } }