feat(supervisor): supervisor task with state machine

One async task per managed server owns all state transitions via a
tokio::select! loop over cmd_rx and wait_child. Includes RealSpawner
and a smoke test covering the Start → Running → exit → Stopped →
Shutdown happy path.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-25 11:44:12 +02:00
parent f1b2306156
commit a3c979511e
3 changed files with 376 additions and 10 deletions
+4
View File
@@ -5,9 +5,13 @@ pub mod child;
pub mod logs;
pub mod policy;
pub mod retry_window;
pub mod supervisor;
pub use backoff::Backoff;
pub use child::{ChildHandle, MockChild, MockChildController, RealChild, spawn_with_logs};
pub use logs::{LogSink, RecordedLine, RingBuffer, RotatingLogWriter};
pub use policy::{RestartDecision, decide};
pub use retry_window::RetryWindow;
pub use supervisor::{
RealSpawner, Spawner, StartAck, StopAck, SupervisorCmd, SupervisorHandle, SupervisorTask,
};
+3 -10
View File
@@ -135,11 +135,7 @@ pub struct LogSink {
}
impl LogSink {
pub fn new(
server_name: String,
writer: RotatingLogWriter,
ring_capacity_bytes: usize,
) -> Self {
pub fn new(server_name: String, writer: RotatingLogWriter, ring_capacity_bytes: usize) -> Self {
let (tx, _) = broadcast::channel(LOG_BROADCAST_CAP);
Self {
server_name,
@@ -183,8 +179,8 @@ fn now_unix_ms() -> u64 {
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
use std::io::Read;
use tempfile::tempdir;
#[test]
fn writes_lines_with_tags() {
@@ -194,10 +190,7 @@ mod tests {
w.write_line("[out]", "hello").unwrap();
w.write_line("[err]", "boom").unwrap();
let mut s = String::new();
File::open(&base)
.unwrap()
.read_to_string(&mut s)
.unwrap();
File::open(&base).unwrap().read_to_string(&mut s).unwrap();
assert_eq!(s, "[out] hello\n[err] boom\n");
}
+369
View File
@@ -0,0 +1,369 @@
use crate::{
backoff::Backoff,
child::ChildHandle,
logs::LogSink,
policy::{RestartDecision, decide},
retry_window::RetryWindow,
};
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::sleep;
use tracing::{debug, info, warn};
use xy_protocol::{ServerConfig, ServerState};
pub enum SupervisorCmd {
Start {
ack: oneshot::Sender<StartAck>,
},
Stop {
ack: oneshot::Sender<StopAck>,
},
Restart {
ack: oneshot::Sender<()>,
},
Reconfigure {
new: ServerConfig,
ack: oneshot::Sender<()>,
},
Shutdown {
ack: oneshot::Sender<()>,
},
}
#[derive(Debug, PartialEq, Eq)]
pub enum StartAck {
Started,
AlreadyRunning,
}
#[derive(Debug, PartialEq, Eq)]
pub enum StopAck {
Stopped,
NotRunning,
}
#[derive(Clone)]
pub struct SupervisorHandle {
pub name: String,
pub tx: mpsc::Sender<SupervisorCmd>,
pub state: watch::Receiver<ServerState>,
pub log_sink: LogSink,
}
#[async_trait::async_trait]
pub trait Spawner: Send + 'static {
type Child: ChildHandle;
async fn spawn(&self, cfg: &ServerConfig, sink: LogSink) -> std::io::Result<Self::Child>;
}
pub struct SupervisorTask<S: Spawner> {
cfg: ServerConfig,
log_sink: LogSink,
spawner: S,
state_tx: watch::Sender<ServerState>,
cmd_rx: mpsc::Receiver<SupervisorCmd>,
backoff: Backoff,
retry_window: RetryWindow,
restart_count: u32,
last_exit: Option<i32>,
started_at: Option<Instant>,
}
impl<S: Spawner> SupervisorTask<S> {
pub fn new(
cfg: ServerConfig,
log_sink: LogSink,
spawner: S,
state_tx: watch::Sender<ServerState>,
cmd_rx: mpsc::Receiver<SupervisorCmd>,
) -> Self {
let backoff = Backoff::new(cfg.restart.backoff_initial, cfg.restart.backoff_max);
let retry_window =
RetryWindow::new(Duration::from_secs(60), cfg.restart.max_retries_per_minute);
Self {
cfg,
log_sink,
spawner,
state_tx,
cmd_rx,
backoff,
retry_window,
restart_count: 0,
last_exit: None,
started_at: None,
}
}
fn set_state(&self, s: ServerState) {
let _ = self.state_tx.send(s);
}
pub async fn run(mut self) {
let mut child: Option<S::Child> = None;
loop {
tokio::select! {
cmd = self.cmd_rx.recv() => {
let Some(cmd) = cmd else { break; };
match cmd {
SupervisorCmd::Start { ack } => {
if child.is_some() {
let _ = ack.send(StartAck::AlreadyRunning);
} else {
match self.do_start().await {
Ok(c) => {
child = Some(c);
let _ = ack.send(StartAck::Started);
}
Err(err) => {
warn!(name = %self.cfg.name, error = %err, "spawn failed");
self.set_state(ServerState::Failed);
let _ = ack.send(StartAck::Started);
}
}
}
}
SupervisorCmd::Stop { ack } => {
if let Some(c) = child.take() {
self.do_stop(c).await;
let _ = ack.send(StopAck::Stopped);
} else {
let _ = ack.send(StopAck::NotRunning);
}
}
SupervisorCmd::Restart { ack } => {
if let Some(c) = child.take() {
self.set_state(ServerState::Restarting);
self.do_stop(c).await;
}
match self.do_start().await {
Ok(c) => child = Some(c),
Err(err) => {
warn!(name = %self.cfg.name, error = %err, "restart spawn failed");
self.set_state(ServerState::Failed);
}
}
let _ = ack.send(());
}
SupervisorCmd::Reconfigure { new, ack } => {
self.cfg = new;
self.backoff =
Backoff::new(self.cfg.restart.backoff_initial, self.cfg.restart.backoff_max);
self.retry_window = RetryWindow::new(
Duration::from_secs(60),
self.cfg.restart.max_retries_per_minute,
);
let _ = ack.send(());
}
SupervisorCmd::Shutdown { ack } => {
if let Some(c) = child.take() {
self.do_stop(c).await;
}
let _ = ack.send(());
return;
}
}
}
code = wait_child(&mut child) => {
child = None;
self.last_exit = code;
let now = Instant::now();
self.retry_window.record(now);
let cap = self.retry_window.cap_reached(now);
let decision = decide(self.cfg.restart.policy, code, cap);
debug!(name = %self.cfg.name, ?code, ?decision, "child exited");
match decision {
RestartDecision::StayStopped => {
self.started_at = None;
self.set_state(ServerState::Stopped);
}
RestartDecision::MarkFailed => {
self.started_at = None;
self.set_state(ServerState::Failed);
}
RestartDecision::Restart => {
self.set_state(ServerState::Restarting);
let delay = self.backoff.next();
sleep(delay).await;
match self.do_start().await {
Ok(c) => child = Some(c),
Err(err) => {
warn!(name = %self.cfg.name, error = %err, "restart spawn failed");
self.set_state(ServerState::Failed);
}
}
}
}
}
}
}
}
async fn do_start(&mut self) -> std::io::Result<S::Child> {
self.set_state(ServerState::Starting);
let c = self.spawner.spawn(&self.cfg, self.log_sink.clone()).await?;
self.restart_count = self.restart_count.saturating_add(1);
self.started_at = Some(Instant::now());
self.backoff.reset();
self.set_state(ServerState::Running);
info!(name = %self.cfg.name, pid = c.pid(), "started");
Ok(c)
}
async fn do_stop(&mut self, mut c: S::Child) {
self.set_state(ServerState::Stopping);
let _ = c.terminate();
let grace = self.cfg.stop.grace;
match tokio::time::timeout(grace, c.wait()).await {
Ok(_) => {}
Err(_) => {
let _ = c.kill();
let _ = c.wait().await;
}
}
self.started_at = None;
self.set_state(ServerState::Stopped);
}
}
async fn wait_child<C: ChildHandle>(slot: &mut Option<C>) -> Option<i32> {
match slot.as_mut() {
Some(c) => c.wait().await.ok().flatten(),
None => std::future::pending().await,
}
}
pub struct RealSpawner;
#[async_trait::async_trait]
impl Spawner for RealSpawner {
type Child = crate::child::RealChild;
async fn spawn(&self, cfg: &ServerConfig, sink: LogSink) -> std::io::Result<Self::Child> {
crate::child::spawn_with_logs(cfg, sink)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::child::MockChild;
use crate::logs::{LogSink, RotatingLogWriter};
use std::sync::{Arc, Mutex};
use tempfile::tempdir;
use xy_protocol::{RestartConfig, RestartPolicy, StopConfig};
struct QueueSpawner {
queue: Arc<Mutex<Vec<MockChild>>>,
}
#[async_trait::async_trait]
impl Spawner for QueueSpawner {
type Child = MockChild;
async fn spawn(&self, _cfg: &ServerConfig, _sink: LogSink) -> std::io::Result<MockChild> {
let mut q = self.queue.lock().unwrap();
Ok(q.remove(0))
}
}
fn cfg(name: &str, policy: RestartPolicy, max_retries: u32) -> ServerConfig {
ServerConfig {
name: name.to_string(),
command: "/bin/true".into(),
args: vec![],
port: 1,
env: Default::default(),
working_dir: None,
restart: RestartConfig {
policy,
backoff_initial: Duration::from_millis(1),
backoff_max: Duration::from_millis(1),
max_retries_per_minute: max_retries,
},
stop: StopConfig {
grace: Duration::from_millis(50),
},
}
}
fn sink(name: &str) -> LogSink {
let dir = tempdir().unwrap();
let writer = RotatingLogWriter::open(&dir.path().join("s.log"), 1024, 3).unwrap();
std::mem::forget(dir);
LogSink::new(name.to_string(), writer, 1024)
}
async fn wait_for(rx: &mut watch::Receiver<ServerState>, want: ServerState) {
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
loop {
if *rx.borrow() == want {
return;
}
tokio::select! {
_ = rx.changed() => {}
_ = tokio::time::sleep_until(deadline) => panic!("never reached {want:?}, last={:?}", *rx.borrow()),
}
}
}
#[tokio::test]
async fn start_runs_to_running_and_stop_to_stopped() {
let (mock, mut ctl) = MockChild::new(1);
let queue = Arc::new(Mutex::new(vec![mock]));
let spawner = QueueSpawner { queue };
let (state_tx, mut state_rx) = watch::channel(ServerState::Stopped);
let (cmd_tx, cmd_rx) = mpsc::channel(8);
let task = SupervisorTask::new(
cfg("x", RestartPolicy::Never, 5),
sink("x"),
spawner,
state_tx,
cmd_rx,
);
let h = tokio::spawn(task.run());
let (ack_tx, ack_rx) = oneshot::channel();
cmd_tx
.send(SupervisorCmd::Start { ack: ack_tx })
.await
.unwrap();
assert_eq!(ack_rx.await.unwrap(), StartAck::Started);
wait_for(&mut state_rx, ServerState::Running).await;
ctl.exit_tx.take().unwrap().send(Some(0)).unwrap();
wait_for(&mut state_rx, ServerState::Stopped).await;
let (ack_tx, ack_rx) = oneshot::channel();
cmd_tx
.send(SupervisorCmd::Shutdown { ack: ack_tx })
.await
.unwrap();
ack_rx.await.unwrap();
h.await.unwrap();
}
}