feat(supervisor): RealChild + spawn_with_logs
Append RealChild (real tokio::process::Child wrapper) and spawn_with_logs to child.rs. Uses nix::unistd::setpgid via tokio's re-exported pre_exec to create an own process group, and fires per-stream log pump tasks that drain stdout/stderr into the provided LogSink. terminate/kill signal the whole process group via kill(-pgid, SIG*). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -71,6 +71,125 @@ impl ChildHandle for MockChild {
|
||||
}
|
||||
}
|
||||
|
||||
use crate::logs::LogSink;
|
||||
use nix::sys::signal::{Signal, kill};
|
||||
use nix::unistd::Pid;
|
||||
use std::process::Stdio;
|
||||
use tokio::io::{AsyncBufReadExt, BufReader};
|
||||
use tokio::process::{Child as TokioChild, Command};
|
||||
use xy_protocol::{ServerConfig, rpc::LogStream};
|
||||
|
||||
pub struct RealChild {
|
||||
pid: u32,
|
||||
pgid: Pid,
|
||||
child: Option<TokioChild>,
|
||||
}
|
||||
|
||||
impl RealChild {
|
||||
pub fn pgid(&self) -> Pid {
|
||||
self.pgid
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ChildHandle for RealChild {
|
||||
fn pid(&self) -> u32 {
|
||||
self.pid
|
||||
}
|
||||
|
||||
async fn wait(&mut self) -> std::io::Result<Option<i32>> {
|
||||
let child = self
|
||||
.child
|
||||
.as_mut()
|
||||
.ok_or_else(|| std::io::Error::other("already waited"))?;
|
||||
|
||||
let status = child.wait().await?;
|
||||
|
||||
Ok(status.code())
|
||||
}
|
||||
|
||||
fn terminate(&mut self) -> std::io::Result<()> {
|
||||
kill(Pid::from_raw(-self.pgid.as_raw()), Signal::SIGTERM)
|
||||
.map_err(|err| std::io::Error::other(err.to_string()))
|
||||
}
|
||||
|
||||
fn kill(&mut self) -> std::io::Result<()> {
|
||||
kill(Pid::from_raw(-self.pgid.as_raw()), Signal::SIGKILL)
|
||||
.map_err(|err| std::io::Error::other(err.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_with_logs(cfg: &ServerConfig, sink: LogSink) -> std::io::Result<RealChild> {
|
||||
let mut cmd = Command::new(&cfg.command);
|
||||
|
||||
cmd.args(&cfg.args);
|
||||
|
||||
for (k, v) in &cfg.env {
|
||||
cmd.env(k, v);
|
||||
}
|
||||
|
||||
if let Some(dir) = &cfg.working_dir {
|
||||
cmd.current_dir(dir);
|
||||
}
|
||||
|
||||
cmd.stdout(Stdio::piped());
|
||||
cmd.stderr(Stdio::piped());
|
||||
cmd.kill_on_drop(true);
|
||||
|
||||
// Own process group so signals reach the whole tree.
|
||||
unsafe {
|
||||
cmd.pre_exec(|| {
|
||||
nix::unistd::setpgid(Pid::from_raw(0), Pid::from_raw(0))
|
||||
.map_err(|err| std::io::Error::other(err.to_string()))
|
||||
});
|
||||
}
|
||||
|
||||
let mut child = cmd.spawn()?;
|
||||
|
||||
let pid = child.id().ok_or_else(|| std::io::Error::other("no pid"))?;
|
||||
let pgid = Pid::from_raw(pid as i32);
|
||||
|
||||
if let Some(out) = child.stdout.take() {
|
||||
spawn_pump(out, sink.clone(), LogStream::Stdout);
|
||||
}
|
||||
|
||||
if let Some(err) = child.stderr.take() {
|
||||
spawn_pump(err, sink.clone(), LogStream::Stderr);
|
||||
}
|
||||
|
||||
Ok(RealChild {
|
||||
pid,
|
||||
pgid,
|
||||
child: Some(child),
|
||||
})
|
||||
}
|
||||
|
||||
fn spawn_pump<R: tokio::io::AsyncRead + Unpin + Send + 'static>(
|
||||
reader: R,
|
||||
sink: LogSink,
|
||||
stream: LogStream,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
let mut lines = BufReader::new(reader).lines();
|
||||
|
||||
loop {
|
||||
match lines.next_line().await {
|
||||
Ok(Some(line)) => sink.record(stream, line),
|
||||
Ok(None) => break,
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
server = %sink.server_name,
|
||||
error = %err,
|
||||
?stream,
|
||||
"log pump read error"
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -7,7 +7,7 @@ pub mod policy;
|
||||
pub mod retry_window;
|
||||
|
||||
pub use backoff::Backoff;
|
||||
pub use child::{ChildHandle, MockChild, MockChildController};
|
||||
pub use child::{ChildHandle, MockChild, MockChildController, RealChild, spawn_with_logs};
|
||||
pub use logs::{LogSink, RecordedLine, RingBuffer, RotatingLogWriter};
|
||||
pub use policy::{RestartDecision, decide};
|
||||
pub use retry_window::RetryWindow;
|
||||
|
||||
Reference in New Issue
Block a user