From c679465f12b07ab01f04d286721c66ee2aa617f1 Mon Sep 17 00:00:00 2001 From: Anders Olsson Date: Mon, 25 May 2026 11:56:45 +0200 Subject: [PATCH] feat(xy): reload handler with diff MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the `reload` JSON-RPC method: diffs the on-disk config dir against the in-memory registry and reconciles — stops removed servers, restarts changed servers (shutdown-then-respawn), and starts new ones. Co-Authored-By: Claude Sonnet 4.6 --- crates/xy/src/cli/mod.rs | 30 +++++--- crates/xy/src/daemon/handlers.rs | 117 ++++++++++++++++++++++++++++++- crates/xy/src/daemon/registry.rs | 7 +- crates/xy/src/main.rs | 40 +++++++---- crates/xy/src/paths.rs | 8 ++- crates/xy/src/pidfile.rs | 19 +++-- 6 files changed, 193 insertions(+), 28 deletions(-) diff --git a/crates/xy/src/cli/mod.rs b/crates/xy/src/cli/mod.rs index 0b13e58..7087111 100644 --- a/crates/xy/src/cli/mod.rs +++ b/crates/xy/src/cli/mod.rs @@ -1,10 +1,24 @@ use crate::paths::Paths; -use anyhow::{bail, Result}; +use anyhow::{Result, bail}; -pub async fn list(_p: Paths) -> Result { bail!("not implemented") } -pub async fn status(_p: Paths, _name: String) -> Result { bail!("not implemented") } -pub async fn start(_p: Paths, _all: bool, _name: Option) -> Result { bail!("not implemented") } -pub async fn stop(_p: Paths, _all: bool, _name: Option) -> Result { bail!("not implemented") } -pub async fn restart(_p: Paths, _all: bool, _name: Option) -> Result { bail!("not implemented") } -pub async fn reload(_p: Paths) -> Result { bail!("not implemented") } -pub async fn logs(_p: Paths, _name: String, _tail: Option, _follow: bool) -> Result { bail!("not implemented") } +pub async fn list(_p: Paths) -> Result { + bail!("not implemented") +} +pub async fn status(_p: Paths, _name: String) -> Result { + bail!("not implemented") +} +pub async fn start(_p: Paths, _all: bool, _name: Option) -> Result { + bail!("not implemented") +} +pub async fn stop(_p: Paths, _all: bool, _name: Option) -> Result { + bail!("not implemented") +} +pub async fn restart(_p: Paths, _all: bool, _name: Option) -> Result { + bail!("not implemented") +} +pub async fn reload(_p: Paths) -> Result { + bail!("not implemented") +} +pub async fn logs(_p: Paths, _name: String, _tail: Option, _follow: bool) -> Result { + bail!("not implemented") +} diff --git a/crates/xy/src/daemon/handlers.rs b/crates/xy/src/daemon/handlers.rs index 0566ead..53059b2 100644 --- a/crates/xy/src/daemon/handlers.rs +++ b/crates/xy/src/daemon/handlers.rs @@ -65,7 +65,10 @@ async fn handle_request(req: Request, reg: &Registry) -> Response { methods::START => dispatch_lifecycle(id, params, reg, Op::Start).await, methods::STOP => dispatch_lifecycle(id, params, reg, Op::Stop).await, methods::RESTART => dispatch_lifecycle(id, params, reg, Op::Restart).await, - methods::RELOAD => err_response(id, -32601, "reload not yet implemented".into()), + methods::RELOAD => match reload(reg).await { + Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()), + Err(e) => err_response(id, e.code, e.message), + }, methods::LOGS => err_response(id, -32601, "logs not yet implemented".into()), methods::LOGS_CANCEL => err_response(id, -32601, "logs_cancel not yet implemented".into()), other => err_response(id, -32601, format!("unknown method `{other}`")), @@ -246,3 +249,115 @@ async fn dispatch_lifecycle( } } } + +use xy_protocol::rpc::ReloadResult; + +async fn reload(reg: &Registry) -> Result { + let paths = crate::daemon::PATHS.get().ok_or_else(|| { + ApiError::rpc(RpcErrorCode::ConfigInvalid, "daemon paths not initialized") + })?; + + let new_configs = xy_protocol::kdl_parse::load_all_configs(&paths.config_dir) + .map_err(|err| ApiError::rpc(RpcErrorCode::ConfigInvalid, err.to_string()))?; + + use std::collections::HashMap; + + let new_by_name: HashMap = new_configs + .into_iter() + .map(|c| (c.name.clone(), c)) + .collect(); + + let existing_names: Vec = reg.names().await; + + let mut added = Vec::new(); + let mut removed = Vec::new(); + let mut changed = Vec::new(); + let mut unchanged = Vec::new(); + + for name in &existing_names { + if !new_by_name.contains_key(name) { + if let Some(entry) = reg.remove(name).await { + let (tx, rx) = oneshot::channel(); + + let _ = entry + .handle + .tx + .send(SupervisorCmd::Shutdown { ack: tx }) + .await; + + let _ = rx.await; + + removed.push(name.clone()); + } + } + } + + for (name, cfg) in new_by_name { + let new_hash = crate::daemon::config_hash(&cfg); + + match reg.get(&name).await { + None => { + let handle = crate::daemon::spawn_supervisor(paths, cfg) + .map_err(|err| ApiError::rpc(RpcErrorCode::SpawnFailed, err.to_string()))?; + + reg.insert( + name.clone(), + crate::daemon::registry::Entry { + handle: handle.clone(), + config_hash: new_hash, + }, + ) + .await; + + let (tx, rx) = oneshot::channel(); + + let _ = handle.tx.send(SupervisorCmd::Start { ack: tx }).await; + + let _ = rx.await; + + added.push(name); + } + Some(entry) if entry.config_hash != new_hash => { + let (tx, rx) = oneshot::channel(); + + let _ = entry + .handle + .tx + .send(SupervisorCmd::Shutdown { ack: tx }) + .await; + + let _ = rx.await; + + reg.remove(&name).await; + + let handle = crate::daemon::spawn_supervisor(paths, cfg) + .map_err(|err| ApiError::rpc(RpcErrorCode::SpawnFailed, err.to_string()))?; + + reg.insert( + name.clone(), + crate::daemon::registry::Entry { + handle: handle.clone(), + config_hash: new_hash, + }, + ) + .await; + + let (tx, rx) = oneshot::channel(); + + let _ = handle.tx.send(SupervisorCmd::Start { ack: tx }).await; + + let _ = rx.await; + + changed.push(name); + } + Some(_) => unchanged.push(name), + } + } + + Ok(ReloadResult { + added, + removed, + changed, + unchanged, + }) +} diff --git a/crates/xy/src/daemon/registry.rs b/crates/xy/src/daemon/registry.rs index 0fd9b45..7861704 100644 --- a/crates/xy/src/daemon/registry.rs +++ b/crates/xy/src/daemon/registry.rs @@ -15,7 +15,9 @@ pub struct Registry { } impl Registry { - pub fn new() -> Self { Self::default() } + pub fn new() -> Self { + Self::default() + } pub async fn insert(&self, name: String, entry: Entry) { self.inner.write().await.insert(name, entry); } @@ -28,7 +30,8 @@ impl Registry { pub async fn names(&self) -> Vec { let g = self.inner.read().await; let mut v: Vec = g.keys().cloned().collect(); - v.sort(); v + v.sort(); + v } pub async fn snapshot(&self) -> Vec<(String, Entry)> { let g = self.inner.read().await; diff --git a/crates/xy/src/main.rs b/crates/xy/src/main.rs index 3145272..53ad0db 100644 --- a/crates/xy/src/main.rs +++ b/crates/xy/src/main.rs @@ -22,41 +22,54 @@ enum Cmd { Status { name: String }, /// Start a server (or all configured servers with --all). Start { - #[arg(long, conflicts_with = "name")] all: bool, - #[arg(required_unless_present = "all")] name: Option, + #[arg(long, conflicts_with = "name")] + all: bool, + #[arg(required_unless_present = "all")] + name: Option, }, /// Stop a server (or --all). Stop { - #[arg(long, conflicts_with = "name")] all: bool, - #[arg(required_unless_present = "all")] name: Option, + #[arg(long, conflicts_with = "name")] + all: bool, + #[arg(required_unless_present = "all")] + name: Option, }, /// Restart a server (or --all). Restart { - #[arg(long, conflicts_with = "name")] all: bool, - #[arg(required_unless_present = "all")] name: Option, + #[arg(long, conflicts_with = "name")] + all: bool, + #[arg(required_unless_present = "all")] + name: Option, }, /// Re-read config dir and reconcile running servers. Reload, /// Stream a server's log. Logs { name: String, - #[arg(long)] tail: Option, - #[arg(short = 'f', long)] follow: bool, + #[arg(long)] + tail: Option, + #[arg(short = 'f', long)] + follow: bool, }, } #[tokio::main] async fn main() -> std::process::ExitCode { tracing_subscriber::fmt() - .with_env_filter(tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))) + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) .with_writer(std::io::stderr) .init(); let cli = Cli::parse(); let paths = match paths::Paths::resolve() { Ok(p) => p, - Err(e) => { eprintln!("xy: failed to resolve XDG paths: {e}"); return std::process::ExitCode::from(3); } + Err(e) => { + eprintln!("xy: failed to resolve XDG paths: {e}"); + return std::process::ExitCode::from(3); + } }; let result: anyhow::Result = match cli.cmd { @@ -72,6 +85,9 @@ async fn main() -> std::process::ExitCode { match result { Ok(code) => std::process::ExitCode::from(code as u8), - Err(e) => { eprintln!("xy: {e:#}"); std::process::ExitCode::from(1) } + Err(e) => { + eprintln!("xy: {e:#}"); + std::process::ExitCode::from(1) + } } } diff --git a/crates/xy/src/paths.rs b/crates/xy/src/paths.rs index 15df94d..fef2d81 100644 --- a/crates/xy/src/paths.rs +++ b/crates/xy/src/paths.rs @@ -20,7 +20,13 @@ impl Paths { .map(|p| PathBuf::from(p).join("xy.sock")) .unwrap_or_else(|| state_dir.join("xy.sock")); let pidfile = state_dir.join("xy.pid"); - Ok(Self { config_dir, state_dir, log_dir, socket, pidfile }) + Ok(Self { + config_dir, + state_dir, + log_dir, + socket, + pidfile, + }) } pub fn ensure_dirs(&self) -> std::io::Result<()> { diff --git a/crates/xy/src/pidfile.rs b/crates/xy/src/pidfile.rs index 3bf6ee0..a7ff1dc 100644 --- a/crates/xy/src/pidfile.rs +++ b/crates/xy/src/pidfile.rs @@ -12,14 +12,22 @@ pub struct PidFile { impl PidFile { pub fn acquire(path: &Path) -> std::io::Result { let mut f = OpenOptions::new() - .write(true).create_new(true).mode(0o600).open(path)?; + .write(true) + .create_new(true) + .mode(0o600) + .open(path)?; writeln!(f, "{}", std::process::id())?; - Ok(Self { path: path.to_path_buf(), _file: f }) + Ok(Self { + path: path.to_path_buf(), + _file: f, + }) } } impl Drop for PidFile { - fn drop(&mut self) { let _ = std::fs::remove_file(&self.path); } + fn drop(&mut self) { + let _ = std::fs::remove_file(&self.path); + } } #[cfg(test)] @@ -40,7 +48,10 @@ mod tests { fn drop_removes_file() { let dir = tempdir().unwrap(); let p = dir.path().join("x.pid"); - { let _g = PidFile::acquire(&p).unwrap(); assert!(p.exists()); } + { + let _g = PidFile::acquire(&p).unwrap(); + assert!(p.exists()); + } assert!(!p.exists()); } }