From e121fe28bb82a155e51918c07cf7af8f84461b6a Mon Sep 17 00:00:00 2001 From: Anders Olsson Date: Mon, 25 May 2026 11:37:19 +0200 Subject: [PATCH] feat(supervisor): LogSink fans out to file, ring buffer, broadcast --- crates/xy-supervisor/src/lib.rs | 2 +- crates/xy-supervisor/src/logs.rs | 75 ++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/crates/xy-supervisor/src/lib.rs b/crates/xy-supervisor/src/lib.rs index e8f7fb1..9a6df14 100644 --- a/crates/xy-supervisor/src/lib.rs +++ b/crates/xy-supervisor/src/lib.rs @@ -8,6 +8,6 @@ pub mod retry_window; pub use backoff::Backoff; pub use child::{ChildHandle, MockChild, MockChildController}; -pub use logs::{RecordedLine, RingBuffer, RotatingLogWriter}; +pub use logs::{LogSink, RecordedLine, RingBuffer, RotatingLogWriter}; pub use policy::{RestartDecision, decide}; pub use retry_window::RetryWindow; diff --git a/crates/xy-supervisor/src/logs.rs b/crates/xy-supervisor/src/logs.rs index a2c80f7..a343585 100644 --- a/crates/xy-supervisor/src/logs.rs +++ b/crates/xy-supervisor/src/logs.rs @@ -4,6 +4,9 @@ use std::io::Write; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; +use tokio::sync::broadcast; +use xy_protocol::rpc::{LogLine, LogStream}; + pub struct RotatingLogWriter { base: PathBuf, max_bytes: u64, @@ -121,6 +124,62 @@ impl RingBuffer { } } +const LOG_BROADCAST_CAP: usize = 256; + +#[derive(Clone)] +pub struct LogSink { + pub server_name: String, + writer: Arc>, + pub ring: RingBuffer, + pub broadcast: broadcast::Sender, +} + +impl LogSink { + pub fn new( + server_name: String, + writer: RotatingLogWriter, + ring_capacity_bytes: usize, + ) -> Self { + let (tx, _) = broadcast::channel(LOG_BROADCAST_CAP); + Self { + server_name, + writer: Arc::new(Mutex::new(writer)), + ring: RingBuffer::new(ring_capacity_bytes), + broadcast: tx, + } + } + + pub fn record(&self, stream: LogStream, line: String) { + let ts = now_unix_ms(); + let tag = match stream { + LogStream::Stdout => "[out]", + LogStream::Stderr => "[err]", + }; + if let Err(e) = self.writer.lock().unwrap().write_line(tag, &line) { + tracing::warn!(server = %self.server_name, error = %e, "log file write failed"); + } + self.ring.push(RecordedLine { + stream, + line: line.clone(), + ts_unix_ms: ts, + }); + let _ = self.broadcast.send(LogLine { + subscription_id: 0, + name: self.server_name.clone(), + stream, + line, + ts_unix_ms: ts, + }); + } +} + +fn now_unix_ms() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0) +} + #[cfg(test)] mod tests { use super::*; @@ -192,4 +251,20 @@ mod tests { assert_eq!(snap[0].line, "line3"); assert_eq!(snap[1].line, "line4"); } + + #[tokio::test] + async fn log_sink_records_and_broadcasts() { + let dir = tempdir().unwrap(); + let writer = RotatingLogWriter::open(&dir.path().join("s.log"), 1024, 3).unwrap(); + let sink = LogSink::new("s".to_string(), writer, 1024); + let mut rx = sink.broadcast.subscribe(); + sink.record(LogStream::Stdout, "hello".to_string()); + let got = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(got.line, "hello"); + assert_eq!(got.stream, LogStream::Stdout); + assert_eq!(sink.ring.snapshot_tail(None).len(), 1); + } }