From e58b6866eff975b41116ca741feb0a2924501be0 Mon Sep 17 00:00:00 2001 From: Anders Olsson Date: Mon, 25 May 2026 11:45:50 +0200 Subject: [PATCH] feat(ipc): newline-delimited JSON framing --- crates/xy-ipc/src/framing.rs | 78 ++++++++++++++++++++++++++++++++++++ crates/xy-ipc/src/lib.rs | 2 + 2 files changed, 80 insertions(+) create mode 100644 crates/xy-ipc/src/framing.rs diff --git a/crates/xy-ipc/src/framing.rs b/crates/xy-ipc/src/framing.rs new file mode 100644 index 0000000..5deb74e --- /dev/null +++ b/crates/xy-ipc/src/framing.rs @@ -0,0 +1,78 @@ +use serde::Serialize; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::UnixStream; + +pub struct JsonFramed { + reader: BufReader, + writer: tokio::net::unix::OwnedWriteHalf, +} + +impl JsonFramed { + pub fn new(stream: UnixStream) -> Self { + let (r, w) = stream.into_split(); + Self { + reader: BufReader::new(r), + writer: w, + } + } + + pub async fn read( + &mut self, + ) -> std::io::Result> { + let mut buf = String::new(); + let n = self.reader.read_line(&mut buf).await?; + if n == 0 { + return Ok(None); + } + let v: T = serde_json::from_str(buf.trim_end()) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + Ok(Some(v)) + } + + pub async fn write(&mut self, value: &T) -> std::io::Result<()> { + let mut bytes = serde_json::to_vec(value) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + bytes.push(b'\n'); + self.writer.write_all(&bytes).await?; + self.writer.flush().await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde::Deserialize; + + #[derive(Debug, Serialize, Deserialize, PartialEq)] + struct M { + x: u32, + name: String, + } + + #[tokio::test] + async fn round_trip_over_socket_pair() { + let (a, b) = UnixStream::pair().unwrap(); + let mut sa = JsonFramed::new(a); + let mut sb = JsonFramed::new(b); + sa.write(&M { + x: 1, + name: "hi".into(), + }) + .await + .unwrap(); + let got: Option = sb.read().await.unwrap(); + assert_eq!(got, Some(M { + x: 1, + name: "hi".into() + })); + } + + #[tokio::test] + async fn eof_returns_none() { + let (a, b) = UnixStream::pair().unwrap(); + drop(a); + let mut sb = JsonFramed::new(b); + let got: Option = sb.read().await.unwrap(); + assert!(got.is_none()); + } +} diff --git a/crates/xy-ipc/src/lib.rs b/crates/xy-ipc/src/lib.rs index 59c3f78..97963c7 100644 --- a/crates/xy-ipc/src/lib.rs +++ b/crates/xy-ipc/src/lib.rs @@ -1,8 +1,10 @@ //! JSON-RPC 2.0 over newline-delimited JSON on a Unix socket. pub mod envelope; +pub mod framing; pub use envelope::{ err_response, notification, ok_response, request, Incoming, Notification, Request, Response, RpcError, }; +pub use framing::JsonFramed;