feat(ipc): newline-delimited JSON framing
This commit is contained in:
@@ -0,0 +1,78 @@
|
|||||||
|
use serde::Serialize;
|
||||||
|
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||||
|
use tokio::net::UnixStream;
|
||||||
|
|
||||||
|
pub struct JsonFramed {
|
||||||
|
reader: BufReader<tokio::net::unix::OwnedReadHalf>,
|
||||||
|
writer: tokio::net::unix::OwnedWriteHalf,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JsonFramed {
|
||||||
|
pub fn new(stream: UnixStream) -> Self {
|
||||||
|
let (r, w) = stream.into_split();
|
||||||
|
Self {
|
||||||
|
reader: BufReader::new(r),
|
||||||
|
writer: w,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn read<T: serde::de::DeserializeOwned>(
|
||||||
|
&mut self,
|
||||||
|
) -> std::io::Result<Option<T>> {
|
||||||
|
let mut buf = String::new();
|
||||||
|
let n = self.reader.read_line(&mut buf).await?;
|
||||||
|
if n == 0 {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
let v: T = serde_json::from_str(buf.trim_end())
|
||||||
|
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
|
||||||
|
Ok(Some(v))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn write<T: Serialize>(&mut self, value: &T) -> std::io::Result<()> {
|
||||||
|
let mut bytes = serde_json::to_vec(value)
|
||||||
|
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
|
||||||
|
bytes.push(b'\n');
|
||||||
|
self.writer.write_all(&bytes).await?;
|
||||||
|
self.writer.flush().await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use serde::Deserialize;
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, PartialEq)]
|
||||||
|
struct M {
|
||||||
|
x: u32,
|
||||||
|
name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn round_trip_over_socket_pair() {
|
||||||
|
let (a, b) = UnixStream::pair().unwrap();
|
||||||
|
let mut sa = JsonFramed::new(a);
|
||||||
|
let mut sb = JsonFramed::new(b);
|
||||||
|
sa.write(&M {
|
||||||
|
x: 1,
|
||||||
|
name: "hi".into(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let got: Option<M> = sb.read().await.unwrap();
|
||||||
|
assert_eq!(got, Some(M {
|
||||||
|
x: 1,
|
||||||
|
name: "hi".into()
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn eof_returns_none() {
|
||||||
|
let (a, b) = UnixStream::pair().unwrap();
|
||||||
|
drop(a);
|
||||||
|
let mut sb = JsonFramed::new(b);
|
||||||
|
let got: Option<M> = sb.read().await.unwrap();
|
||||||
|
assert!(got.is_none());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,8 +1,10 @@
|
|||||||
//! JSON-RPC 2.0 over newline-delimited JSON on a Unix socket.
|
//! JSON-RPC 2.0 over newline-delimited JSON on a Unix socket.
|
||||||
|
|
||||||
pub mod envelope;
|
pub mod envelope;
|
||||||
|
pub mod framing;
|
||||||
|
|
||||||
pub use envelope::{
|
pub use envelope::{
|
||||||
err_response, notification, ok_response, request, Incoming, Notification, Request, Response,
|
err_response, notification, ok_response, request, Incoming, Notification, Request, Response,
|
||||||
RpcError,
|
RpcError,
|
||||||
};
|
};
|
||||||
|
pub use framing::JsonFramed;
|
||||||
|
|||||||
Reference in New Issue
Block a user