From 3ab982aea188409939931607df04ac19dcfaea60 Mon Sep 17 00:00:00 2001 From: Anders Olsson Date: Mon, 25 May 2026 11:52:41 +0200 Subject: [PATCH] feat(xy): daemon boot + accept loop + graceful shutdown Co-Authored-By: Claude Sonnet 4.6 --- crates/xy/src/daemon/handlers.rs | 8 ++ crates/xy/src/daemon/mod.rs | 136 ++++++++++++++++++++++++++++++- crates/xy/src/daemon/shutdown.rs | 45 ++++++++++ 3 files changed, 187 insertions(+), 2 deletions(-) create mode 100644 crates/xy/src/daemon/handlers.rs create mode 100644 crates/xy/src/daemon/shutdown.rs diff --git a/crates/xy/src/daemon/handlers.rs b/crates/xy/src/daemon/handlers.rs new file mode 100644 index 0000000..02e743f --- /dev/null +++ b/crates/xy/src/daemon/handlers.rs @@ -0,0 +1,8 @@ +use crate::daemon::registry::Registry; +use crate::paths::Paths; +use std::sync::Arc; +use xy_ipc::Connection; + +pub async fn serve(_conn: Arc, _reg: Registry, _paths: Paths) -> std::io::Result<()> { + Ok(()) +} diff --git a/crates/xy/src/daemon/mod.rs b/crates/xy/src/daemon/mod.rs index 3e003fe..98bbb22 100644 --- a/crates/xy/src/daemon/mod.rs +++ b/crates/xy/src/daemon/mod.rs @@ -1,6 +1,138 @@ use crate::paths::Paths; -use anyhow::{bail, Result}; +use crate::pidfile::PidFile; +use anyhow::{Context, Result}; +use std::sync::{Arc, OnceLock}; +use tokio::sync::{mpsc, oneshot, watch}; +use tracing::{error, info}; +use xy_ipc::{Connection, bind}; +use xy_protocol::{ServerConfig, ServerState, kdl_parse::load_all_configs}; +use xy_supervisor::{ + logs::{LogSink, RotatingLogWriter}, + supervisor::{RealSpawner, SupervisorCmd, SupervisorHandle, SupervisorTask}, +}; +pub mod handlers; pub mod registry; +pub mod shutdown; -pub async fn run(_paths: Paths) -> Result<()> { bail!("not implemented") } +const LOG_FILE_MAX_BYTES: u64 = 10 * 1024 * 1024; +const LOG_FILE_KEEP: usize = 5; +const RING_BUFFER_BYTES: usize = 1024 * 1024; + +pub static PATHS: OnceLock = OnceLock::new(); + +pub fn config_hash(cfg: &ServerConfig) -> u64 { + use std::hash::{Hash, Hasher}; + + let mut h = std::collections::hash_map::DefaultHasher::new(); + + serde_json::to_string(cfg).unwrap().hash(&mut h); + + h.finish() +} + +pub fn spawn_supervisor(paths: &Paths, cfg: ServerConfig) -> Result { + let log_path = paths.log_dir.join(format!("{}.log", cfg.name)); + + let writer = RotatingLogWriter::open(&log_path, LOG_FILE_MAX_BYTES, LOG_FILE_KEEP) + .with_context(|| format!("open log file {}", log_path.display()))?; + + let sink = LogSink::new(cfg.name.clone(), writer, RING_BUFFER_BYTES); + + let (state_tx, state_rx) = watch::channel(ServerState::Stopped); + let (cmd_tx, cmd_rx) = mpsc::channel(16); + + let name = cfg.name.clone(); + + let task = SupervisorTask::new(cfg, sink.clone(), RealSpawner, state_tx, cmd_rx); + + tokio::spawn(task.run()); + + Ok(SupervisorHandle { + name, + tx: cmd_tx, + state: state_rx, + log_sink: sink, + }) +} + +pub async fn run(paths: Paths) -> Result<()> { + paths.ensure_dirs().context("create state dirs")?; + + let _pid = + PidFile::acquire(&paths.pidfile).context("another xy daemon appears to be running")?; + + let listener = bind(&paths.socket).context("bind unix socket")?; + + let _ = PATHS.set(paths.clone()); + + info!(socket = %paths.socket.display(), "daemon listening"); + + let configs = load_all_configs(&paths.config_dir).context("load configs")?; + + let registry = registry::Registry::new(); + + for cfg in configs { + let hash = config_hash(&cfg); + + let handle = spawn_supervisor(&paths, cfg)?; + + let name = handle.name.clone(); + + registry + .insert( + name.clone(), + registry::Entry { + handle: handle.clone(), + config_hash: hash, + }, + ) + .await; + + let (ack_tx, ack_rx) = oneshot::channel(); + + if handle + .tx + .send(SupervisorCmd::Start { ack: ack_tx }) + .await + .is_ok() + { + let _ = ack_rx.await; + } + } + + let registry_for_shutdown = registry.clone(); + + let shutdown_signal = shutdown::install(); + + let accept = async { + loop { + let (stream, _addr) = match listener.accept().await { + Ok(p) => p, + Err(err) => { + error!(error = %err, "accept failed"); + continue; + } + }; + + let conn = Arc::new(Connection::new(stream)); + let reg = registry.clone(); + let paths_clone = paths.clone(); + + tokio::spawn(async move { + if let Err(err) = handlers::serve(conn, reg, paths_clone).await { + error!(error = %err, "connection ended with error"); + } + }); + } + }; + + tokio::select! { + _ = accept => {} + _ = shutdown_signal => { info!("shutdown signal received"); } + } + + shutdown::shutdown_all(registry_for_shutdown).await; + + Ok(()) +} diff --git a/crates/xy/src/daemon/shutdown.rs b/crates/xy/src/daemon/shutdown.rs new file mode 100644 index 0000000..0b60ac8 --- /dev/null +++ b/crates/xy/src/daemon/shutdown.rs @@ -0,0 +1,45 @@ +use crate::daemon::registry::Registry; +use tokio::signal::unix::{SignalKind, signal}; +use tokio::sync::oneshot; +use xy_supervisor::supervisor::SupervisorCmd; + +pub fn install() -> impl std::future::Future { + let mut term = signal(SignalKind::terminate()).expect("install SIGTERM handler"); + let mut int = signal(SignalKind::interrupt()).expect("install SIGINT handler"); + + async move { + tokio::select! { + _ = term.recv() => {} + _ = int.recv() => {} + } + } +} + +pub async fn shutdown_all(reg: Registry) { + let snapshot = reg.snapshot().await; + + let mut acks = Vec::new(); + + for (_name, entry) in &snapshot { + let (tx, rx) = oneshot::channel(); + + if entry + .handle + .tx + .send(SupervisorCmd::Shutdown { ack: tx }) + .await + .is_ok() + { + acks.push(rx); + } + } + + let deadline = tokio::time::Duration::from_secs(30); + + let _ = tokio::time::timeout(deadline, async { + for rx in acks { + let _ = rx.await; + } + }) + .await; +}