feat(xy): reload handler with diff

Implements the `reload` JSON-RPC method: diffs the on-disk config dir
against the in-memory registry and reconciles — stops removed servers,
restarts changed servers (shutdown-then-respawn), and starts new ones.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-25 11:56:45 +02:00
parent 736e6d1854
commit c679465f12
6 changed files with 193 additions and 28 deletions
+22 -8
View File
@@ -1,10 +1,24 @@
use crate::paths::Paths;
use anyhow::{bail, Result};
use anyhow::{Result, bail};
pub async fn list(_p: Paths) -> Result<i32> { bail!("not implemented") }
pub async fn status(_p: Paths, _name: String) -> Result<i32> { bail!("not implemented") }
pub async fn start(_p: Paths, _all: bool, _name: Option<String>) -> Result<i32> { bail!("not implemented") }
pub async fn stop(_p: Paths, _all: bool, _name: Option<String>) -> Result<i32> { bail!("not implemented") }
pub async fn restart(_p: Paths, _all: bool, _name: Option<String>) -> Result<i32> { bail!("not implemented") }
pub async fn reload(_p: Paths) -> Result<i32> { bail!("not implemented") }
pub async fn logs(_p: Paths, _name: String, _tail: Option<u32>, _follow: bool) -> Result<i32> { bail!("not implemented") }
pub async fn list(_p: Paths) -> Result<i32> {
bail!("not implemented")
}
pub async fn status(_p: Paths, _name: String) -> Result<i32> {
bail!("not implemented")
}
pub async fn start(_p: Paths, _all: bool, _name: Option<String>) -> Result<i32> {
bail!("not implemented")
}
pub async fn stop(_p: Paths, _all: bool, _name: Option<String>) -> Result<i32> {
bail!("not implemented")
}
pub async fn restart(_p: Paths, _all: bool, _name: Option<String>) -> Result<i32> {
bail!("not implemented")
}
pub async fn reload(_p: Paths) -> Result<i32> {
bail!("not implemented")
}
pub async fn logs(_p: Paths, _name: String, _tail: Option<u32>, _follow: bool) -> Result<i32> {
bail!("not implemented")
}
+116 -1
View File
@@ -65,7 +65,10 @@ async fn handle_request(req: Request, reg: &Registry) -> Response {
methods::START => dispatch_lifecycle(id, params, reg, Op::Start).await,
methods::STOP => dispatch_lifecycle(id, params, reg, Op::Stop).await,
methods::RESTART => dispatch_lifecycle(id, params, reg, Op::Restart).await,
methods::RELOAD => err_response(id, -32601, "reload not yet implemented".into()),
methods::RELOAD => match reload(reg).await {
Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
Err(e) => err_response(id, e.code, e.message),
},
methods::LOGS => err_response(id, -32601, "logs not yet implemented".into()),
methods::LOGS_CANCEL => err_response(id, -32601, "logs_cancel not yet implemented".into()),
other => err_response(id, -32601, format!("unknown method `{other}`")),
@@ -246,3 +249,115 @@ async fn dispatch_lifecycle(
}
}
}
use xy_protocol::rpc::ReloadResult;
async fn reload(reg: &Registry) -> Result<ReloadResult, ApiError> {
let paths = crate::daemon::PATHS.get().ok_or_else(|| {
ApiError::rpc(RpcErrorCode::ConfigInvalid, "daemon paths not initialized")
})?;
let new_configs = xy_protocol::kdl_parse::load_all_configs(&paths.config_dir)
.map_err(|err| ApiError::rpc(RpcErrorCode::ConfigInvalid, err.to_string()))?;
use std::collections::HashMap;
let new_by_name: HashMap<String, xy_protocol::ServerConfig> = new_configs
.into_iter()
.map(|c| (c.name.clone(), c))
.collect();
let existing_names: Vec<String> = reg.names().await;
let mut added = Vec::new();
let mut removed = Vec::new();
let mut changed = Vec::new();
let mut unchanged = Vec::new();
for name in &existing_names {
if !new_by_name.contains_key(name) {
if let Some(entry) = reg.remove(name).await {
let (tx, rx) = oneshot::channel();
let _ = entry
.handle
.tx
.send(SupervisorCmd::Shutdown { ack: tx })
.await;
let _ = rx.await;
removed.push(name.clone());
}
}
}
for (name, cfg) in new_by_name {
let new_hash = crate::daemon::config_hash(&cfg);
match reg.get(&name).await {
None => {
let handle = crate::daemon::spawn_supervisor(paths, cfg)
.map_err(|err| ApiError::rpc(RpcErrorCode::SpawnFailed, err.to_string()))?;
reg.insert(
name.clone(),
crate::daemon::registry::Entry {
handle: handle.clone(),
config_hash: new_hash,
},
)
.await;
let (tx, rx) = oneshot::channel();
let _ = handle.tx.send(SupervisorCmd::Start { ack: tx }).await;
let _ = rx.await;
added.push(name);
}
Some(entry) if entry.config_hash != new_hash => {
let (tx, rx) = oneshot::channel();
let _ = entry
.handle
.tx
.send(SupervisorCmd::Shutdown { ack: tx })
.await;
let _ = rx.await;
reg.remove(&name).await;
let handle = crate::daemon::spawn_supervisor(paths, cfg)
.map_err(|err| ApiError::rpc(RpcErrorCode::SpawnFailed, err.to_string()))?;
reg.insert(
name.clone(),
crate::daemon::registry::Entry {
handle: handle.clone(),
config_hash: new_hash,
},
)
.await;
let (tx, rx) = oneshot::channel();
let _ = handle.tx.send(SupervisorCmd::Start { ack: tx }).await;
let _ = rx.await;
changed.push(name);
}
Some(_) => unchanged.push(name),
}
}
Ok(ReloadResult {
added,
removed,
changed,
unchanged,
})
}
+5 -2
View File
@@ -15,7 +15,9 @@ pub struct Registry {
}
impl Registry {
pub fn new() -> Self { Self::default() }
pub fn new() -> Self {
Self::default()
}
pub async fn insert(&self, name: String, entry: Entry) {
self.inner.write().await.insert(name, entry);
}
@@ -28,7 +30,8 @@ impl Registry {
pub async fn names(&self) -> Vec<String> {
let g = self.inner.read().await;
let mut v: Vec<String> = g.keys().cloned().collect();
v.sort(); v
v.sort();
v
}
pub async fn snapshot(&self) -> Vec<(String, Entry)> {
let g = self.inner.read().await;
+28 -12
View File
@@ -22,41 +22,54 @@ enum Cmd {
Status { name: String },
/// Start a server (or all configured servers with --all).
Start {
#[arg(long, conflicts_with = "name")] all: bool,
#[arg(required_unless_present = "all")] name: Option<String>,
#[arg(long, conflicts_with = "name")]
all: bool,
#[arg(required_unless_present = "all")]
name: Option<String>,
},
/// Stop a server (or --all).
Stop {
#[arg(long, conflicts_with = "name")] all: bool,
#[arg(required_unless_present = "all")] name: Option<String>,
#[arg(long, conflicts_with = "name")]
all: bool,
#[arg(required_unless_present = "all")]
name: Option<String>,
},
/// Restart a server (or --all).
Restart {
#[arg(long, conflicts_with = "name")] all: bool,
#[arg(required_unless_present = "all")] name: Option<String>,
#[arg(long, conflicts_with = "name")]
all: bool,
#[arg(required_unless_present = "all")]
name: Option<String>,
},
/// Re-read config dir and reconcile running servers.
Reload,
/// Stream a server's log.
Logs {
name: String,
#[arg(long)] tail: Option<u32>,
#[arg(short = 'f', long)] follow: bool,
#[arg(long)]
tail: Option<u32>,
#[arg(short = 'f', long)]
follow: bool,
},
}
#[tokio::main]
async fn main() -> std::process::ExitCode {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")))
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.with_writer(std::io::stderr)
.init();
let cli = Cli::parse();
let paths = match paths::Paths::resolve() {
Ok(p) => p,
Err(e) => { eprintln!("xy: failed to resolve XDG paths: {e}"); return std::process::ExitCode::from(3); }
Err(e) => {
eprintln!("xy: failed to resolve XDG paths: {e}");
return std::process::ExitCode::from(3);
}
};
let result: anyhow::Result<i32> = match cli.cmd {
@@ -72,6 +85,9 @@ async fn main() -> std::process::ExitCode {
match result {
Ok(code) => std::process::ExitCode::from(code as u8),
Err(e) => { eprintln!("xy: {e:#}"); std::process::ExitCode::from(1) }
Err(e) => {
eprintln!("xy: {e:#}");
std::process::ExitCode::from(1)
}
}
}
+7 -1
View File
@@ -20,7 +20,13 @@ impl Paths {
.map(|p| PathBuf::from(p).join("xy.sock"))
.unwrap_or_else(|| state_dir.join("xy.sock"));
let pidfile = state_dir.join("xy.pid");
Ok(Self { config_dir, state_dir, log_dir, socket, pidfile })
Ok(Self {
config_dir,
state_dir,
log_dir,
socket,
pidfile,
})
}
pub fn ensure_dirs(&self) -> std::io::Result<()> {
+15 -4
View File
@@ -12,14 +12,22 @@ pub struct PidFile {
impl PidFile {
pub fn acquire(path: &Path) -> std::io::Result<Self> {
let mut f = OpenOptions::new()
.write(true).create_new(true).mode(0o600).open(path)?;
.write(true)
.create_new(true)
.mode(0o600)
.open(path)?;
writeln!(f, "{}", std::process::id())?;
Ok(Self { path: path.to_path_buf(), _file: f })
Ok(Self {
path: path.to_path_buf(),
_file: f,
})
}
}
impl Drop for PidFile {
fn drop(&mut self) { let _ = std::fs::remove_file(&self.path); }
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.path);
}
}
#[cfg(test)]
@@ -40,7 +48,10 @@ mod tests {
fn drop_removes_file() {
let dir = tempdir().unwrap();
let p = dir.path().join("x.pid");
{ let _g = PidFile::acquire(&p).unwrap(); assert!(p.exists()); }
{
let _g = PidFile::acquire(&p).unwrap();
assert!(p.exists());
}
assert!(!p.exists());
}
}