b1e7dea739
Fix a deadlock in the log-stream handler that caused all logs requests to hang: Connection used a single Mutex<JsonFramed> for both reads and writes, so the serve loop holding the read lock blocked the spawned notification task from writing. Split Connection into separate reader and writer mutexes. Also fix a response/notification ordering race: the log task now waits for an explicit ready signal sent by serve after writing the LOGS response, ensuring notifications never arrive at the client before their initiating response.
71 lines
1.9 KiB
Rust
71 lines
1.9 KiB
Rust
use crate::envelope::{Incoming, Notification, Response};
|
|
use crate::framing::{JsonFramedReader, JsonFramedWriter};
|
|
use std::path::Path;
|
|
use std::sync::Arc;
|
|
use tokio::net::{UnixListener, UnixStream};
|
|
use tokio::sync::Mutex;
|
|
|
|
pub struct Connection {
|
|
reader: Mutex<JsonFramedReader>,
|
|
writer: Arc<Mutex<JsonFramedWriter>>,
|
|
}
|
|
|
|
impl Connection {
|
|
pub fn new(stream: UnixStream) -> Self {
|
|
let (reader, writer) = crate::framing::split(stream);
|
|
Self {
|
|
reader: Mutex::new(reader),
|
|
writer: Arc::new(Mutex::new(writer)),
|
|
}
|
|
}
|
|
|
|
pub async fn read_incoming(&self) -> std::io::Result<Option<Incoming>> {
|
|
let mut g = self.reader.lock().await;
|
|
g.read::<Incoming>().await
|
|
}
|
|
|
|
pub async fn write_response(&self, r: &Response) -> std::io::Result<()> {
|
|
let mut g = self.writer.lock().await;
|
|
g.write(r).await
|
|
}
|
|
|
|
pub async fn write_notification(&self, n: &Notification) -> std::io::Result<()> {
|
|
let mut g = self.writer.lock().await;
|
|
g.write(n).await
|
|
}
|
|
}
|
|
|
|
pub fn bind(socket_path: &Path) -> std::io::Result<UnixListener> {
|
|
if socket_path.exists() {
|
|
std::fs::remove_file(socket_path)?;
|
|
}
|
|
|
|
if let Some(parent) = socket_path.parent() {
|
|
std::fs::create_dir_all(parent)?;
|
|
}
|
|
|
|
let listener = UnixListener::bind(socket_path)?;
|
|
|
|
use std::os::unix::fs::PermissionsExt;
|
|
std::fs::set_permissions(socket_path, std::fs::Permissions::from_mode(0o600))?;
|
|
|
|
Ok(listener)
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use std::os::unix::fs::PermissionsExt;
    use tempfile::tempdir;

    /// `bind` must leave the socket file with exactly owner read/write
    /// permission bits.
    #[tokio::test]
    async fn bind_creates_socket_with_0600() {
        let scratch = tempdir().unwrap();
        let socket_path = scratch.path().join("x.sock");
        // Keep the listener alive for the duration of the check.
        let _listener = bind(&socket_path).unwrap();

        let metadata = std::fs::metadata(&socket_path).unwrap();
        // Mask off the file-type bits; only the permission bits matter here.
        let permission_bits = metadata.permissions().mode() & 0o777;
        assert_eq!(permission_bits, 0o600);
    }
}
|