feat(supervisor): LogSink fans out to file, ring buffer, broadcast
This commit is contained in:
@@ -8,6 +8,6 @@ pub mod retry_window;
|
|||||||
|
|
||||||
pub use backoff::Backoff;
|
pub use backoff::Backoff;
|
||||||
pub use child::{ChildHandle, MockChild, MockChildController};
|
pub use child::{ChildHandle, MockChild, MockChildController};
|
||||||
pub use logs::{RecordedLine, RingBuffer, RotatingLogWriter};
|
pub use logs::{LogSink, RecordedLine, RingBuffer, RotatingLogWriter};
|
||||||
pub use policy::{RestartDecision, decide};
|
pub use policy::{RestartDecision, decide};
|
||||||
pub use retry_window::RetryWindow;
|
pub use retry_window::RetryWindow;
|
||||||
|
|||||||
@@ -4,6 +4,9 @@ use std::io::Write;
|
|||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
use xy_protocol::rpc::{LogLine, LogStream};
|
||||||
|
|
||||||
pub struct RotatingLogWriter {
|
pub struct RotatingLogWriter {
|
||||||
base: PathBuf,
|
base: PathBuf,
|
||||||
max_bytes: u64,
|
max_bytes: u64,
|
||||||
@@ -121,6 +124,62 @@ impl RingBuffer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const LOG_BROADCAST_CAP: usize = 256;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct LogSink {
|
||||||
|
pub server_name: String,
|
||||||
|
writer: Arc<Mutex<RotatingLogWriter>>,
|
||||||
|
pub ring: RingBuffer,
|
||||||
|
pub broadcast: broadcast::Sender<LogLine>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LogSink {
|
||||||
|
pub fn new(
|
||||||
|
server_name: String,
|
||||||
|
writer: RotatingLogWriter,
|
||||||
|
ring_capacity_bytes: usize,
|
||||||
|
) -> Self {
|
||||||
|
let (tx, _) = broadcast::channel(LOG_BROADCAST_CAP);
|
||||||
|
Self {
|
||||||
|
server_name,
|
||||||
|
writer: Arc::new(Mutex::new(writer)),
|
||||||
|
ring: RingBuffer::new(ring_capacity_bytes),
|
||||||
|
broadcast: tx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn record(&self, stream: LogStream, line: String) {
|
||||||
|
let ts = now_unix_ms();
|
||||||
|
let tag = match stream {
|
||||||
|
LogStream::Stdout => "[out]",
|
||||||
|
LogStream::Stderr => "[err]",
|
||||||
|
};
|
||||||
|
if let Err(e) = self.writer.lock().unwrap().write_line(tag, &line) {
|
||||||
|
tracing::warn!(server = %self.server_name, error = %e, "log file write failed");
|
||||||
|
}
|
||||||
|
self.ring.push(RecordedLine {
|
||||||
|
stream,
|
||||||
|
line: line.clone(),
|
||||||
|
ts_unix_ms: ts,
|
||||||
|
});
|
||||||
|
let _ = self.broadcast.send(LogLine {
|
||||||
|
subscription_id: 0,
|
||||||
|
name: self.server_name.clone(),
|
||||||
|
stream,
|
||||||
|
line,
|
||||||
|
ts_unix_ms: ts,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn now_unix_ms() -> u64 {
|
||||||
|
std::time::SystemTime::now()
|
||||||
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
|
.map(|d| d.as_millis() as u64)
|
||||||
|
.unwrap_or(0)
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
@@ -192,4 +251,20 @@ mod tests {
|
|||||||
assert_eq!(snap[0].line, "line3");
|
assert_eq!(snap[0].line, "line3");
|
||||||
assert_eq!(snap[1].line, "line4");
|
assert_eq!(snap[1].line, "line4");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn log_sink_records_and_broadcasts() {
|
||||||
|
let dir = tempdir().unwrap();
|
||||||
|
let writer = RotatingLogWriter::open(&dir.path().join("s.log"), 1024, 3).unwrap();
|
||||||
|
let sink = LogSink::new("s".to_string(), writer, 1024);
|
||||||
|
let mut rx = sink.broadcast.subscribe();
|
||||||
|
sink.record(LogStream::Stdout, "hello".to_string());
|
||||||
|
let got = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(got.line, "hello");
|
||||||
|
assert_eq!(got.stream, LogStream::Stdout);
|
||||||
|
assert_eq!(sink.ring.snapshot_tail(None).len(), 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user