diff --git a/crates/xy-supervisor/src/child.rs b/crates/xy-supervisor/src/child.rs index fd9cc15..eb9c361 100644 --- a/crates/xy-supervisor/src/child.rs +++ b/crates/xy-supervisor/src/child.rs @@ -71,6 +71,125 @@ impl ChildHandle for MockChild { } } +use crate::logs::LogSink; +use nix::sys::signal::{Signal, kill}; +use nix::unistd::Pid; +use std::process::Stdio; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::process::{Child as TokioChild, Command}; +use xy_protocol::{ServerConfig, rpc::LogStream}; + +pub struct RealChild { + pid: u32, + pgid: Pid, + child: Option, +} + +impl RealChild { + pub fn pgid(&self) -> Pid { + self.pgid + } +} + +#[async_trait::async_trait] +impl ChildHandle for RealChild { + fn pid(&self) -> u32 { + self.pid + } + + async fn wait(&mut self) -> std::io::Result> { + let child = self + .child + .as_mut() + .ok_or_else(|| std::io::Error::other("already waited"))?; + + let status = child.wait().await?; + + Ok(status.code()) + } + + fn terminate(&mut self) -> std::io::Result<()> { + kill(Pid::from_raw(-self.pgid.as_raw()), Signal::SIGTERM) + .map_err(|err| std::io::Error::other(err.to_string())) + } + + fn kill(&mut self) -> std::io::Result<()> { + kill(Pid::from_raw(-self.pgid.as_raw()), Signal::SIGKILL) + .map_err(|err| std::io::Error::other(err.to_string())) + } +} + +pub fn spawn_with_logs(cfg: &ServerConfig, sink: LogSink) -> std::io::Result { + let mut cmd = Command::new(&cfg.command); + + cmd.args(&cfg.args); + + for (k, v) in &cfg.env { + cmd.env(k, v); + } + + if let Some(dir) = &cfg.working_dir { + cmd.current_dir(dir); + } + + cmd.stdout(Stdio::piped()); + cmd.stderr(Stdio::piped()); + cmd.kill_on_drop(true); + + // Own process group so signals reach the whole tree. + unsafe { + cmd.pre_exec(|| { + nix::unistd::setpgid(Pid::from_raw(0), Pid::from_raw(0)) + .map_err(|err| std::io::Error::other(err.to_string())) + }); + } + + let mut child = cmd.spawn()?; + + let pid = child.id().ok_or_else(|| std::io::Error::other("no pid"))?; + let pgid = Pid::from_raw(pid as i32); + + if let Some(out) = child.stdout.take() { + spawn_pump(out, sink.clone(), LogStream::Stdout); + } + + if let Some(err) = child.stderr.take() { + spawn_pump(err, sink.clone(), LogStream::Stderr); + } + + Ok(RealChild { + pid, + pgid, + child: Some(child), + }) +} + +fn spawn_pump( + reader: R, + sink: LogSink, + stream: LogStream, +) { + tokio::spawn(async move { + let mut lines = BufReader::new(reader).lines(); + + loop { + match lines.next_line().await { + Ok(Some(line)) => sink.record(stream, line), + Ok(None) => break, + Err(err) => { + tracing::warn!( + server = %sink.server_name, + error = %err, + ?stream, + "log pump read error" + ); + break; + } + } + } + }); +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/xy-supervisor/src/lib.rs b/crates/xy-supervisor/src/lib.rs index 9a6df14..2241d41 100644 --- a/crates/xy-supervisor/src/lib.rs +++ b/crates/xy-supervisor/src/lib.rs @@ -7,7 +7,7 @@ pub mod policy; pub mod retry_window; pub use backoff::Backoff; -pub use child::{ChildHandle, MockChild, MockChildController}; +pub use child::{ChildHandle, MockChild, MockChildController, RealChild, spawn_with_logs}; pub use logs::{LogSink, RecordedLine, RingBuffer, RotatingLogWriter}; pub use policy::{RestartDecision, decide}; pub use retry_window::RetryWindow;