From fbfb1db4270857e870f979f844cadd0a454bf785 Mon Sep 17 00:00:00 2001 From: Anders Olsson Date: Mon, 25 May 2026 11:47:11 +0200 Subject: [PATCH] feat(ipc): client with call + notification reader --- crates/xy-ipc/src/client.rs | 127 ++++++++++++++++++++++++++++++++++++ crates/xy-ipc/src/lib.rs | 2 + 2 files changed, 129 insertions(+) create mode 100644 crates/xy-ipc/src/client.rs diff --git a/crates/xy-ipc/src/client.rs b/crates/xy-ipc/src/client.rs new file mode 100644 index 0000000..3125247 --- /dev/null +++ b/crates/xy-ipc/src/client.rs @@ -0,0 +1,127 @@ +use crate::envelope::{Incoming, Notification, Request}; +use crate::framing::JsonFramed; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::path::Path; +use thiserror::Error; +use tokio::net::UnixStream; + +#[derive(Debug, Error)] +pub enum ClientError { + #[error("io: {0}")] + Io(#[from] std::io::Error), + #[error("rpc error {code}: {message}")] + Rpc { code: i32, message: String }, + #[error("unexpected message kind from daemon")] + Unexpected, + #[error("daemon unreachable: {0}")] + Unreachable(std::io::Error), + #[error("serialization: {0}")] + Serde(#[from] serde_json::Error), +} + +pub struct Client { + framed: JsonFramed, + next_id: u64, +} + +impl Client { + pub async fn connect(socket_path: &Path) -> Result { + let stream = UnixStream::connect(socket_path) + .await + .map_err(ClientError::Unreachable)?; + Ok(Self { + framed: JsonFramed::new(stream), + next_id: 1, + }) + } + + pub async fn call( + &mut self, + method: &str, + params: &P, + ) -> Result { + let id = self.next_id; + self.next_id += 1; + + let params_val = serde_json::to_value(params)?; + let req = crate::envelope::request(id, method, Some(params_val)); + self.framed.write(&req).await?; + + loop { + let msg: Option = self.framed.read().await?; + let Some(msg) = msg else { + return Err(ClientError::Unreachable(std::io::Error::from( + std::io::ErrorKind::UnexpectedEof, + ))); + }; + + match msg { + Incoming::Response(r) => { + if r.id != serde_json::json!(id) { + return Err(ClientError::Unexpected); + } + if let Some(err) = r.error { + return Err(ClientError::Rpc { + code: err.code, + message: err.message, + }); + } + let result = r.result.unwrap_or(serde_json::Value::Null); + return Ok(serde_json::from_value(result)?); + } + Incoming::Notification(_) => continue, + Incoming::Request(_) => return Err(ClientError::Unexpected), + } + } + } + + pub async fn call_no_params( + &mut self, + method: &str, + ) -> Result { + let id = self.next_id; + self.next_id += 1; + + let req = Request { + jsonrpc: "2.0".into(), + id: serde_json::json!(id), + method: method.into(), + params: None, + }; + self.framed.write(&req).await?; + + let msg: Option = self.framed.read().await?; + let Some(Incoming::Response(r)) = msg else { + return Err(ClientError::Unexpected); + }; + + if let Some(err) = r.error { + return Err(ClientError::Rpc { + code: err.code, + message: err.message, + }); + } + + Ok(serde_json::from_value( + r.result.unwrap_or(serde_json::Value::Null), + )?) + } + + pub async fn read_notification(&mut self) -> Result, ClientError> { + loop { + let msg: Option = self.framed.read().await?; + match msg { + None => return Ok(None), + Some(Incoming::Notification(n)) => return Ok(Some(n)), + Some(Incoming::Response(_)) => continue, + Some(Incoming::Request(_)) => return Err(ClientError::Unexpected), + } + } + } + + pub async fn send_notification(&mut self, n: &Notification) -> Result<(), ClientError> { + self.framed.write(n).await?; + Ok(()) + } +} diff --git a/crates/xy-ipc/src/lib.rs b/crates/xy-ipc/src/lib.rs index 97963c7..bc716a6 100644 --- a/crates/xy-ipc/src/lib.rs +++ b/crates/xy-ipc/src/lib.rs @@ -1,8 +1,10 @@ //! JSON-RPC 2.0 over newline-delimited JSON on a Unix socket. +pub mod client; pub mod envelope; pub mod framing; +pub use client::{Client, ClientError}; pub use envelope::{ err_response, notification, ok_response, request, Incoming, Notification, Request, Response, RpcError,