From 3e4ad79137fadb8de9d8d64e3290cd88a572da04 Mon Sep 17 00:00:00 2001 From: Anders Olsson Date: Mon, 25 May 2026 12:30:56 +0200 Subject: [PATCH] fix(supervisor): publish full status (pid, port, uptime, restart_count, last_exit) via watch channel Replace watch::Receiver on SupervisorHandle with watch::Receiver, a richer snapshot type that carries pid, port, uptime_secs, restart_count and last_exit. SupervisorTask maintains current_pid and publishes a fresh Status on every state transition; handlers.rs reads the full Status so list/status no longer return zeroed/None fields. Co-Authored-By: Claude Sonnet 4.6 --- crates/xy-supervisor/src/lib.rs | 3 +- crates/xy-supervisor/src/supervisor.rs | 68 +++++++++++++++++++------- crates/xy/src/daemon/handlers.rs | 28 ++++++----- crates/xy/src/daemon/mod.rs | 17 +++++-- 4 files changed, 80 insertions(+), 36 deletions(-) diff --git a/crates/xy-supervisor/src/lib.rs b/crates/xy-supervisor/src/lib.rs index 10882e0..4807b59 100644 --- a/crates/xy-supervisor/src/lib.rs +++ b/crates/xy-supervisor/src/lib.rs @@ -13,5 +13,6 @@ pub use logs::{LogSink, RecordedLine, RingBuffer, RotatingLogWriter}; pub use policy::{RestartDecision, decide}; pub use retry_window::RetryWindow; pub use supervisor::{ - RealSpawner, Spawner, StartAck, StopAck, SupervisorCmd, SupervisorHandle, SupervisorTask, + RealSpawner, Spawner, StartAck, Status, StopAck, SupervisorCmd, SupervisorHandle, + SupervisorTask, }; diff --git a/crates/xy-supervisor/src/supervisor.rs b/crates/xy-supervisor/src/supervisor.rs index e30a051..c1bdf64 100644 --- a/crates/xy-supervisor/src/supervisor.rs +++ b/crates/xy-supervisor/src/supervisor.rs @@ -42,11 +42,21 @@ pub enum StopAck { NotRunning, } +#[derive(Debug, Clone)] +pub struct Status { + pub state: ServerState, + pub pid: Option, + pub port: u16, + pub uptime_secs: Option, + pub restart_count: u32, + pub last_exit: Option, +} + #[derive(Clone)] pub struct SupervisorHandle { pub name: String, pub tx: mpsc::Sender, - pub state: watch::Receiver, + pub status: watch::Receiver, pub log_sink: LogSink, } @@ -60,13 +70,14 @@ pub struct SupervisorTask { cfg: ServerConfig, log_sink: LogSink, spawner: S, - state_tx: watch::Sender, + status_tx: watch::Sender, cmd_rx: mpsc::Receiver, backoff: Backoff, retry_window: RetryWindow, restart_count: u32, last_exit: Option, started_at: Option, + current_pid: Option, } impl SupervisorTask { @@ -74,7 +85,7 @@ impl SupervisorTask { cfg: ServerConfig, log_sink: LogSink, spawner: S, - state_tx: watch::Sender, + status_tx: watch::Sender, cmd_rx: mpsc::Receiver, ) -> Self { let backoff = Backoff::new(cfg.restart.backoff_initial, cfg.restart.backoff_max); @@ -85,18 +96,28 @@ impl SupervisorTask { cfg, log_sink, spawner, - state_tx, + status_tx, cmd_rx, backoff, retry_window, restart_count: 0, last_exit: None, started_at: None, + current_pid: None, } } - fn set_state(&self, s: ServerState) { - let _ = self.state_tx.send(s); + fn set_state(&mut self, s: ServerState) { + let uptime_secs = self.started_at.map(|t| t.elapsed().as_secs()); + + let _ = self.status_tx.send(Status { + state: s, + pid: self.current_pid, + port: self.cfg.port, + uptime_secs, + restart_count: self.restart_count, + last_exit: self.last_exit, + }); } pub async fn run(mut self) { @@ -174,6 +195,7 @@ impl SupervisorTask { child = None; self.last_exit = code; + self.current_pid = None; let now = Instant::now(); @@ -221,6 +243,7 @@ impl SupervisorTask { self.restart_count = self.restart_count.saturating_add(1); self.started_at = Some(Instant::now()); + self.current_pid = Some(c.pid()); self.backoff.reset(); self.set_state(ServerState::Running); @@ -244,6 +267,7 @@ impl SupervisorTask { } } + self.current_pid = None; self.started_at = None; self.set_state(ServerState::Stopped); } @@ -317,34 +341,40 @@ mod tests { LogSink::new(name.to_string(), writer, 1024) } - async fn wait_for(rx: &mut watch::Receiver, want: ServerState) { + fn initial_status(cfg: &ServerConfig) -> Status { + Status { + state: ServerState::Stopped, + pid: None, + port: cfg.port, + uptime_secs: None, + restart_count: 0, + last_exit: None, + } + } + + async fn wait_for(rx: &mut watch::Receiver, want: ServerState) { let deadline = tokio::time::Instant::now() + Duration::from_secs(2); loop { - if *rx.borrow() == want { + if rx.borrow().state == want { return; } tokio::select! { _ = rx.changed() => {} - _ = tokio::time::sleep_until(deadline) => panic!("never reached {want:?}, last={:?}", *rx.borrow()), + _ = tokio::time::sleep_until(deadline) => panic!("never reached {want:?}, last={:?}", rx.borrow().state), } } } #[tokio::test] async fn start_runs_to_running_and_stop_to_stopped() { + let cfg = cfg("x", RestartPolicy::Never, 5); let (mock, mut ctl) = MockChild::new(1); let queue = Arc::new(Mutex::new(vec![mock])); let spawner = QueueSpawner { queue }; - let (state_tx, mut state_rx) = watch::channel(ServerState::Stopped); + let (status_tx, mut status_rx) = watch::channel(initial_status(&cfg)); let (cmd_tx, cmd_rx) = mpsc::channel(8); - let task = SupervisorTask::new( - cfg("x", RestartPolicy::Never, 5), - sink("x"), - spawner, - state_tx, - cmd_rx, - ); + let task = SupervisorTask::new(cfg, sink("x"), spawner, status_tx, cmd_rx); let h = tokio::spawn(task.run()); let (ack_tx, ack_rx) = oneshot::channel(); @@ -353,10 +383,10 @@ mod tests { .await .unwrap(); assert_eq!(ack_rx.await.unwrap(), StartAck::Started); - wait_for(&mut state_rx, ServerState::Running).await; + wait_for(&mut status_rx, ServerState::Running).await; ctl.exit_tx.take().unwrap().send(Some(0)).unwrap(); - wait_for(&mut state_rx, ServerState::Stopped).await; + wait_for(&mut status_rx, ServerState::Stopped).await; let (ack_tx, ack_rx) = oneshot::channel(); cmd_tx diff --git a/crates/xy/src/daemon/handlers.rs b/crates/xy/src/daemon/handlers.rs index f0333fe..c5b5938 100644 --- a/crates/xy/src/daemon/handlers.rs +++ b/crates/xy/src/daemon/handlers.rs @@ -163,14 +163,16 @@ async fn list(reg: &Registry) -> Result, ApiError> { let mut out = Vec::new(); for (name, entry) in reg.snapshot().await { + let s = entry.handle.status.borrow(); + out.push(ServerSummary { name, - state: *entry.handle.state.borrow(), - pid: None, - port: 0, - uptime_secs: None, - restart_count: 0, - last_exit: None, + state: s.state, + pid: s.pid, + port: s.port, + uptime_secs: s.uptime_secs, + restart_count: s.restart_count, + last_exit: s.last_exit, }); } @@ -185,15 +187,17 @@ async fn status(reg: &Registry, name: &str) -> Result { )); }; + let s = entry.handle.status.borrow(); + Ok(StatusDetail { summary: ServerSummary { name: entry.handle.name.clone(), - state: *entry.handle.state.borrow(), - pid: None, - port: 0, - uptime_secs: None, - restart_count: 0, - last_exit: None, + state: s.state, + pid: s.pid, + port: s.port, + uptime_secs: s.uptime_secs, + restart_count: s.restart_count, + last_exit: s.last_exit, }, recent_transitions: Vec::new(), }) diff --git a/crates/xy/src/daemon/mod.rs b/crates/xy/src/daemon/mod.rs index 98bbb22..02a546e 100644 --- a/crates/xy/src/daemon/mod.rs +++ b/crates/xy/src/daemon/mod.rs @@ -8,7 +8,7 @@ use xy_ipc::{Connection, bind}; use xy_protocol::{ServerConfig, ServerState, kdl_parse::load_all_configs}; use xy_supervisor::{ logs::{LogSink, RotatingLogWriter}, - supervisor::{RealSpawner, SupervisorCmd, SupervisorHandle, SupervisorTask}, + supervisor::{RealSpawner, Status, SupervisorCmd, SupervisorHandle, SupervisorTask}, }; pub mod handlers; @@ -39,19 +39,28 @@ pub fn spawn_supervisor(paths: &Paths, cfg: ServerConfig) -> Result