From 736e6d18542435934f57d5f58aa6f49532cc462f Mon Sep 17 00:00:00 2001 From: Anders Olsson Date: Mon, 25 May 2026 11:54:52 +0200 Subject: [PATCH] feat(xy): RPC handlers for list/status/start/stop/restart MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per-connection JSON-RPC dispatch in daemon/handlers.rs — list, status, start, stop, and restart are fully implemented; reload, logs, and logs_cancel are stubbed with -32601 for later tasks. Co-Authored-By: Claude Sonnet 4.6 --- crates/xy/src/daemon/handlers.rs | 244 ++++++++++++++++++++++++++++++- 1 file changed, 242 insertions(+), 2 deletions(-) diff --git a/crates/xy/src/daemon/handlers.rs b/crates/xy/src/daemon/handlers.rs index 02e743f..0566ead 100644 --- a/crates/xy/src/daemon/handlers.rs +++ b/crates/xy/src/daemon/handlers.rs @@ -1,8 +1,248 @@ use crate::daemon::registry::Registry; use crate::paths::Paths; use std::sync::Arc; +use tokio::sync::oneshot; use xy_ipc::Connection; +use xy_ipc::envelope::{Incoming, Request, Response, err_response, ok_response}; +use xy_protocol::RpcErrorCode; +use xy_protocol::rpc::{ + NameOrAll, RestartResult, ServerSummary, StartResult, StatusDetail, StopResult, methods, +}; +use xy_supervisor::supervisor::{StartAck, StopAck, SupervisorCmd}; -pub async fn serve(_conn: Arc, _reg: Registry, _paths: Paths) -> std::io::Result<()> { - Ok(()) +pub async fn serve(conn: Arc, reg: Registry, _paths: Paths) -> std::io::Result<()> { + loop { + let Some(incoming) = conn.read_incoming().await? else { + return Ok(()); + }; + + match incoming { + Incoming::Request(req) => { + let resp = handle_request(req, ®).await; + + conn.write_response(&resp).await?; + } + _ => continue, + } + } +} + +struct ApiError { + code: i32, + message: String, +} + +impl ApiError { + fn rpc(code: RpcErrorCode, msg: impl Into) -> Self { + Self { + code: code.as_i32(), + message: msg.into(), + } + } +} + +async fn handle_request(req: Request, reg: &Registry) -> Response { + let id = req.id.clone(); + let method = req.method.as_str(); + let params = req.params.unwrap_or(serde_json::Value::Null); + + match method { + methods::LIST => match list(reg).await { + Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()), + Err(err) => err_response(id, err.code, err.message), + }, + methods::STATUS => { + let p: xy_protocol::rpc::StatusParams = match serde_json::from_value(params) { + Ok(p) => p, + Err(err) => return err_response(id, -32602, format!("invalid params: {err}")), + }; + + match status(reg, &p.name).await { + Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()), + Err(err) => err_response(id, err.code, err.message), + } + } + methods::START => dispatch_lifecycle(id, params, reg, Op::Start).await, + methods::STOP => dispatch_lifecycle(id, params, reg, Op::Stop).await, + methods::RESTART => dispatch_lifecycle(id, params, reg, Op::Restart).await, + methods::RELOAD => err_response(id, -32601, "reload not yet implemented".into()), + methods::LOGS => err_response(id, -32601, "logs not yet implemented".into()), + methods::LOGS_CANCEL => err_response(id, -32601, "logs_cancel not yet implemented".into()), + other => err_response(id, -32601, format!("unknown method `{other}`")), + } +} + +async fn list(reg: &Registry) -> Result, ApiError> { + let mut out = Vec::new(); + + for (name, entry) in reg.snapshot().await { + out.push(ServerSummary { + name, + state: *entry.handle.state.borrow(), + pid: None, + port: 0, + uptime_secs: None, + restart_count: 0, + last_exit: None, + }); + } + + Ok(out) +} + +async fn status(reg: &Registry, name: &str) -> Result { + let Some(entry) = reg.get(name).await else { + return Err(ApiError::rpc( + RpcErrorCode::ServerNotFound, + format!("no such server `{name}`"), + )); + }; + + Ok(StatusDetail { + summary: ServerSummary { + name: entry.handle.name.clone(), + state: *entry.handle.state.borrow(), + pid: None, + port: 0, + uptime_secs: None, + restart_count: 0, + last_exit: None, + }, + recent_transitions: Vec::new(), + }) +} + +enum Op { + Start, + Stop, + Restart, +} + +async fn dispatch_lifecycle( + id: serde_json::Value, + params: serde_json::Value, + reg: &Registry, + op: Op, +) -> Response { + let p: NameOrAll = match serde_json::from_value(params) { + Ok(p) => p, + Err(err) => return err_response(id, -32602, format!("invalid params: {err}")), + }; + + let targets: Vec = match p { + NameOrAll::All { all } if all => reg.names().await, + NameOrAll::Name { name } => vec![name], + NameOrAll::All { .. } => return err_response(id, -32602, "must set all=true".into()), + }; + + match op { + Op::Start => { + let mut started = Vec::new(); + let mut already = Vec::new(); + + for name in targets { + let Some(entry) = reg.get(&name).await else { + return err_response( + id, + RpcErrorCode::ServerNotFound.as_i32(), + format!("no such server `{name}`"), + ); + }; + + let (tx, rx) = oneshot::channel(); + + let _ = entry.handle.tx.send(SupervisorCmd::Start { ack: tx }).await; + + match rx.await { + Ok(StartAck::Started) => started.push(name), + Ok(StartAck::AlreadyRunning) => already.push(name), + Err(_) => { + return err_response( + id, + RpcErrorCode::SpawnFailed.as_i32(), + format!("supervisor for `{name}` dropped"), + ); + } + } + } + + ok_response( + id, + serde_json::to_value(StartResult { + started, + already_running: already, + }) + .unwrap(), + ) + } + Op::Stop => { + let mut stopped = Vec::new(); + let mut not_running = Vec::new(); + + for name in targets { + let Some(entry) = reg.get(&name).await else { + return err_response( + id, + RpcErrorCode::ServerNotFound.as_i32(), + format!("no such server `{name}`"), + ); + }; + + let (tx, rx) = oneshot::channel(); + + let _ = entry.handle.tx.send(SupervisorCmd::Stop { ack: tx }).await; + + match rx.await { + Ok(StopAck::Stopped) => stopped.push(name), + Ok(StopAck::NotRunning) => not_running.push(name), + Err(_) => { + return err_response( + id, + RpcErrorCode::SpawnFailed.as_i32(), + format!("supervisor for `{name}` dropped"), + ); + } + } + } + + ok_response( + id, + serde_json::to_value(StopResult { + stopped, + not_running, + }) + .unwrap(), + ) + } + Op::Restart => { + let mut restarted = Vec::new(); + + for name in targets { + let Some(entry) = reg.get(&name).await else { + return err_response( + id, + RpcErrorCode::ServerNotFound.as_i32(), + format!("no such server `{name}`"), + ); + }; + + let (tx, rx) = oneshot::channel(); + + let _ = entry + .handle + .tx + .send(SupervisorCmd::Restart { ack: tx }) + .await; + + let _ = rx.await; + + restarted.push(name); + } + + ok_response( + id, + serde_json::to_value(RestartResult { restarted }).unwrap(), + ) + } + } }