feat(xy): RPC handlers for list/status/start/stop/restart
Per-connection JSON-RPC dispatch in daemon/handlers.rs — list, status, start, stop, and restart are fully implemented; reload, logs, and logs_cancel are stubbed with -32601 for later tasks. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,8 +1,248 @@
|
|||||||
use crate::daemon::registry::Registry;
|
use crate::daemon::registry::Registry;
|
||||||
use crate::paths::Paths;
|
use crate::paths::Paths;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::oneshot;
|
||||||
use xy_ipc::Connection;
|
use xy_ipc::Connection;
|
||||||
|
use xy_ipc::envelope::{Incoming, Request, Response, err_response, ok_response};
|
||||||
|
use xy_protocol::RpcErrorCode;
|
||||||
|
use xy_protocol::rpc::{
|
||||||
|
NameOrAll, RestartResult, ServerSummary, StartResult, StatusDetail, StopResult, methods,
|
||||||
|
};
|
||||||
|
use xy_supervisor::supervisor::{StartAck, StopAck, SupervisorCmd};
|
||||||
|
|
||||||
pub async fn serve(_conn: Arc<Connection>, _reg: Registry, _paths: Paths) -> std::io::Result<()> {
|
pub async fn serve(conn: Arc<Connection>, reg: Registry, _paths: Paths) -> std::io::Result<()> {
|
||||||
Ok(())
|
loop {
|
||||||
|
let Some(incoming) = conn.read_incoming().await? else {
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
|
||||||
|
match incoming {
|
||||||
|
Incoming::Request(req) => {
|
||||||
|
let resp = handle_request(req, ®).await;
|
||||||
|
|
||||||
|
conn.write_response(&resp).await?;
|
||||||
|
}
|
||||||
|
_ => continue,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ApiError {
|
||||||
|
code: i32,
|
||||||
|
message: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ApiError {
|
||||||
|
fn rpc(code: RpcErrorCode, msg: impl Into<String>) -> Self {
|
||||||
|
Self {
|
||||||
|
code: code.as_i32(),
|
||||||
|
message: msg.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_request(req: Request, reg: &Registry) -> Response {
|
||||||
|
let id = req.id.clone();
|
||||||
|
let method = req.method.as_str();
|
||||||
|
let params = req.params.unwrap_or(serde_json::Value::Null);
|
||||||
|
|
||||||
|
match method {
|
||||||
|
methods::LIST => match list(reg).await {
|
||||||
|
Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
|
||||||
|
Err(err) => err_response(id, err.code, err.message),
|
||||||
|
},
|
||||||
|
methods::STATUS => {
|
||||||
|
let p: xy_protocol::rpc::StatusParams = match serde_json::from_value(params) {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(err) => return err_response(id, -32602, format!("invalid params: {err}")),
|
||||||
|
};
|
||||||
|
|
||||||
|
match status(reg, &p.name).await {
|
||||||
|
Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
|
||||||
|
Err(err) => err_response(id, err.code, err.message),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
methods::START => dispatch_lifecycle(id, params, reg, Op::Start).await,
|
||||||
|
methods::STOP => dispatch_lifecycle(id, params, reg, Op::Stop).await,
|
||||||
|
methods::RESTART => dispatch_lifecycle(id, params, reg, Op::Restart).await,
|
||||||
|
methods::RELOAD => err_response(id, -32601, "reload not yet implemented".into()),
|
||||||
|
methods::LOGS => err_response(id, -32601, "logs not yet implemented".into()),
|
||||||
|
methods::LOGS_CANCEL => err_response(id, -32601, "logs_cancel not yet implemented".into()),
|
||||||
|
other => err_response(id, -32601, format!("unknown method `{other}`")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn list(reg: &Registry) -> Result<Vec<ServerSummary>, ApiError> {
|
||||||
|
let mut out = Vec::new();
|
||||||
|
|
||||||
|
for (name, entry) in reg.snapshot().await {
|
||||||
|
out.push(ServerSummary {
|
||||||
|
name,
|
||||||
|
state: *entry.handle.state.borrow(),
|
||||||
|
pid: None,
|
||||||
|
port: 0,
|
||||||
|
uptime_secs: None,
|
||||||
|
restart_count: 0,
|
||||||
|
last_exit: None,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn status(reg: &Registry, name: &str) -> Result<StatusDetail, ApiError> {
|
||||||
|
let Some(entry) = reg.get(name).await else {
|
||||||
|
return Err(ApiError::rpc(
|
||||||
|
RpcErrorCode::ServerNotFound,
|
||||||
|
format!("no such server `{name}`"),
|
||||||
|
));
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(StatusDetail {
|
||||||
|
summary: ServerSummary {
|
||||||
|
name: entry.handle.name.clone(),
|
||||||
|
state: *entry.handle.state.borrow(),
|
||||||
|
pid: None,
|
||||||
|
port: 0,
|
||||||
|
uptime_secs: None,
|
||||||
|
restart_count: 0,
|
||||||
|
last_exit: None,
|
||||||
|
},
|
||||||
|
recent_transitions: Vec::new(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
enum Op {
|
||||||
|
Start,
|
||||||
|
Stop,
|
||||||
|
Restart,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn dispatch_lifecycle(
|
||||||
|
id: serde_json::Value,
|
||||||
|
params: serde_json::Value,
|
||||||
|
reg: &Registry,
|
||||||
|
op: Op,
|
||||||
|
) -> Response {
|
||||||
|
let p: NameOrAll = match serde_json::from_value(params) {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(err) => return err_response(id, -32602, format!("invalid params: {err}")),
|
||||||
|
};
|
||||||
|
|
||||||
|
let targets: Vec<String> = match p {
|
||||||
|
NameOrAll::All { all } if all => reg.names().await,
|
||||||
|
NameOrAll::Name { name } => vec![name],
|
||||||
|
NameOrAll::All { .. } => return err_response(id, -32602, "must set all=true".into()),
|
||||||
|
};
|
||||||
|
|
||||||
|
match op {
|
||||||
|
Op::Start => {
|
||||||
|
let mut started = Vec::new();
|
||||||
|
let mut already = Vec::new();
|
||||||
|
|
||||||
|
for name in targets {
|
||||||
|
let Some(entry) = reg.get(&name).await else {
|
||||||
|
return err_response(
|
||||||
|
id,
|
||||||
|
RpcErrorCode::ServerNotFound.as_i32(),
|
||||||
|
format!("no such server `{name}`"),
|
||||||
|
);
|
||||||
|
};
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
let _ = entry.handle.tx.send(SupervisorCmd::Start { ack: tx }).await;
|
||||||
|
|
||||||
|
match rx.await {
|
||||||
|
Ok(StartAck::Started) => started.push(name),
|
||||||
|
Ok(StartAck::AlreadyRunning) => already.push(name),
|
||||||
|
Err(_) => {
|
||||||
|
return err_response(
|
||||||
|
id,
|
||||||
|
RpcErrorCode::SpawnFailed.as_i32(),
|
||||||
|
format!("supervisor for `{name}` dropped"),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ok_response(
|
||||||
|
id,
|
||||||
|
serde_json::to_value(StartResult {
|
||||||
|
started,
|
||||||
|
already_running: already,
|
||||||
|
})
|
||||||
|
.unwrap(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
Op::Stop => {
|
||||||
|
let mut stopped = Vec::new();
|
||||||
|
let mut not_running = Vec::new();
|
||||||
|
|
||||||
|
for name in targets {
|
||||||
|
let Some(entry) = reg.get(&name).await else {
|
||||||
|
return err_response(
|
||||||
|
id,
|
||||||
|
RpcErrorCode::ServerNotFound.as_i32(),
|
||||||
|
format!("no such server `{name}`"),
|
||||||
|
);
|
||||||
|
};
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
let _ = entry.handle.tx.send(SupervisorCmd::Stop { ack: tx }).await;
|
||||||
|
|
||||||
|
match rx.await {
|
||||||
|
Ok(StopAck::Stopped) => stopped.push(name),
|
||||||
|
Ok(StopAck::NotRunning) => not_running.push(name),
|
||||||
|
Err(_) => {
|
||||||
|
return err_response(
|
||||||
|
id,
|
||||||
|
RpcErrorCode::SpawnFailed.as_i32(),
|
||||||
|
format!("supervisor for `{name}` dropped"),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ok_response(
|
||||||
|
id,
|
||||||
|
serde_json::to_value(StopResult {
|
||||||
|
stopped,
|
||||||
|
not_running,
|
||||||
|
})
|
||||||
|
.unwrap(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
Op::Restart => {
|
||||||
|
let mut restarted = Vec::new();
|
||||||
|
|
||||||
|
for name in targets {
|
||||||
|
let Some(entry) = reg.get(&name).await else {
|
||||||
|
return err_response(
|
||||||
|
id,
|
||||||
|
RpcErrorCode::ServerNotFound.as_i32(),
|
||||||
|
format!("no such server `{name}`"),
|
||||||
|
);
|
||||||
|
};
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
let _ = entry
|
||||||
|
.handle
|
||||||
|
.tx
|
||||||
|
.send(SupervisorCmd::Restart { ack: tx })
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let _ = rx.await;
|
||||||
|
|
||||||
|
restarted.push(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
ok_response(
|
||||||
|
id,
|
||||||
|
serde_json::to_value(RestartResult { restarted }).unwrap(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user