test(xy): logs --tail and --follow

Fix a deadlock in the log-stream handler that caused all logs
requests to hang: Connection used a single Mutex<JsonFramed> for
both reads and writes, so the serve loop holding the read lock
blocked the spawned notification task from writing.  Split
Connection into separate reader and writer mutexes.

Also fix a response/notification ordering race: the log task now
waits for an explicit ready signal sent by serve after writing the
LOGS response, ensuring notifications never arrive at the client
before their initiating response.
This commit is contained in:
2026-05-25 12:17:32 +02:00
parent 15791c628b
commit b1e7dea739
7 changed files with 191 additions and 43 deletions
+1 -1
View File
@@ -1,7 +1,7 @@
use crate::envelope::{Incoming, Notification, Request}; use crate::envelope::{Incoming, Notification, Request};
use crate::framing::JsonFramed; use crate::framing::JsonFramed;
use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use serde::de::DeserializeOwned;
use std::path::Path; use std::path::Path;
use thiserror::Error; use thiserror::Error;
use tokio::net::UnixStream; use tokio::net::UnixStream;
+49 -7
View File
@@ -16,9 +16,7 @@ impl JsonFramed {
} }
} }
pub async fn read<T: serde::de::DeserializeOwned>( pub async fn read<T: serde::de::DeserializeOwned>(&mut self) -> std::io::Result<Option<T>> {
&mut self,
) -> std::io::Result<Option<T>> {
let mut buf = String::new(); let mut buf = String::new();
let n = self.reader.read_line(&mut buf).await?; let n = self.reader.read_line(&mut buf).await?;
if n == 0 { if n == 0 {
@@ -38,6 +36,47 @@ impl JsonFramed {
} }
} }
pub struct JsonFramedReader {
inner: BufReader<tokio::net::unix::OwnedReadHalf>,
}
impl JsonFramedReader {
pub async fn read<T: serde::de::DeserializeOwned>(&mut self) -> std::io::Result<Option<T>> {
let mut buf = String::new();
let n = self.inner.read_line(&mut buf).await?;
if n == 0 {
return Ok(None);
}
let v: T = serde_json::from_str(buf.trim_end())
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
Ok(Some(v))
}
}
pub struct JsonFramedWriter {
inner: tokio::net::unix::OwnedWriteHalf,
}
impl JsonFramedWriter {
pub async fn write<T: Serialize>(&mut self, value: &T) -> std::io::Result<()> {
let mut bytes = serde_json::to_vec(value)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
bytes.push(b'\n');
self.inner.write_all(&bytes).await?;
self.inner.flush().await
}
}
pub fn split(stream: UnixStream) -> (JsonFramedReader, JsonFramedWriter) {
let (r, w) = stream.into_split();
(
JsonFramedReader {
inner: BufReader::new(r),
},
JsonFramedWriter { inner: w },
)
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@@ -61,10 +100,13 @@ mod tests {
.await .await
.unwrap(); .unwrap();
let got: Option<M> = sb.read().await.unwrap(); let got: Option<M> = sb.read().await.unwrap();
assert_eq!(got, Some(M { assert_eq!(
x: 1, got,
name: "hi".into() Some(M {
})); x: 1,
name: "hi".into()
})
);
} }
#[tokio::test] #[tokio::test]
+3 -3
View File
@@ -7,8 +7,8 @@ pub mod server;
pub use client::{Client, ClientError}; pub use client::{Client, ClientError};
pub use envelope::{ pub use envelope::{
err_response, notification, ok_response, request, Incoming, Notification, Request, Response, Incoming, Notification, Request, Response, RpcError, err_response, notification, ok_response,
RpcError, request,
}; };
pub use framing::JsonFramed; pub use framing::JsonFramed;
pub use server::{bind, Connection}; pub use server::{Connection, bind};
+9 -6
View File
@@ -1,33 +1,36 @@
use crate::envelope::{Incoming, Notification, Response}; use crate::envelope::{Incoming, Notification, Response};
use crate::framing::JsonFramed; use crate::framing::{JsonFramedReader, JsonFramedWriter};
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use tokio::net::{UnixListener, UnixStream}; use tokio::net::{UnixListener, UnixStream};
use tokio::sync::Mutex; use tokio::sync::Mutex;
pub struct Connection { pub struct Connection {
inner: Arc<Mutex<JsonFramed>>, reader: Mutex<JsonFramedReader>,
writer: Arc<Mutex<JsonFramedWriter>>,
} }
impl Connection { impl Connection {
pub fn new(stream: UnixStream) -> Self { pub fn new(stream: UnixStream) -> Self {
let (reader, writer) = crate::framing::split(stream);
Self { Self {
inner: Arc::new(Mutex::new(JsonFramed::new(stream))), reader: Mutex::new(reader),
writer: Arc::new(Mutex::new(writer)),
} }
} }
pub async fn read_incoming(&self) -> std::io::Result<Option<Incoming>> { pub async fn read_incoming(&self) -> std::io::Result<Option<Incoming>> {
let mut g = self.inner.lock().await; let mut g = self.reader.lock().await;
g.read::<Incoming>().await g.read::<Incoming>().await
} }
pub async fn write_response(&self, r: &Response) -> std::io::Result<()> { pub async fn write_response(&self, r: &Response) -> std::io::Result<()> {
let mut g = self.inner.lock().await; let mut g = self.writer.lock().await;
g.write(r).await g.write(r).await
} }
pub async fn write_notification(&self, n: &Notification) -> std::io::Result<()> { pub async fn write_notification(&self, n: &Notification) -> std::io::Result<()> {
let mut g = self.inner.lock().await; let mut g = self.writer.lock().await;
g.write(n).await g.write(n).await
} }
} }
+50 -16
View File
@@ -44,9 +44,13 @@ pub async fn serve(conn: Arc<Connection>, reg: Registry, _paths: Paths) -> std::
}; };
if let Incoming::Request(req) = incoming { if let Incoming::Request(req) = incoming {
let resp = handle_request(req, &reg, &conn, &state).await; let (resp, log_ready) = handle_request(req, &reg, &conn, &state).await;
conn.write_response(&resp).await?; conn.write_response(&resp).await?;
if let Some(tx) = log_ready {
let _ = tx.send(());
}
} }
} }
} }
@@ -65,17 +69,19 @@ impl ApiError {
} }
} }
type LogReadyTx = tokio::sync::oneshot::Sender<()>;
async fn handle_request( async fn handle_request(
req: Request, req: Request,
reg: &Registry, reg: &Registry,
conn: &Arc<Connection>, conn: &Arc<Connection>,
state: &Arc<ConnState>, state: &Arc<ConnState>,
) -> Response { ) -> (Response, Option<LogReadyTx>) {
let id = req.id.clone(); let id = req.id.clone();
let method = req.method.as_str(); let method = req.method.as_str();
let params = req.params.unwrap_or(serde_json::Value::Null); let params = req.params.unwrap_or(serde_json::Value::Null);
match method { let resp = match method {
methods::LIST => match list(reg).await { methods::LIST => match list(reg).await {
Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()), Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
Err(err) => err_response(id, err.code, err.message), Err(err) => err_response(id, err.code, err.message),
@@ -83,7 +89,12 @@ async fn handle_request(
methods::STATUS => { methods::STATUS => {
let p: xy_protocol::rpc::StatusParams = match serde_json::from_value(params) { let p: xy_protocol::rpc::StatusParams = match serde_json::from_value(params) {
Ok(p) => p, Ok(p) => p,
Err(err) => return err_response(id, -32602, format!("invalid params: {err}")), Err(err) => {
return (
err_response(id, -32602, format!("invalid params: {err}")),
None,
);
}
}; };
match status(reg, &p.name).await { match status(reg, &p.name).await {
@@ -101,24 +112,37 @@ async fn handle_request(
methods::LOGS => { methods::LOGS => {
let p: LogsParams = match serde_json::from_value(params) { let p: LogsParams = match serde_json::from_value(params) {
Ok(p) => p, Ok(p) => p,
Err(err) => return err_response(id, -32602, format!("invalid params: {err}")), Err(err) => {
return (
err_response(id, -32602, format!("invalid params: {err}")),
None,
);
}
}; };
match start_log_stream(reg, conn.clone(), state.clone(), p).await { match start_log_stream(reg, conn.clone(), state.clone(), p).await {
Ok(sub_id) => ok_response( Ok((sub_id, ready_tx)) => {
id, let resp = ok_response(
serde_json::to_value(LogsSubscribed { id,
subscription_id: sub_id, serde_json::to_value(LogsSubscribed {
}) subscription_id: sub_id,
.unwrap(), })
), .unwrap(),
);
return (resp, Some(ready_tx));
}
Err(err) => err_response(id, err.code, err.message), Err(err) => err_response(id, err.code, err.message),
} }
} }
methods::LOGS_CANCEL => { methods::LOGS_CANCEL => {
let p: LogsCancelParams = match serde_json::from_value(params) { let p: LogsCancelParams = match serde_json::from_value(params) {
Ok(p) => p, Ok(p) => p,
Err(err) => return err_response(id, -32602, format!("invalid params: {err}")), Err(err) => {
return (
err_response(id, -32602, format!("invalid params: {err}")),
None,
);
}
}; };
let mut subs = state.subs.lock().await; let mut subs = state.subs.lock().await;
@@ -130,7 +154,9 @@ async fn handle_request(
ok_response(id, serde_json::json!({})) ok_response(id, serde_json::json!({}))
} }
other => err_response(id, -32601, format!("unknown method `{other}`")), other => err_response(id, -32601, format!("unknown method `{other}`")),
} };
(resp, None)
} }
async fn list(reg: &Registry) -> Result<Vec<ServerSummary>, ApiError> { async fn list(reg: &Registry) -> Result<Vec<ServerSummary>, ApiError> {
@@ -425,7 +451,7 @@ async fn start_log_stream(
conn: Arc<Connection>, conn: Arc<Connection>,
state: Arc<ConnState>, state: Arc<ConnState>,
p: LogsParams, p: LogsParams,
) -> Result<u64, ApiError> { ) -> Result<(u64, LogReadyTx), ApiError> {
let Some(entry) = reg.get(&p.name).await else { let Some(entry) = reg.get(&p.name).await else {
return Err(ApiError::rpc( return Err(ApiError::rpc(
RpcErrorCode::ServerNotFound, RpcErrorCode::ServerNotFound,
@@ -441,7 +467,15 @@ async fn start_log_stream(
let tail = p.tail; let tail = p.tail;
let name = p.name.clone(); let name = p.name.clone();
let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
let task = tokio::spawn(async move { let task = tokio::spawn(async move {
// Wait until `serve` has written the LOGS response before sending any
// LOG notifications. Without this the spawned task can race ahead of
// `conn.write_response` and emit notifications that the client
// discards while still awaiting the response.
let _ = ready_rx.await;
for line in sink.ring.snapshot_tail(tail) { for line in sink.ring.snapshot_tail(tail) {
let n = xy_ipc::envelope::notification( let n = xy_ipc::envelope::notification(
notifications::LOG, notifications::LOG,
@@ -497,5 +531,5 @@ async fn start_log_stream(
state.subs.lock().await.insert(sub_id, task); state.subs.lock().await.insert(sub_id, task);
Ok(sub_id) Ok((sub_id, ready_tx))
} }
+30 -10
View File
@@ -1,10 +1,10 @@
#![allow(dead_code)] // not all helpers are used by every test file #![allow(dead_code)] // not all helpers are used by every test file
use std::path::PathBuf; use std::path::PathBuf;
use std::process::Stdio; use std::process::Stdio;
use std::time::Duration; use std::time::Duration;
use tokio::process::{Child, Command};
use tempfile::TempDir; use tempfile::TempDir;
use tokio::process::{Child, Command};
pub struct Harness { pub struct Harness {
pub tmp: TempDir, pub tmp: TempDir,
@@ -25,7 +25,13 @@ impl Harness {
std::fs::create_dir_all(&config_dir).unwrap(); std::fs::create_dir_all(&config_dir).unwrap();
std::fs::create_dir_all(&state_dir).unwrap(); std::fs::create_dir_all(&state_dir).unwrap();
let socket = tmp.path().join("run/xy.sock"); let socket = tmp.path().join("run/xy.sock");
Self { tmp, config_dir, state_dir, socket, daemon: None } Self {
tmp,
config_dir,
state_dir,
socket,
daemon: None,
}
} }
pub fn write_server(&self, name: &str, command: &str, port: u16, restart_policy: &str) { pub fn write_server(&self, name: &str, command: &str, port: u16, restart_policy: &str) {
@@ -49,7 +55,9 @@ impl Harness {
self.daemon = Some(child); self.daemon = Some(child);
let deadline = std::time::Instant::now() + Duration::from_secs(5); let deadline = std::time::Instant::now() + Duration::from_secs(5);
while !self.socket.exists() { while !self.socket.exists() {
if std::time::Instant::now() > deadline { panic!("daemon socket never appeared"); } if std::time::Instant::now() > deadline {
panic!("daemon socket never appeared");
}
tokio::time::sleep(Duration::from_millis(25)).await; tokio::time::sleep(Duration::from_millis(25)).await;
} }
} }
@@ -60,7 +68,9 @@ impl Harness {
.env("XDG_CONFIG_HOME", self.tmp.path().join("config")) .env("XDG_CONFIG_HOME", self.tmp.path().join("config"))
.env("XDG_STATE_HOME", self.tmp.path().join("state")) .env("XDG_STATE_HOME", self.tmp.path().join("state"))
.env("XDG_RUNTIME_DIR", self.tmp.path().join("run")) .env("XDG_RUNTIME_DIR", self.tmp.path().join("run"))
.output().await.expect("run cli"); .output()
.await
.expect("run cli");
let code = out.status.code().unwrap_or(-1); let code = out.status.code().unwrap_or(-1);
let stdout = String::from_utf8_lossy(&out.stdout).to_string(); let stdout = String::from_utf8_lossy(&out.stdout).to_string();
let stderr = String::from_utf8_lossy(&out.stderr).to_string(); let stderr = String::from_utf8_lossy(&out.stderr).to_string();
@@ -68,15 +78,25 @@ impl Harness {
} }
} }
pub fn xy_bin() -> PathBuf { artifact("xy") } pub fn xy_bin() -> PathBuf {
pub fn sleep_server_bin() -> PathBuf { artifact("xy-test-sleep-server") } artifact("xy")
pub fn exit_failure_bin() -> PathBuf { artifact("xy-test-exit-failure") } }
pub fn sleep_server_bin() -> PathBuf {
artifact("xy-test-sleep-server")
}
pub fn exit_failure_bin() -> PathBuf {
artifact("xy-test-exit-failure")
}
fn artifact(name: &str) -> PathBuf { fn artifact(name: &str) -> PathBuf {
let mut p = std::env::current_exe().unwrap(); let mut p = std::env::current_exe().unwrap();
p.pop(); p.pop();
if p.ends_with("deps") { p.pop(); } if p.ends_with("deps") {
p.pop();
}
p.push(name); p.push(name);
if !p.exists() { panic!("artifact `{}` not found at {}", name, p.display()); } if !p.exists() {
panic!("artifact `{}` not found at {}", name, p.display());
}
p p
} }
+49
View File
@@ -0,0 +1,49 @@
mod common;
use common::*;
use std::process::Stdio;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
#[tokio::test]
async fn logs_tail_prints_existing_lines() {
let xy = xy_bin();
let sleeper = sleep_server_bin();
let mut h = Harness::new();
h.write_server("svc", sleeper.to_str().unwrap(), 19_030, "always");
h.start_daemon(&xy).await;
tokio::time::sleep(Duration::from_millis(500)).await;
let (code, out, _e) = h.run_cli(&xy, &["logs", "svc", "--tail", "10"]).await;
assert_eq!(code, 0);
assert!(out.contains("ready"), "stdout: {out}");
}
#[tokio::test]
async fn logs_follow_streams_new_lines() {
let xy = xy_bin();
let sleeper = sleep_server_bin();
let mut h = Harness::new();
h.write_server("svc", sleeper.to_str().unwrap(), 19_031, "always");
h.start_daemon(&xy).await;
let mut child = Command::new(&xy)
.args(["logs", "svc", "--follow"])
.env("XDG_CONFIG_HOME", h.tmp.path().join("config"))
.env("XDG_STATE_HOME", h.tmp.path().join("state"))
.env("XDG_RUNTIME_DIR", h.tmp.path().join("run"))
.stdout(Stdio::piped())
.stderr(Stdio::null())
.kill_on_drop(true)
.spawn()
.unwrap();
let stdout = child.stdout.take().unwrap();
let mut lines = BufReader::new(stdout).lines();
let first = tokio::time::timeout(Duration::from_secs(2), lines.next_line())
.await
.expect("timeout waiting for first log line")
.unwrap();
assert!(first.is_some());
let _ = tokio::time::timeout(Duration::from_secs(2), child.kill()).await;
}