From b434c636a6ead1a682108a0b9a6decce5f7a3f42 Mon Sep 17 00:00:00 2001 From: Anders Olsson Date: Mon, 25 May 2026 11:59:46 +0200 Subject: [PATCH] feat(xy): logs streaming via subscription notifications Implement per-connection ConnState tracking active subscriptions, and the logs/logs_cancel RPC handlers. Snapshot-only streams terminate with a log_end notification; follow streams forward broadcast lines until cancelled or connection close. Co-Authored-By: Claude Sonnet 4.6 --- crates/xy/src/daemon/handlers.rs | 158 +++++++++++++++++++++++++++++-- 1 file changed, 148 insertions(+), 10 deletions(-) diff --git a/crates/xy/src/daemon/handlers.rs b/crates/xy/src/daemon/handlers.rs index 53059b2..a77b707 100644 --- a/crates/xy/src/daemon/handlers.rs +++ b/crates/xy/src/daemon/handlers.rs @@ -1,28 +1,52 @@ use crate::daemon::registry::Registry; use crate::paths::Paths; +use std::collections::HashMap; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use tokio::sync::Mutex; use tokio::sync::oneshot; +use tokio::task::JoinHandle; use xy_ipc::Connection; use xy_ipc::envelope::{Incoming, Request, Response, err_response, ok_response}; use xy_protocol::RpcErrorCode; use xy_protocol::rpc::{ - NameOrAll, RestartResult, ServerSummary, StartResult, StatusDetail, StopResult, methods, + LogEnd, LogLine, LogsCancelParams, LogsParams, LogsSubscribed, NameOrAll, RestartResult, + ServerSummary, StartResult, StatusDetail, StopResult, methods, notifications, }; use xy_supervisor::supervisor::{StartAck, StopAck, SupervisorCmd}; +pub struct ConnState { + pub subs: Mutex>>, + pub next: AtomicU64, +} + +impl ConnState { + pub fn new() -> Self { + Self { + subs: Mutex::new(HashMap::new()), + next: AtomicU64::new(1), + } + } +} + pub async fn serve(conn: Arc, reg: Registry, _paths: Paths) -> std::io::Result<()> { + let state = Arc::new(ConnState::new()); + loop { let Some(incoming) = conn.read_incoming().await? else { + let mut subs = state.subs.lock().await; + + for (_, h) in subs.drain() { + h.abort(); + } + return Ok(()); }; - match incoming { - Incoming::Request(req) => { - let resp = handle_request(req, ®).await; + if let Incoming::Request(req) = incoming { + let resp = handle_request(req, ®, &conn, &state).await; - conn.write_response(&resp).await?; - } - _ => continue, + conn.write_response(&resp).await?; } } } @@ -41,7 +65,12 @@ impl ApiError { } } -async fn handle_request(req: Request, reg: &Registry) -> Response { +async fn handle_request( + req: Request, + reg: &Registry, + conn: &Arc, + state: &Arc, +) -> Response { let id = req.id.clone(); let method = req.method.as_str(); let params = req.params.unwrap_or(serde_json::Value::Null); @@ -69,8 +98,37 @@ async fn handle_request(req: Request, reg: &Registry) -> Response { Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()), Err(e) => err_response(id, e.code, e.message), }, - methods::LOGS => err_response(id, -32601, "logs not yet implemented".into()), - methods::LOGS_CANCEL => err_response(id, -32601, "logs_cancel not yet implemented".into()), + methods::LOGS => { + let p: LogsParams = match serde_json::from_value(params) { + Ok(p) => p, + Err(err) => return err_response(id, -32602, format!("invalid params: {err}")), + }; + + match start_log_stream(reg, conn.clone(), state.clone(), p).await { + Ok(sub_id) => ok_response( + id, + serde_json::to_value(LogsSubscribed { + subscription_id: sub_id, + }) + .unwrap(), + ), + Err(err) => err_response(id, err.code, err.message), + } + } + methods::LOGS_CANCEL => { + let p: LogsCancelParams = match serde_json::from_value(params) { + Ok(p) => p, + Err(err) => return err_response(id, -32602, format!("invalid params: {err}")), + }; + + let mut subs = state.subs.lock().await; + + if let Some(h) = subs.remove(&p.subscription_id) { + h.abort(); + } + + ok_response(id, serde_json::json!({})) + } other => err_response(id, -32601, format!("unknown method `{other}`")), } } @@ -361,3 +419,83 @@ async fn reload(reg: &Registry) -> Result { unchanged, }) } + +async fn start_log_stream( + reg: &Registry, + conn: Arc, + state: Arc, + p: LogsParams, +) -> Result { + let Some(entry) = reg.get(&p.name).await else { + return Err(ApiError::rpc( + RpcErrorCode::ServerNotFound, + format!("no such server `{}`", p.name), + )); + }; + + let sub_id = state.next.fetch_add(1, Ordering::Relaxed); + let sink = entry.handle.log_sink.clone(); + let conn2 = conn.clone(); + let state2 = state.clone(); + let follow = p.follow; + let tail = p.tail; + let name = p.name.clone(); + + let task = tokio::spawn(async move { + for line in sink.ring.snapshot_tail(tail) { + let n = xy_ipc::envelope::notification( + notifications::LOG, + Some( + serde_json::to_value(LogLine { + subscription_id: sub_id, + name: name.clone(), + stream: line.stream, + line: line.line, + ts_unix_ms: line.ts_unix_ms, + }) + .unwrap(), + ), + ); + + if conn2.write_notification(&n).await.is_err() { + return; + } + } + + if !follow { + let end = xy_ipc::envelope::notification( + notifications::LOG_END, + Some( + serde_json::to_value(LogEnd { + subscription_id: sub_id, + }) + .unwrap(), + ), + ); + + let _ = conn2.write_notification(&end).await; + + state2.subs.lock().await.remove(&sub_id); + + return; + } + + let mut rx = sink.broadcast.subscribe(); + + while let Ok(mut line) = rx.recv().await { + line.subscription_id = sub_id; + let n = xy_ipc::envelope::notification( + notifications::LOG, + Some(serde_json::to_value(&line).unwrap()), + ); + + if conn2.write_notification(&n).await.is_err() { + break; + } + } + }); + + state.subs.lock().await.insert(sub_id, task); + + Ok(sub_id) +}