feat(supervisor): ring buffer for recent log lines
This commit is contained in:
@@ -8,6 +8,6 @@ pub mod retry_window;
|
||||
|
||||
pub use backoff::Backoff;
|
||||
pub use child::{ChildHandle, MockChild, MockChildController};
|
||||
pub use logs::RotatingLogWriter;
|
||||
pub use logs::{RecordedLine, RingBuffer, RotatingLogWriter};
|
||||
pub use policy::{RestartDecision, decide};
|
||||
pub use retry_window::RetryWindow;
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::io::Write;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
pub struct RotatingLogWriter {
|
||||
base: PathBuf,
|
||||
@@ -64,6 +66,61 @@ impl RotatingLogWriter {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RingBuffer {
|
||||
inner: Arc<Mutex<RingBufferInner>>,
|
||||
capacity_bytes: usize,
|
||||
}
|
||||
|
||||
struct RingBufferInner {
|
||||
lines: VecDeque<RecordedLine>,
|
||||
bytes: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RecordedLine {
|
||||
pub stream: xy_protocol::rpc::LogStream,
|
||||
pub line: String,
|
||||
pub ts_unix_ms: u64,
|
||||
}
|
||||
|
||||
impl RingBuffer {
|
||||
pub fn new(capacity_bytes: usize) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(Mutex::new(RingBufferInner {
|
||||
lines: VecDeque::new(),
|
||||
bytes: 0,
|
||||
})),
|
||||
capacity_bytes,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push(&self, line: RecordedLine) {
|
||||
let mut g = self.inner.lock().unwrap();
|
||||
g.bytes += line.line.len();
|
||||
g.lines.push_back(line);
|
||||
while g.bytes > self.capacity_bytes {
|
||||
if let Some(removed) = g.lines.pop_front() {
|
||||
g.bytes -= removed.line.len();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn snapshot_tail(&self, n: Option<u32>) -> Vec<RecordedLine> {
|
||||
let g = self.inner.lock().unwrap();
|
||||
match n {
|
||||
None => g.lines.iter().cloned().collect(),
|
||||
Some(n) => {
|
||||
let take = (n as usize).min(g.lines.len());
|
||||
let start = g.lines.len() - take;
|
||||
g.lines.iter().skip(start).cloned().collect()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -101,4 +158,38 @@ mod tests {
|
||||
rotated.display()
|
||||
);
|
||||
}
|
||||
|
||||
use xy_protocol::rpc::LogStream;
|
||||
|
||||
fn recorded(s: &str) -> RecordedLine {
|
||||
RecordedLine {
|
||||
stream: LogStream::Stdout,
|
||||
line: s.to_string(),
|
||||
ts_unix_ms: 0,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ring_buffer_drops_oldest_when_full() {
|
||||
let rb = RingBuffer::new(10);
|
||||
rb.push(recorded("aaaaa"));
|
||||
rb.push(recorded("bbbbb"));
|
||||
rb.push(recorded("ccc"));
|
||||
let snap = rb.snapshot_tail(None);
|
||||
assert_eq!(snap.len(), 2);
|
||||
assert_eq!(snap[0].line, "bbbbb");
|
||||
assert_eq!(snap[1].line, "ccc");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ring_buffer_tail_n() {
|
||||
let rb = RingBuffer::new(1024);
|
||||
for i in 0..5 {
|
||||
rb.push(recorded(&format!("line{i}")));
|
||||
}
|
||||
let snap = rb.snapshot_tail(Some(2));
|
||||
assert_eq!(snap.len(), 2);
|
||||
assert_eq!(snap[0].line, "line3");
|
||||
assert_eq!(snap[1].line, "line4");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user