feat(xy): logs streaming via subscription notifications

Implement per-connection ConnState tracking active subscriptions, and the
logs/logs_cancel RPC handlers. Snapshot-only streams terminate with a
log_end notification; follow streams forward broadcast lines until
cancelled or connection close.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-25 11:59:46 +02:00
parent c679465f12
commit b434c636a6
+148 -10
View File
@@ -1,28 +1,52 @@
use crate::daemon::registry::Registry;
use crate::paths::Paths;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use xy_ipc::Connection;
use xy_ipc::envelope::{Incoming, Request, Response, err_response, ok_response};
use xy_protocol::RpcErrorCode;
use xy_protocol::rpc::{
NameOrAll, RestartResult, ServerSummary, StartResult, StatusDetail, StopResult, methods,
LogEnd, LogLine, LogsCancelParams, LogsParams, LogsSubscribed, NameOrAll, RestartResult,
ServerSummary, StartResult, StatusDetail, StopResult, methods, notifications,
};
use xy_supervisor::supervisor::{StartAck, StopAck, SupervisorCmd};
pub struct ConnState {
pub subs: Mutex<HashMap<u64, JoinHandle<()>>>,
pub next: AtomicU64,
}
impl ConnState {
pub fn new() -> Self {
Self {
subs: Mutex::new(HashMap::new()),
next: AtomicU64::new(1),
}
}
}
pub async fn serve(conn: Arc<Connection>, reg: Registry, _paths: Paths) -> std::io::Result<()> {
let state = Arc::new(ConnState::new());
loop {
let Some(incoming) = conn.read_incoming().await? else {
let mut subs = state.subs.lock().await;
for (_, h) in subs.drain() {
h.abort();
}
return Ok(());
};
match incoming {
Incoming::Request(req) => {
let resp = handle_request(req, &reg).await;
if let Incoming::Request(req) = incoming {
let resp = handle_request(req, &reg, &conn, &state).await;
conn.write_response(&resp).await?;
}
_ => continue,
conn.write_response(&resp).await?;
}
}
}
@@ -41,7 +65,12 @@ impl ApiError {
}
}
async fn handle_request(req: Request, reg: &Registry) -> Response {
async fn handle_request(
req: Request,
reg: &Registry,
conn: &Arc<Connection>,
state: &Arc<ConnState>,
) -> Response {
let id = req.id.clone();
let method = req.method.as_str();
let params = req.params.unwrap_or(serde_json::Value::Null);
@@ -69,8 +98,37 @@ async fn handle_request(req: Request, reg: &Registry) -> Response {
Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
Err(e) => err_response(id, e.code, e.message),
},
methods::LOGS => err_response(id, -32601, "logs not yet implemented".into()),
methods::LOGS_CANCEL => err_response(id, -32601, "logs_cancel not yet implemented".into()),
methods::LOGS => {
let p: LogsParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(err) => return err_response(id, -32602, format!("invalid params: {err}")),
};
match start_log_stream(reg, conn.clone(), state.clone(), p).await {
Ok(sub_id) => ok_response(
id,
serde_json::to_value(LogsSubscribed {
subscription_id: sub_id,
})
.unwrap(),
),
Err(err) => err_response(id, err.code, err.message),
}
}
methods::LOGS_CANCEL => {
let p: LogsCancelParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(err) => return err_response(id, -32602, format!("invalid params: {err}")),
};
let mut subs = state.subs.lock().await;
if let Some(h) = subs.remove(&p.subscription_id) {
h.abort();
}
ok_response(id, serde_json::json!({}))
}
other => err_response(id, -32601, format!("unknown method `{other}`")),
}
}
@@ -361,3 +419,83 @@ async fn reload(reg: &Registry) -> Result<ReloadResult, ApiError> {
unchanged,
})
}
async fn start_log_stream(
reg: &Registry,
conn: Arc<Connection>,
state: Arc<ConnState>,
p: LogsParams,
) -> Result<u64, ApiError> {
let Some(entry) = reg.get(&p.name).await else {
return Err(ApiError::rpc(
RpcErrorCode::ServerNotFound,
format!("no such server `{}`", p.name),
));
};
let sub_id = state.next.fetch_add(1, Ordering::Relaxed);
let sink = entry.handle.log_sink.clone();
let conn2 = conn.clone();
let state2 = state.clone();
let follow = p.follow;
let tail = p.tail;
let name = p.name.clone();
let task = tokio::spawn(async move {
for line in sink.ring.snapshot_tail(tail) {
let n = xy_ipc::envelope::notification(
notifications::LOG,
Some(
serde_json::to_value(LogLine {
subscription_id: sub_id,
name: name.clone(),
stream: line.stream,
line: line.line,
ts_unix_ms: line.ts_unix_ms,
})
.unwrap(),
),
);
if conn2.write_notification(&n).await.is_err() {
return;
}
}
if !follow {
let end = xy_ipc::envelope::notification(
notifications::LOG_END,
Some(
serde_json::to_value(LogEnd {
subscription_id: sub_id,
})
.unwrap(),
),
);
let _ = conn2.write_notification(&end).await;
state2.subs.lock().await.remove(&sub_id);
return;
}
let mut rx = sink.broadcast.subscribe();
while let Ok(mut line) = rx.recv().await {
line.subscription_id = sub_id;
let n = xy_ipc::envelope::notification(
notifications::LOG,
Some(serde_json::to_value(&line).unwrap()),
);
if conn2.write_notification(&n).await.is_err() {
break;
}
}
});
state.subs.lock().await.insert(sub_id, task);
Ok(sub_id)
}