diff --git a/crates/xy-ipc/src/client.rs b/crates/xy-ipc/src/client.rs index 3125247..560442b 100644 --- a/crates/xy-ipc/src/client.rs +++ b/crates/xy-ipc/src/client.rs @@ -1,7 +1,7 @@ use crate::envelope::{Incoming, Notification, Request}; use crate::framing::JsonFramed; -use serde::de::DeserializeOwned; use serde::Serialize; +use serde::de::DeserializeOwned; use std::path::Path; use thiserror::Error; use tokio::net::UnixStream; diff --git a/crates/xy-ipc/src/framing.rs b/crates/xy-ipc/src/framing.rs index 5deb74e..3b4155d 100644 --- a/crates/xy-ipc/src/framing.rs +++ b/crates/xy-ipc/src/framing.rs @@ -16,9 +16,7 @@ impl JsonFramed { } } - pub async fn read( - &mut self, - ) -> std::io::Result> { + pub async fn read(&mut self) -> std::io::Result> { let mut buf = String::new(); let n = self.reader.read_line(&mut buf).await?; if n == 0 { @@ -38,6 +36,47 @@ impl JsonFramed { } } +pub struct JsonFramedReader { + inner: BufReader, +} + +impl JsonFramedReader { + pub async fn read(&mut self) -> std::io::Result> { + let mut buf = String::new(); + let n = self.inner.read_line(&mut buf).await?; + if n == 0 { + return Ok(None); + } + let v: T = serde_json::from_str(buf.trim_end()) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + Ok(Some(v)) + } +} + +pub struct JsonFramedWriter { + inner: tokio::net::unix::OwnedWriteHalf, +} + +impl JsonFramedWriter { + pub async fn write(&mut self, value: &T) -> std::io::Result<()> { + let mut bytes = serde_json::to_vec(value) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + bytes.push(b'\n'); + self.inner.write_all(&bytes).await?; + self.inner.flush().await + } +} + +pub fn split(stream: UnixStream) -> (JsonFramedReader, JsonFramedWriter) { + let (r, w) = stream.into_split(); + ( + JsonFramedReader { + inner: BufReader::new(r), + }, + JsonFramedWriter { inner: w }, + ) +} + #[cfg(test)] mod tests { use super::*; @@ -61,10 +100,13 @@ mod tests { .await .unwrap(); let got: Option = sb.read().await.unwrap(); - assert_eq!(got, Some(M { - x: 1, - name: "hi".into() - })); + assert_eq!( + got, + Some(M { + x: 1, + name: "hi".into() + }) + ); } #[tokio::test] diff --git a/crates/xy-ipc/src/lib.rs b/crates/xy-ipc/src/lib.rs index 8629b3b..3685f6d 100644 --- a/crates/xy-ipc/src/lib.rs +++ b/crates/xy-ipc/src/lib.rs @@ -7,8 +7,8 @@ pub mod server; pub use client::{Client, ClientError}; pub use envelope::{ - err_response, notification, ok_response, request, Incoming, Notification, Request, Response, - RpcError, + Incoming, Notification, Request, Response, RpcError, err_response, notification, ok_response, + request, }; pub use framing::JsonFramed; -pub use server::{bind, Connection}; +pub use server::{Connection, bind}; diff --git a/crates/xy-ipc/src/server.rs b/crates/xy-ipc/src/server.rs index fcead9b..16f4b7e 100644 --- a/crates/xy-ipc/src/server.rs +++ b/crates/xy-ipc/src/server.rs @@ -1,33 +1,36 @@ use crate::envelope::{Incoming, Notification, Response}; -use crate::framing::JsonFramed; +use crate::framing::{JsonFramedReader, JsonFramedWriter}; use std::path::Path; use std::sync::Arc; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::Mutex; pub struct Connection { - inner: Arc>, + reader: Mutex, + writer: Arc>, } impl Connection { pub fn new(stream: UnixStream) -> Self { + let (reader, writer) = crate::framing::split(stream); Self { - inner: Arc::new(Mutex::new(JsonFramed::new(stream))), + reader: Mutex::new(reader), + writer: Arc::new(Mutex::new(writer)), } } pub async fn read_incoming(&self) -> std::io::Result> { - let mut g = self.inner.lock().await; + let mut g = self.reader.lock().await; g.read::().await } pub async fn write_response(&self, r: &Response) -> std::io::Result<()> { - let mut g = self.inner.lock().await; + let mut g = self.writer.lock().await; g.write(r).await } pub async fn write_notification(&self, n: &Notification) -> std::io::Result<()> { - let mut g = self.inner.lock().await; + let mut g = self.writer.lock().await; g.write(n).await } } diff --git a/crates/xy/src/daemon/handlers.rs b/crates/xy/src/daemon/handlers.rs index a77b707..7fae441 100644 --- a/crates/xy/src/daemon/handlers.rs +++ b/crates/xy/src/daemon/handlers.rs @@ -44,9 +44,13 @@ pub async fn serve(conn: Arc, reg: Registry, _paths: Paths) -> std:: }; if let Incoming::Request(req) = incoming { - let resp = handle_request(req, ®, &conn, &state).await; + let (resp, log_ready) = handle_request(req, ®, &conn, &state).await; conn.write_response(&resp).await?; + + if let Some(tx) = log_ready { + let _ = tx.send(()); + } } } } @@ -65,17 +69,19 @@ impl ApiError { } } +type LogReadyTx = tokio::sync::oneshot::Sender<()>; + async fn handle_request( req: Request, reg: &Registry, conn: &Arc, state: &Arc, -) -> Response { +) -> (Response, Option) { let id = req.id.clone(); let method = req.method.as_str(); let params = req.params.unwrap_or(serde_json::Value::Null); - match method { + let resp = match method { methods::LIST => match list(reg).await { Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()), Err(err) => err_response(id, err.code, err.message), @@ -83,7 +89,12 @@ async fn handle_request( methods::STATUS => { let p: xy_protocol::rpc::StatusParams = match serde_json::from_value(params) { Ok(p) => p, - Err(err) => return err_response(id, -32602, format!("invalid params: {err}")), + Err(err) => { + return ( + err_response(id, -32602, format!("invalid params: {err}")), + None, + ); + } }; match status(reg, &p.name).await { @@ -101,24 +112,37 @@ async fn handle_request( methods::LOGS => { let p: LogsParams = match serde_json::from_value(params) { Ok(p) => p, - Err(err) => return err_response(id, -32602, format!("invalid params: {err}")), + Err(err) => { + return ( + err_response(id, -32602, format!("invalid params: {err}")), + None, + ); + } }; match start_log_stream(reg, conn.clone(), state.clone(), p).await { - Ok(sub_id) => ok_response( - id, - serde_json::to_value(LogsSubscribed { - subscription_id: sub_id, - }) - .unwrap(), - ), + Ok((sub_id, ready_tx)) => { + let resp = ok_response( + id, + serde_json::to_value(LogsSubscribed { + subscription_id: sub_id, + }) + .unwrap(), + ); + return (resp, Some(ready_tx)); + } Err(err) => err_response(id, err.code, err.message), } } methods::LOGS_CANCEL => { let p: LogsCancelParams = match serde_json::from_value(params) { Ok(p) => p, - Err(err) => return err_response(id, -32602, format!("invalid params: {err}")), + Err(err) => { + return ( + err_response(id, -32602, format!("invalid params: {err}")), + None, + ); + } }; let mut subs = state.subs.lock().await; @@ -130,7 +154,9 @@ async fn handle_request( ok_response(id, serde_json::json!({})) } other => err_response(id, -32601, format!("unknown method `{other}`")), - } + }; + + (resp, None) } async fn list(reg: &Registry) -> Result, ApiError> { @@ -425,7 +451,7 @@ async fn start_log_stream( conn: Arc, state: Arc, p: LogsParams, -) -> Result { +) -> Result<(u64, LogReadyTx), ApiError> { let Some(entry) = reg.get(&p.name).await else { return Err(ApiError::rpc( RpcErrorCode::ServerNotFound, @@ -441,7 +467,15 @@ async fn start_log_stream( let tail = p.tail; let name = p.name.clone(); + let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>(); + let task = tokio::spawn(async move { + // Wait until `serve` has written the LOGS response before sending any + // LOG notifications. Without this the spawned task can race ahead of + // `conn.write_response` and emit notifications that the client + // discards while still awaiting the response. + let _ = ready_rx.await; + for line in sink.ring.snapshot_tail(tail) { let n = xy_ipc::envelope::notification( notifications::LOG, @@ -497,5 +531,5 @@ async fn start_log_stream( state.subs.lock().await.insert(sub_id, task); - Ok(sub_id) + Ok((sub_id, ready_tx)) } diff --git a/crates/xy/tests/common/mod.rs b/crates/xy/tests/common/mod.rs index d85b375..665e56a 100644 --- a/crates/xy/tests/common/mod.rs +++ b/crates/xy/tests/common/mod.rs @@ -1,10 +1,10 @@ -#![allow(dead_code)] // not all helpers are used by every test file +#![allow(dead_code)] // not all helpers are used by every test file use std::path::PathBuf; use std::process::Stdio; use std::time::Duration; -use tokio::process::{Child, Command}; use tempfile::TempDir; +use tokio::process::{Child, Command}; pub struct Harness { pub tmp: TempDir, @@ -25,7 +25,13 @@ impl Harness { std::fs::create_dir_all(&config_dir).unwrap(); std::fs::create_dir_all(&state_dir).unwrap(); let socket = tmp.path().join("run/xy.sock"); - Self { tmp, config_dir, state_dir, socket, daemon: None } + Self { + tmp, + config_dir, + state_dir, + socket, + daemon: None, + } } pub fn write_server(&self, name: &str, command: &str, port: u16, restart_policy: &str) { @@ -49,7 +55,9 @@ impl Harness { self.daemon = Some(child); let deadline = std::time::Instant::now() + Duration::from_secs(5); while !self.socket.exists() { - if std::time::Instant::now() > deadline { panic!("daemon socket never appeared"); } + if std::time::Instant::now() > deadline { + panic!("daemon socket never appeared"); + } tokio::time::sleep(Duration::from_millis(25)).await; } } @@ -60,7 +68,9 @@ impl Harness { .env("XDG_CONFIG_HOME", self.tmp.path().join("config")) .env("XDG_STATE_HOME", self.tmp.path().join("state")) .env("XDG_RUNTIME_DIR", self.tmp.path().join("run")) - .output().await.expect("run cli"); + .output() + .await + .expect("run cli"); let code = out.status.code().unwrap_or(-1); let stdout = String::from_utf8_lossy(&out.stdout).to_string(); let stderr = String::from_utf8_lossy(&out.stderr).to_string(); @@ -68,15 +78,25 @@ impl Harness { } } -pub fn xy_bin() -> PathBuf { artifact("xy") } -pub fn sleep_server_bin() -> PathBuf { artifact("xy-test-sleep-server") } -pub fn exit_failure_bin() -> PathBuf { artifact("xy-test-exit-failure") } +pub fn xy_bin() -> PathBuf { + artifact("xy") +} +pub fn sleep_server_bin() -> PathBuf { + artifact("xy-test-sleep-server") +} +pub fn exit_failure_bin() -> PathBuf { + artifact("xy-test-exit-failure") +} fn artifact(name: &str) -> PathBuf { let mut p = std::env::current_exe().unwrap(); p.pop(); - if p.ends_with("deps") { p.pop(); } + if p.ends_with("deps") { + p.pop(); + } p.push(name); - if !p.exists() { panic!("artifact `{}` not found at {}", name, p.display()); } + if !p.exists() { + panic!("artifact `{}` not found at {}", name, p.display()); + } p } diff --git a/crates/xy/tests/logs.rs b/crates/xy/tests/logs.rs new file mode 100644 index 0000000..fb32f11 --- /dev/null +++ b/crates/xy/tests/logs.rs @@ -0,0 +1,49 @@ +mod common; +use common::*; +use std::process::Stdio; +use std::time::Duration; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::process::Command; + +#[tokio::test] +async fn logs_tail_prints_existing_lines() { + let xy = xy_bin(); + let sleeper = sleep_server_bin(); + let mut h = Harness::new(); + h.write_server("svc", sleeper.to_str().unwrap(), 19_030, "always"); + h.start_daemon(&xy).await; + + tokio::time::sleep(Duration::from_millis(500)).await; + let (code, out, _e) = h.run_cli(&xy, &["logs", "svc", "--tail", "10"]).await; + assert_eq!(code, 0); + assert!(out.contains("ready"), "stdout: {out}"); +} + +#[tokio::test] +async fn logs_follow_streams_new_lines() { + let xy = xy_bin(); + let sleeper = sleep_server_bin(); + let mut h = Harness::new(); + h.write_server("svc", sleeper.to_str().unwrap(), 19_031, "always"); + h.start_daemon(&xy).await; + + let mut child = Command::new(&xy) + .args(["logs", "svc", "--follow"]) + .env("XDG_CONFIG_HOME", h.tmp.path().join("config")) + .env("XDG_STATE_HOME", h.tmp.path().join("state")) + .env("XDG_RUNTIME_DIR", h.tmp.path().join("run")) + .stdout(Stdio::piped()) + .stderr(Stdio::null()) + .kill_on_drop(true) + .spawn() + .unwrap(); + + let stdout = child.stdout.take().unwrap(); + let mut lines = BufReader::new(stdout).lines(); + let first = tokio::time::timeout(Duration::from_secs(2), lines.next_line()) + .await + .expect("timeout waiting for first log line") + .unwrap(); + assert!(first.is_some()); + let _ = tokio::time::timeout(Duration::from_secs(2), child.kill()).await; +}