Compare commits

..

45 Commits

Author SHA1 Message Date
logaritmisk 40ffd9b06c fix(pidfile): replace stale pidfile after unclean shutdown
PidFile::acquire used create_new(true) with Drop-based cleanup, so a
pidfile surviving power loss or SIGKILL made the daemon refuse to start
until the file was deleted by hand.

On AlreadyExists, read the recorded PID and probe it with kill(pid, 0):
ESRCH (or unparseable content) means stale, so remove the file and
retry the atomic create. A live PID keeps the refusal and now names the
holding process. The retry loop is bounded to stay race-safe against a
concurrent starter.

Closes #1

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-05 18:16:54 +02:00
logaritmisk 00c7e7e812 feat(paths): auto-create config dir on daemon startup
ensure_dirs() now creates config_dir alongside state_dir and log_dir,
so first daemon run materializes $XDG_CONFIG_HOME/xy/servers/ — making
it obvious where to drop server .kdl files.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-26 12:44:47 +02:00
logaritmisk 740b8b4c84 Merge feat/mcp-supervisor: HTTP MCP server supervisor MVP
37 planned tasks plus 3 follow-up fixes from final code review.

Architecture:
- Cargo workspace: xy-protocol, xy-supervisor, xy-ipc, xy (single binary)
- Unix socket + newline-delimited JSON-RPC 2.0
- Per-server KDL configs at XDG paths (XDG on macOS via etcetera)
- One supervisor task per managed server, owning all state
- Per-server log capture: rotating disk + ring buffer + broadcast stream

Features:
- Daemon auto-launches all configured servers on boot
- start/stop/restart (single or --all), reload (diff added/removed/changed),
  list/status, logs (--tail / --follow)
- Per-server restart policy (always/on-failure/never) with exponential
  backoff, sliding 60s retry window, and Failed state on cap
- Graceful shutdown via SIGTERM/SIGINT, SIGKILL escalation after grace
- 51 tests: unit (state machine via MockChild, KDL parser, framing) +
  integration (real daemon + helper bins exercising lifecycle/reload/
  restart-cap/logs)

Bugs found and fixed during execution:
- Connection deadlock from single shared read/write mutex (split into
  separate reader/writer halves)
- LOGS response vs notification ordering race (oneshot gate)
- StartAck::Started returned even on spawn failure (added SpawnFailed)
- Backoff sleep blocked the supervisor's command channel (interruptible
  select)
- list/status returned zeroed fields (now publish full Status via watch)
2026-05-25 13:22:28 +02:00
logaritmisk 4a0b32d90e fix(supervisor): StartAck::SpawnFailed surfaces real failures
Add StartAck::SpawnFailed(String) so callers can distinguish a successful
start from a failed spawn. The Start command arm now sends SpawnFailed on
io::Error rather than the misleading Started. handlers.rs maps the new
variant to an RpcErrorCode::SpawnFailed JSON-RPC error response.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 12:32:07 +02:00
logaritmisk b366df0482 fix(supervisor): make backoff sleep interruptible by Stop/Shutdown
Replace the bare sleep(delay).await in the Restart backoff arm with a
tokio::select! over the timer and cmd_rx. Stop/Shutdown are now handled
immediately during backoff (Stop → Stopped, Shutdown → clean exit);
Start/Restart/Reconfigure skip the remaining delay and retry at once.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 12:31:32 +02:00
logaritmisk 3e4ad79137 fix(supervisor): publish full status (pid, port, uptime, restart_count, last_exit) via watch channel
Replace watch::Receiver<ServerState> on SupervisorHandle with watch::Receiver<Status>,
a richer snapshot type that carries pid, port, uptime_secs, restart_count and last_exit.
SupervisorTask maintains current_pid and publishes a fresh Status on every state
transition; handlers.rs reads the full Status so list/status no longer return
zeroed/None fields.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 12:30:56 +02:00
logaritmisk ae6ed1cf0a chore: remove stray libnull.rlib and gitignore *.rlib
Accidentally committed in 9d5d8f0 during the polish task.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 12:25:13 +02:00
logaritmisk 0261d58d5d docs: README and example KDL config 2026-05-25 12:19:34 +02:00
logaritmisk 9d5d8f04a2 chore: clippy fixes - allow should_implement_trait and collapse nested if 2026-05-25 12:19:24 +02:00
logaritmisk b1e7dea739 test(xy): logs --tail and --follow
Fix a deadlock in the log-stream handler that caused all logs
requests to hang: Connection used a single Mutex<JsonFramed> for
both reads and writes, so the serve loop holding the read lock
blocked the spawned notification task from writing.  Split
Connection into separate reader and writer mutexes.

Also fix a response/notification ordering race: the log task now
waits for an explicit ready signal sent by serve after writing the
LOGS response, ensuring notifications never arrive at the client
before their initiating response.
2026-05-25 12:17:32 +02:00
logaritmisk 15791c628b test(xy): reload diff 2026-05-25 12:05:58 +02:00
logaritmisk 284b6e7402 test(xy): restart cap escalates to failed 2026-05-25 12:05:45 +02:00
logaritmisk 434828c14e test(xy): auto-start + stop/start lifecycle 2026-05-25 12:05:28 +02:00
logaritmisk 48d63a0549 test(xy): integration test harness 2026-05-25 12:03:38 +02:00
logaritmisk 7107977637 test(xy): helper binaries for integration tests 2026-05-25 12:03:13 +02:00
logaritmisk c1f6225e26 feat(xy): CLI client commands
Replace bail!("not implemented") stubs with real RPC calls over the Unix
socket; add format::list_table for fixed-width list output.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 12:02:09 +02:00
logaritmisk b434c636a6 feat(xy): logs streaming via subscription notifications
Implement per-connection ConnState tracking active subscriptions, and the
logs/logs_cancel RPC handlers. Snapshot-only streams terminate with a
log_end notification; follow streams forward broadcast lines until
cancelled or connection close.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 11:59:46 +02:00
logaritmisk c679465f12 feat(xy): reload handler with diff
Implements the `reload` JSON-RPC method: diffs the on-disk config dir
against the in-memory registry and reconciles — stops removed servers,
restarts changed servers (shutdown-then-respawn), and starts new ones.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 11:56:45 +02:00
logaritmisk 736e6d1854 feat(xy): RPC handlers for list/status/start/stop/restart
Per-connection JSON-RPC dispatch in daemon/handlers.rs — list, status,
start, stop, and restart are fully implemented; reload, logs, and
logs_cancel are stubbed with -32601 for later tasks.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 11:54:52 +02:00
logaritmisk 3ab982aea1 feat(xy): daemon boot + accept loop + graceful shutdown
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 11:52:41 +02:00
logaritmisk d7aa543ac0 feat(xy): daemon Registry with config-hash entries 2026-05-25 11:49:58 +02:00
logaritmisk 71808783c4 feat(xy): clap CLI scaffold 2026-05-25 11:49:47 +02:00
logaritmisk 49c006df10 feat(xy): exclusive pidfile guard 2026-05-25 11:48:38 +02:00
logaritmisk 58c44e0b48 feat(xy): XDG path resolution 2026-05-25 11:48:36 +02:00
logaritmisk b137f85a0c feat(ipc): server bind + Connection wrapper 2026-05-25 11:47:24 +02:00
logaritmisk fbfb1db427 feat(ipc): client with call + notification reader 2026-05-25 11:47:11 +02:00
logaritmisk e58b6866ef feat(ipc): newline-delimited JSON framing 2026-05-25 11:45:50 +02:00
logaritmisk 53f6b82f2b feat(ipc): JSON-RPC envelope types 2026-05-25 11:45:37 +02:00
logaritmisk a3c979511e feat(supervisor): supervisor task with state machine
One async task per managed server owns all state transitions via a
tokio::select! loop over cmd_rx and wait_child. Includes RealSpawner
and a smoke test covering the Start → Running → exit → Stopped →
Shutdown happy path.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 11:44:12 +02:00
logaritmisk f1b2306156 feat(supervisor): RealChild + spawn_with_logs
Append RealChild (real tokio::process::Child wrapper) and spawn_with_logs
to child.rs. Uses nix::unistd::setpgid via tokio's re-exported pre_exec
to create an own process group, and fires per-stream log pump tasks that
drain stdout/stderr into the provided LogSink. terminate/kill signal the
whole process group via kill(-pgid, SIG*).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 11:40:19 +02:00
logaritmisk e121fe28bb feat(supervisor): LogSink fans out to file, ring buffer, broadcast 2026-05-25 11:37:19 +02:00
logaritmisk 7995a53e82 feat(supervisor): ring buffer for recent log lines 2026-05-25 11:36:52 +02:00
logaritmisk d51f25350c feat(supervisor): rotating log writer 2026-05-25 11:36:23 +02:00
logaritmisk d237e980e9 feat(supervisor): sliding retry-window tracker 2026-05-25 11:34:55 +02:00
logaritmisk 54045da2df feat(supervisor): exponential backoff calculator 2026-05-25 11:34:53 +02:00
logaritmisk 4837a73167 feat(supervisor): restart-policy decision logic 2026-05-25 11:34:50 +02:00
logaritmisk 1d2848f03a feat(supervisor): ChildHandle trait + MockChild 2026-05-25 11:33:23 +02:00
logaritmisk bd926061bf feat(protocol): JSON-RPC method param/result types 2026-05-25 11:31:56 +02:00
logaritmisk e8f5846cec feat(protocol): load_all_configs from dir with duplicate port detection 2026-05-25 11:30:38 +02:00
logaritmisk 7e59d7d050 feat(protocol): KDL parser for ServerConfig
Adds kdl_parse module with parse_server_config() that deserialises a
KDL document into ServerConfig, with full validation of name, types,
durations, and restart/stop blocks. Also derives Default on
RestartPolicy to satisfy clippy.
2026-05-25 11:29:05 +02:00
logaritmisk 355d0debda feat(protocol): ServerConfig + ConfigError + RpcErrorCode 2026-05-25 11:23:57 +02:00
logaritmisk 5a0963665d feat(protocol): RestartPolicy/RestartConfig/StopConfig with defaults 2026-05-25 11:22:52 +02:00
logaritmisk 0e49834c93 feat(protocol): ServerState enum 2026-05-25 11:21:43 +02:00
logaritmisk 1b76378b37 chore: bump workspace resolver to "3"
cargo 1.95 supports resolver 3; align with plan spec.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 11:20:54 +02:00
logaritmisk 5b1314b0af chore: convert to cargo workspace with four crates 2026-05-25 11:17:24 +02:00
44 changed files with 5302 additions and 8 deletions
+1
View File
@@ -1 +1,2 @@
/target /target
*.rlib
Generated
+1140
View File
File diff suppressed because it is too large Load Diff
+34 -5
View File
@@ -1,6 +1,35 @@
[package] [workspace]
name = "xy" resolver = "3"
version = "0.1.0" members = [
edition = "2024" "crates/xy-protocol",
"crates/xy-supervisor",
"crates/xy-ipc",
"crates/xy",
]
[dependencies] [workspace.package]
edition = "2024"
version = "0.1.0"
license = "MIT OR Apache-2.0"
[workspace.dependencies]
xy-protocol = { path = "crates/xy-protocol" }
xy-supervisor = { path = "crates/xy-supervisor" }
xy-ipc = { path = "crates/xy-ipc" }
tokio = { version = "1", features = ["rt-multi-thread", "net", "process", "signal", "sync", "fs", "io-util", "macros", "time"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "2"
anyhow = "1"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
clap = { version = "4", features = ["derive"] }
kdl = "6"
etcetera = "0.10"
nix = { version = "0.30", features = ["signal", "process"] }
humantime = "2"
humantime-serde = "1"
async-trait = "0.1"
tempfile = "3"
tokio-test = "0.4"
+26
View File
@@ -0,0 +1,26 @@
# xy — HTTP MCP server supervisor
Daemon + CLI that launches and supervises HTTP-based MCP servers.
## Build
cargo build --release
## Run
target/release/xy daemon # foreground
Drop a server definition into `$XDG_CONFIG_HOME/xy/servers/<name>.kdl`
(see `examples/insikt.kdl`) and `xy reload`.
Commands:
xy list
xy status <name>
xy start <name|--all>
xy stop <name|--all>
xy restart <name|--all>
xy reload
xy logs <name> [--tail N] [--follow]
Exit codes: 0 success, 1 operational error, 2 daemon unreachable, 3 config invalid.
+16
View File
@@ -0,0 +1,16 @@
[package]
name = "xy-ipc"
edition.workspace = true
version.workspace = true
license.workspace = true
[dependencies]
xy-protocol.workspace = true
tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
thiserror.workspace = true
[dev-dependencies]
tempfile.workspace = true
+127
View File
@@ -0,0 +1,127 @@
use crate::envelope::{Incoming, Notification, Request};
use crate::framing::JsonFramed;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::path::Path;
use thiserror::Error;
use tokio::net::UnixStream;
#[derive(Debug, Error)]
pub enum ClientError {
#[error("io: {0}")]
Io(#[from] std::io::Error),
#[error("rpc error {code}: {message}")]
Rpc { code: i32, message: String },
#[error("unexpected message kind from daemon")]
Unexpected,
#[error("daemon unreachable: {0}")]
Unreachable(std::io::Error),
#[error("serialization: {0}")]
Serde(#[from] serde_json::Error),
}
pub struct Client {
framed: JsonFramed,
next_id: u64,
}
impl Client {
pub async fn connect(socket_path: &Path) -> Result<Self, ClientError> {
let stream = UnixStream::connect(socket_path)
.await
.map_err(ClientError::Unreachable)?;
Ok(Self {
framed: JsonFramed::new(stream),
next_id: 1,
})
}
pub async fn call<P: Serialize, R: DeserializeOwned>(
&mut self,
method: &str,
params: &P,
) -> Result<R, ClientError> {
let id = self.next_id;
self.next_id += 1;
let params_val = serde_json::to_value(params)?;
let req = crate::envelope::request(id, method, Some(params_val));
self.framed.write(&req).await?;
loop {
let msg: Option<Incoming> = self.framed.read().await?;
let Some(msg) = msg else {
return Err(ClientError::Unreachable(std::io::Error::from(
std::io::ErrorKind::UnexpectedEof,
)));
};
match msg {
Incoming::Response(r) => {
if r.id != serde_json::json!(id) {
return Err(ClientError::Unexpected);
}
if let Some(err) = r.error {
return Err(ClientError::Rpc {
code: err.code,
message: err.message,
});
}
let result = r.result.unwrap_or(serde_json::Value::Null);
return Ok(serde_json::from_value(result)?);
}
Incoming::Notification(_) => continue,
Incoming::Request(_) => return Err(ClientError::Unexpected),
}
}
}
pub async fn call_no_params<R: DeserializeOwned>(
&mut self,
method: &str,
) -> Result<R, ClientError> {
let id = self.next_id;
self.next_id += 1;
let req = Request {
jsonrpc: "2.0".into(),
id: serde_json::json!(id),
method: method.into(),
params: None,
};
self.framed.write(&req).await?;
let msg: Option<Incoming> = self.framed.read().await?;
let Some(Incoming::Response(r)) = msg else {
return Err(ClientError::Unexpected);
};
if let Some(err) = r.error {
return Err(ClientError::Rpc {
code: err.code,
message: err.message,
});
}
Ok(serde_json::from_value(
r.result.unwrap_or(serde_json::Value::Null),
)?)
}
pub async fn read_notification(&mut self) -> Result<Option<Notification>, ClientError> {
loop {
let msg: Option<Incoming> = self.framed.read().await?;
match msg {
None => return Ok(None),
Some(Incoming::Notification(n)) => return Ok(Some(n)),
Some(Incoming::Response(_)) => continue,
Some(Incoming::Request(_)) => return Err(ClientError::Unexpected),
}
}
}
pub async fn send_notification(&mut self, n: &Notification) -> Result<(), ClientError> {
self.framed.write(n).await?;
Ok(())
}
}
+114
View File
@@ -0,0 +1,114 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Request {
pub jsonrpc: String,
pub id: serde_json::Value,
pub method: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub params: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Notification {
pub jsonrpc: String,
pub method: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub params: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Response {
pub jsonrpc: String,
pub id: serde_json::Value,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub result: Option<Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<RpcError>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcError {
pub code: i32,
pub message: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub data: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Incoming {
Request(Request),
Response(Response),
Notification(Notification),
}
pub const JSONRPC_VERSION: &str = "2.0";
pub fn request(id: u64, method: &str, params: Option<Value>) -> Request {
Request {
jsonrpc: JSONRPC_VERSION.into(),
id: serde_json::json!(id),
method: method.into(),
params,
}
}
pub fn notification(method: &str, params: Option<Value>) -> Notification {
Notification {
jsonrpc: JSONRPC_VERSION.into(),
method: method.into(),
params,
}
}
pub fn ok_response(id: serde_json::Value, result: Value) -> Response {
Response {
jsonrpc: JSONRPC_VERSION.into(),
id,
result: Some(result),
error: None,
}
}
pub fn err_response(id: serde_json::Value, code: i32, message: String) -> Response {
Response {
jsonrpc: JSONRPC_VERSION.into(),
id,
result: None,
error: Some(RpcError {
code,
message,
data: None,
}),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn request_round_trip() {
let r = request(7, "list", None);
let s = serde_json::to_string(&r).unwrap();
let back: Request = serde_json::from_str(&s).unwrap();
assert_eq!(back.method, "list");
assert_eq!(back.id, serde_json::json!(7));
}
#[test]
fn incoming_is_response() {
let s = r#"{"jsonrpc":"2.0","id":1,"result":{"ok":true}}"#;
let i: Incoming = serde_json::from_str(s).unwrap();
assert!(matches!(i, Incoming::Response(_)));
}
#[test]
fn incoming_is_notification() {
let s = r#"{"jsonrpc":"2.0","method":"log","params":{}}"#;
let i: Incoming = serde_json::from_str(s).unwrap();
assert!(matches!(i, Incoming::Notification(_)));
}
}
+120
View File
@@ -0,0 +1,120 @@
use serde::Serialize;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
pub struct JsonFramed {
reader: BufReader<tokio::net::unix::OwnedReadHalf>,
writer: tokio::net::unix::OwnedWriteHalf,
}
impl JsonFramed {
pub fn new(stream: UnixStream) -> Self {
let (r, w) = stream.into_split();
Self {
reader: BufReader::new(r),
writer: w,
}
}
pub async fn read<T: serde::de::DeserializeOwned>(&mut self) -> std::io::Result<Option<T>> {
let mut buf = String::new();
let n = self.reader.read_line(&mut buf).await?;
if n == 0 {
return Ok(None);
}
let v: T = serde_json::from_str(buf.trim_end())
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
Ok(Some(v))
}
pub async fn write<T: Serialize>(&mut self, value: &T) -> std::io::Result<()> {
let mut bytes = serde_json::to_vec(value)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
bytes.push(b'\n');
self.writer.write_all(&bytes).await?;
self.writer.flush().await
}
}
pub struct JsonFramedReader {
inner: BufReader<tokio::net::unix::OwnedReadHalf>,
}
impl JsonFramedReader {
pub async fn read<T: serde::de::DeserializeOwned>(&mut self) -> std::io::Result<Option<T>> {
let mut buf = String::new();
let n = self.inner.read_line(&mut buf).await?;
if n == 0 {
return Ok(None);
}
let v: T = serde_json::from_str(buf.trim_end())
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
Ok(Some(v))
}
}
pub struct JsonFramedWriter {
inner: tokio::net::unix::OwnedWriteHalf,
}
impl JsonFramedWriter {
pub async fn write<T: Serialize>(&mut self, value: &T) -> std::io::Result<()> {
let mut bytes = serde_json::to_vec(value)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
bytes.push(b'\n');
self.inner.write_all(&bytes).await?;
self.inner.flush().await
}
}
pub fn split(stream: UnixStream) -> (JsonFramedReader, JsonFramedWriter) {
let (r, w) = stream.into_split();
(
JsonFramedReader {
inner: BufReader::new(r),
},
JsonFramedWriter { inner: w },
)
}
#[cfg(test)]
mod tests {
use super::*;
use serde::Deserialize;
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct M {
x: u32,
name: String,
}
#[tokio::test]
async fn round_trip_over_socket_pair() {
let (a, b) = UnixStream::pair().unwrap();
let mut sa = JsonFramed::new(a);
let mut sb = JsonFramed::new(b);
sa.write(&M {
x: 1,
name: "hi".into(),
})
.await
.unwrap();
let got: Option<M> = sb.read().await.unwrap();
assert_eq!(
got,
Some(M {
x: 1,
name: "hi".into()
})
);
}
#[tokio::test]
async fn eof_returns_none() {
let (a, b) = UnixStream::pair().unwrap();
drop(a);
let mut sb = JsonFramed::new(b);
let got: Option<M> = sb.read().await.unwrap();
assert!(got.is_none());
}
}
+14
View File
@@ -0,0 +1,14 @@
//! JSON-RPC 2.0 over newline-delimited JSON on a Unix socket.
pub mod client;
pub mod envelope;
pub mod framing;
pub mod server;
pub use client::{Client, ClientError};
pub use envelope::{
Incoming, Notification, Request, Response, RpcError, err_response, notification, ok_response,
request,
};
pub use framing::JsonFramed;
pub use server::{Connection, bind};
+70
View File
@@ -0,0 +1,70 @@
use crate::envelope::{Incoming, Notification, Response};
use crate::framing::{JsonFramedReader, JsonFramedWriter};
use std::path::Path;
use std::sync::Arc;
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::Mutex;
pub struct Connection {
reader: Mutex<JsonFramedReader>,
writer: Arc<Mutex<JsonFramedWriter>>,
}
impl Connection {
pub fn new(stream: UnixStream) -> Self {
let (reader, writer) = crate::framing::split(stream);
Self {
reader: Mutex::new(reader),
writer: Arc::new(Mutex::new(writer)),
}
}
pub async fn read_incoming(&self) -> std::io::Result<Option<Incoming>> {
let mut g = self.reader.lock().await;
g.read::<Incoming>().await
}
pub async fn write_response(&self, r: &Response) -> std::io::Result<()> {
let mut g = self.writer.lock().await;
g.write(r).await
}
pub async fn write_notification(&self, n: &Notification) -> std::io::Result<()> {
let mut g = self.writer.lock().await;
g.write(n).await
}
}
pub fn bind(socket_path: &Path) -> std::io::Result<UnixListener> {
if socket_path.exists() {
std::fs::remove_file(socket_path)?;
}
if let Some(parent) = socket_path.parent() {
std::fs::create_dir_all(parent)?;
}
let listener = UnixListener::bind(socket_path)?;
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(socket_path, std::fs::Permissions::from_mode(0o600))?;
Ok(listener)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[tokio::test]
async fn bind_creates_socket_with_0600() {
let dir = tempdir().unwrap();
let path = dir.path().join("x.sock");
let _listener = bind(&path).unwrap();
use std::os::unix::fs::PermissionsExt;
let mode = std::fs::metadata(&path).unwrap().permissions().mode() & 0o777;
assert_eq!(mode, 0o600);
}
}
+16
View File
@@ -0,0 +1,16 @@
[package]
name = "xy-protocol"
edition.workspace = true
version.workspace = true
license.workspace = true
[dependencies]
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
kdl.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
[dev-dependencies]
tempfile.workspace = true
+103
View File
@@ -0,0 +1,103 @@
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum RestartPolicy {
Always,
#[default]
OnFailure,
Never,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RestartConfig {
#[serde(default)]
pub policy: RestartPolicy,
#[serde(default = "default_backoff_initial", with = "humantime_serde")]
pub backoff_initial: Duration,
#[serde(default = "default_backoff_max", with = "humantime_serde")]
pub backoff_max: Duration,
#[serde(default = "default_max_retries_per_minute")]
pub max_retries_per_minute: u32,
}
fn default_backoff_initial() -> Duration {
Duration::from_secs(1)
}
fn default_backoff_max() -> Duration {
Duration::from_secs(30)
}
fn default_max_retries_per_minute() -> u32 {
5
}
impl Default for RestartConfig {
fn default() -> Self {
Self {
policy: RestartPolicy::default(),
backoff_initial: default_backoff_initial(),
backoff_max: default_backoff_max(),
max_retries_per_minute: default_max_retries_per_minute(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StopConfig {
#[serde(default = "default_grace", with = "humantime_serde")]
pub grace: Duration,
}
fn default_grace() -> Duration {
Duration::from_secs(10)
}
impl Default for StopConfig {
fn default() -> Self {
Self {
grace: default_grace(),
}
}
}
use std::collections::BTreeMap;
use std::path::PathBuf;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ServerConfig {
pub name: String,
pub command: PathBuf,
#[serde(default)]
pub args: Vec<String>,
pub port: u16,
#[serde(default)]
pub env: BTreeMap<String, String>,
#[serde(default)]
pub working_dir: Option<PathBuf>,
#[serde(default)]
pub restart: RestartConfig,
#[serde(default)]
pub stop: StopConfig,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn restart_config_defaults() {
let c = RestartConfig::default();
assert_eq!(c.policy, RestartPolicy::OnFailure);
assert_eq!(c.backoff_initial, Duration::from_secs(1));
assert_eq!(c.backoff_max, Duration::from_secs(30));
assert_eq!(c.max_retries_per_minute, 5);
}
#[test]
fn stop_config_defaults() {
assert_eq!(StopConfig::default().grace, Duration::from_secs(10));
}
}
+51
View File
@@ -0,0 +1,51 @@
use std::path::PathBuf;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum ConfigError {
#[error("failed to read {path}: {source}")]
Io {
path: PathBuf,
#[source]
source: std::io::Error,
},
#[error("failed to parse KDL in {path}: {message}")]
Parse { path: PathBuf, message: String },
#[error("missing required field `{field}` in {path}")]
MissingField { path: PathBuf, field: &'static str },
#[error("invalid value for `{field}` in {path}: {message}")]
InvalidValue {
path: PathBuf,
field: &'static str,
message: String,
},
#[error("duplicate port {port} declared by both `{name_a}` and `{name_b}`")]
DuplicatePort {
name_a: String,
name_b: String,
port: u16,
},
#[error("server name `{name}` contains invalid characters (allowed: a-z, 0-9, '-', '_')")]
InvalidName { name: String },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcErrorCode {
ServerNotFound = -32001,
PortConflict = -32002,
ConfigInvalid = -32003,
AlreadyRunning = -32004,
NotRunning = -32005,
SpawnFailed = -32006,
}
impl RpcErrorCode {
pub fn as_i32(self) -> i32 {
self as i32
}
}
+447
View File
@@ -0,0 +1,447 @@
use crate::{ConfigError, RestartConfig, RestartPolicy, ServerConfig, StopConfig};
use kdl::{KdlDocument, KdlNode};
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::time::Duration;
pub fn parse_server_config(
name: &str,
text: &str,
source_path: &Path,
) -> Result<ServerConfig, ConfigError> {
validate_name(name).map_err(|_| ConfigError::InvalidName {
name: name.to_string(),
})?;
let doc: KdlDocument = text
.parse()
.map_err(|err: kdl::KdlError| ConfigError::Parse {
path: source_path.to_path_buf(),
message: err.to_string(),
})?;
let command = require_string_arg(&doc, "command", source_path)?;
let args = optional_string_args(&doc, "args");
let port = require_u16_arg(&doc, "port", source_path)?;
let env = optional_string_map(&doc, "env");
let working_dir = optional_string_arg(&doc, "working-dir").map(PathBuf::from);
let restart = parse_restart(&doc, source_path)?;
let stop = parse_stop(&doc, source_path)?;
Ok(ServerConfig {
name: name.to_string(),
command: PathBuf::from(command),
args,
port,
env,
working_dir,
restart,
stop,
})
}
fn validate_name(name: &str) -> Result<(), ()> {
if name.is_empty() {
return Err(());
}
if name
.chars()
.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '_')
{
Ok(())
} else {
Err(())
}
}
fn require_string_arg(
doc: &KdlDocument,
name: &'static str,
path: &Path,
) -> Result<String, ConfigError> {
let node = find_node(doc, name).ok_or(ConfigError::MissingField {
path: path.to_path_buf(),
field: name,
})?;
node.entries()
.first()
.and_then(|e| e.value().as_string().map(str::to_string))
.ok_or(ConfigError::InvalidValue {
path: path.to_path_buf(),
field: name,
message: "expected string argument".into(),
})
}
fn require_u16_arg(doc: &KdlDocument, name: &'static str, path: &Path) -> Result<u16, ConfigError> {
let node = find_node(doc, name).ok_or(ConfigError::MissingField {
path: path.to_path_buf(),
field: name,
})?;
let v = node
.entries()
.first()
.and_then(|e| e.value().as_integer())
.ok_or(ConfigError::InvalidValue {
path: path.to_path_buf(),
field: name,
message: "expected integer".into(),
})?;
u16::try_from(v).map_err(|_| ConfigError::InvalidValue {
path: path.to_path_buf(),
field: name,
message: format!("port {v} out of u16 range"),
})
}
fn optional_string_arg(doc: &KdlDocument, name: &str) -> Option<String> {
find_node(doc, name)
.and_then(|n| n.entries().first())
.and_then(|e| e.value().as_string().map(str::to_string))
}
fn optional_string_args(doc: &KdlDocument, name: &str) -> Vec<String> {
find_node(doc, name)
.map(|n| {
n.entries()
.iter()
.filter_map(|e| e.value().as_string().map(str::to_string))
.collect()
})
.unwrap_or_default()
}
fn optional_string_map(doc: &KdlDocument, name: &str) -> BTreeMap<String, String> {
let Some(node) = find_node(doc, name) else {
return BTreeMap::new();
};
let mut out = BTreeMap::new();
if let Some(children) = node.children() {
for child in children.nodes() {
let key = child.name().value().to_string();
if let Some(val) = child.entries().first().and_then(|e| e.value().as_string()) {
out.insert(key, val.to_string());
}
}
}
out
}
fn parse_restart(doc: &KdlDocument, path: &Path) -> Result<RestartConfig, ConfigError> {
let Some(node) = find_node(doc, "restart") else {
return Ok(RestartConfig::default());
};
let Some(children) = node.children() else {
return Ok(RestartConfig::default());
};
let mut out = RestartConfig::default();
for child in children.nodes() {
match child.name().value() {
"policy" => {
let s = string_arg(child, "policy", path)?;
out.policy = match s.as_str() {
"always" => RestartPolicy::Always,
"on-failure" => RestartPolicy::OnFailure,
"never" => RestartPolicy::Never,
other => {
return Err(ConfigError::InvalidValue {
path: path.to_path_buf(),
field: "restart.policy",
message: format!("unknown policy `{other}`"),
});
}
};
}
"backoff-initial" => {
out.backoff_initial = parse_duration_arg(child, "restart.backoff-initial", path)?;
}
"backoff-max" => {
out.backoff_max = parse_duration_arg(child, "restart.backoff-max", path)?;
}
"max-retries-per-minute" => {
let v = child
.entries()
.first()
.and_then(|e| e.value().as_integer())
.ok_or(ConfigError::InvalidValue {
path: path.to_path_buf(),
field: "restart.max-retries-per-minute",
message: "expected integer".into(),
})?;
out.max_retries_per_minute =
u32::try_from(v).map_err(|_| ConfigError::InvalidValue {
path: path.to_path_buf(),
field: "restart.max-retries-per-minute",
message: format!("out of u32 range: {v}"),
})?;
}
other => {
return Err(ConfigError::InvalidValue {
path: path.to_path_buf(),
field: "restart",
message: format!("unknown key `{other}`"),
});
}
}
}
Ok(out)
}
fn parse_stop(doc: &KdlDocument, path: &Path) -> Result<StopConfig, ConfigError> {
let Some(node) = find_node(doc, "stop") else {
return Ok(StopConfig::default());
};
let Some(children) = node.children() else {
return Ok(StopConfig::default());
};
let mut out = StopConfig::default();
for child in children.nodes() {
match child.name().value() {
"grace" => {
out.grace = parse_duration_arg(child, "stop.grace", path)?;
}
other => {
return Err(ConfigError::InvalidValue {
path: path.to_path_buf(),
field: "stop",
message: format!("unknown key `{other}`"),
});
}
}
}
Ok(out)
}
fn string_arg(node: &KdlNode, field: &'static str, path: &Path) -> Result<String, ConfigError> {
node.entries()
.first()
.and_then(|e| e.value().as_string().map(str::to_string))
.ok_or(ConfigError::InvalidValue {
path: path.to_path_buf(),
field,
message: "expected string".into(),
})
}
fn parse_duration_arg(
node: &KdlNode,
field: &'static str,
path: &Path,
) -> Result<Duration, ConfigError> {
let s = string_arg(node, field, path)?;
humantime::parse_duration(&s).map_err(|err| ConfigError::InvalidValue {
path: path.to_path_buf(),
field,
message: format!("invalid duration `{s}`: {err}"),
})
}
fn find_node<'a>(doc: &'a KdlDocument, name: &str) -> Option<&'a KdlNode> {
doc.nodes().iter().find(|n| n.name().value() == name)
}
pub fn load_all_configs(dir: &Path) -> Result<Vec<ServerConfig>, ConfigError> {
if !dir.exists() {
return Ok(Vec::new());
}
let entries = std::fs::read_dir(dir).map_err(|e| ConfigError::Io {
path: dir.to_path_buf(),
source: e,
})?;
let mut configs = Vec::new();
for entry in entries {
let entry = entry.map_err(|e| ConfigError::Io {
path: dir.to_path_buf(),
source: e,
})?;
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) != Some("kdl") {
continue;
}
let name = path
.file_stem()
.and_then(|s| s.to_str())
.ok_or(ConfigError::InvalidName {
name: path.display().to_string(),
})?
.to_string();
let text = std::fs::read_to_string(&path).map_err(|e| ConfigError::Io {
path: path.clone(),
source: e,
})?;
configs.push(parse_server_config(&name, &text, &path)?);
}
check_duplicate_ports(&configs)?;
Ok(configs)
}
fn check_duplicate_ports(configs: &[ServerConfig]) -> Result<(), ConfigError> {
let mut seen: std::collections::HashMap<u16, String> = std::collections::HashMap::new();
for c in configs {
if let Some(other) = seen.insert(c.port, c.name.clone()) {
return Err(ConfigError::DuplicatePort {
name_a: other,
name_b: c.name.clone(),
port: c.port,
});
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::Path;
fn p() -> &'static Path {
Path::new("/tmp/test.kdl")
}
#[test]
fn parses_minimal_config() {
let text = "command \"/usr/local/bin/foo\"\nport 8421\n";
let cfg = parse_server_config("foo", text, p()).unwrap();
assert_eq!(cfg.name, "foo");
assert_eq!(cfg.command, PathBuf::from("/usr/local/bin/foo"));
assert_eq!(cfg.port, 8421);
assert!(cfg.args.is_empty());
assert!(cfg.env.is_empty());
}
#[test]
fn parses_full_config() {
let text = r#"
command "/usr/local/bin/foo"
args "--http" "--port" "8421"
port 8421
env {
RUST_LOG "info"
FOO_BAR "baz"
}
working-dir "/tmp/work"
restart {
policy "always"
backoff-initial "2s"
backoff-max "1m"
max-retries-per-minute 10
}
stop {
grace "30s"
}
"#;
let cfg = parse_server_config("foo", text, p()).unwrap();
assert_eq!(cfg.args, vec!["--http", "--port", "8421"]);
assert_eq!(cfg.env.get("RUST_LOG").map(String::as_str), Some("info"));
assert_eq!(cfg.working_dir, Some(PathBuf::from("/tmp/work")));
assert_eq!(cfg.restart.policy, RestartPolicy::Always);
assert_eq!(cfg.restart.backoff_initial, Duration::from_secs(2));
assert_eq!(cfg.restart.backoff_max, Duration::from_secs(60));
assert_eq!(cfg.restart.max_retries_per_minute, 10);
assert_eq!(cfg.stop.grace, Duration::from_secs(30));
}
#[test]
fn missing_command_fails() {
let err = parse_server_config("foo", "port 8421", p()).unwrap_err();
assert!(matches!(
err,
ConfigError::MissingField {
field: "command",
..
}
));
}
#[test]
fn missing_port_fails() {
let err = parse_server_config("foo", "command \"/bin/x\"", p()).unwrap_err();
assert!(matches!(
err,
ConfigError::MissingField { field: "port", .. }
));
}
#[test]
fn unknown_restart_policy_fails() {
let text = "command \"/bin/x\"\nport 1\nrestart { policy \"maybe\" }";
let err = parse_server_config("foo", text, p()).unwrap_err();
assert!(matches!(
err,
ConfigError::InvalidValue {
field: "restart.policy",
..
}
));
}
#[test]
fn invalid_name_rejected() {
let text = "command \"/bin/x\"\nport 1";
let err = parse_server_config("Foo Bar", text, p()).unwrap_err();
assert!(matches!(err, ConfigError::InvalidName { .. }));
}
use std::fs;
use tempfile::tempdir;
#[test]
fn load_all_finds_and_parses_files() {
let dir = tempdir().unwrap();
fs::write(dir.path().join("a.kdl"), "command \"/bin/a\"\nport 8001").unwrap();
fs::write(dir.path().join("b.kdl"), "command \"/bin/b\"\nport 8002").unwrap();
fs::write(dir.path().join("ignored.txt"), "not a config").unwrap();
let mut configs = load_all_configs(dir.path()).unwrap();
configs.sort_by(|x, y| x.name.cmp(&y.name));
assert_eq!(configs.len(), 2);
assert_eq!(configs[0].name, "a");
assert_eq!(configs[1].port, 8002);
}
#[test]
fn load_all_returns_empty_for_missing_dir() {
let dir = tempdir().unwrap();
let configs = load_all_configs(&dir.path().join("does-not-exist")).unwrap();
assert!(configs.is_empty());
}
#[test]
fn duplicate_ports_detected() {
let dir = tempdir().unwrap();
fs::write(dir.path().join("a.kdl"), "command \"/bin/a\"\nport 8001").unwrap();
fs::write(dir.path().join("b.kdl"), "command \"/bin/b\"\nport 8001").unwrap();
let err = load_all_configs(dir.path()).unwrap_err();
match err {
ConfigError::DuplicatePort { port, .. } => assert_eq!(port, 8001),
other => panic!("unexpected error: {other:?}"),
}
}
}
+12
View File
@@ -0,0 +1,12 @@
//! Wire types and config schema shared between the xy daemon and CLI.
pub mod config;
pub mod error;
pub mod kdl_parse;
pub mod rpc;
pub mod state;
pub use config::{RestartConfig, RestartPolicy, ServerConfig, StopConfig};
pub use error::{ConfigError, RpcErrorCode};
pub use kdl_parse::{load_all_configs, parse_server_config};
pub use state::ServerState;
+148
View File
@@ -0,0 +1,148 @@
use crate::ServerState;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerSummary {
pub name: String,
pub state: ServerState,
#[serde(skip_serializing_if = "Option::is_none")]
pub pid: Option<u32>,
pub port: u16,
#[serde(skip_serializing_if = "Option::is_none")]
pub uptime_secs: Option<u64>,
pub restart_count: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_exit: Option<i32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusDetail {
#[serde(flatten)]
pub summary: ServerSummary,
pub recent_transitions: Vec<StateTransition>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateTransition {
pub from: ServerState,
pub to: ServerState,
pub at_unix_ms: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum NameOrAll {
All { all: bool },
Name { name: String },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusParams {
pub name: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartResult {
pub started: Vec<String>,
pub already_running: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StopResult {
pub stopped: Vec<String>,
pub not_running: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RestartResult {
pub restarted: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReloadResult {
pub added: Vec<String>,
pub removed: Vec<String>,
pub changed: Vec<String>,
pub unchanged: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogsParams {
pub name: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tail: Option<u32>,
#[serde(default)]
pub follow: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogsSubscribed {
pub subscription_id: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogsCancelParams {
pub subscription_id: u64,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum LogStream {
Stdout,
Stderr,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogLine {
pub subscription_id: u64,
pub name: String,
pub stream: LogStream,
pub line: String,
pub ts_unix_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEnd {
pub subscription_id: u64,
}
pub mod methods {
pub const LIST: &str = "list";
pub const STATUS: &str = "status";
pub const START: &str = "start";
pub const STOP: &str = "stop";
pub const RESTART: &str = "restart";
pub const RELOAD: &str = "reload";
pub const LOGS: &str = "logs";
pub const LOGS_CANCEL: &str = "logs_cancel";
}
pub mod notifications {
pub const LOG: &str = "log";
pub const LOG_END: &str = "log_end";
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn name_or_all_round_trips_name() {
let v: NameOrAll = serde_json::from_str(r#"{"name":"foo"}"#).unwrap();
match v {
NameOrAll::Name { name } => assert_eq!(name, "foo"),
_ => panic!("expected Name variant"),
}
}
#[test]
fn name_or_all_round_trips_all() {
let v: NameOrAll = serde_json::from_str(r#"{"all":true}"#).unwrap();
match v {
NameOrAll::All { all } => assert!(all),
_ => panic!("expected All variant"),
}
}
}
+31
View File
@@ -0,0 +1,31 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum ServerState {
Stopped,
Starting,
Running,
Restarting,
Failed,
Stopping,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn serializes_to_kebab_case() {
assert_eq!(
serde_json::to_string(&ServerState::Restarting).unwrap(),
"\"restarting\""
);
}
#[test]
fn deserializes_from_kebab_case() {
let s: ServerState = serde_json::from_str("\"failed\"").unwrap();
assert_eq!(s, ServerState::Failed);
}
}
+17
View File
@@ -0,0 +1,17 @@
[package]
name = "xy-supervisor"
edition.workspace = true
version.workspace = true
license.workspace = true
[dependencies]
xy-protocol.workspace = true
tokio.workspace = true
tracing.workspace = true
thiserror.workspace = true
nix.workspace = true
async-trait.workspace = true
[dev-dependencies]
tokio-test.workspace = true
tempfile.workspace = true
+71
View File
@@ -0,0 +1,71 @@
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct Backoff {
initial: Duration,
max: Duration,
current: Option<Duration>,
}
impl Backoff {
pub fn new(initial: Duration, max: Duration) -> Self {
Self {
initial,
max,
current: None,
}
}
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> Duration {
let next = match self.current {
None => self.initial,
Some(d) => (d * 2).min(self.max),
};
self.current = Some(next);
next
}
pub fn reset(&mut self) {
self.current = None;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn starts_at_initial() {
let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(30));
assert_eq!(b.next(), Duration::from_secs(1));
}
#[test]
fn doubles_each_call() {
let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(30));
assert_eq!(b.next(), Duration::from_secs(1));
assert_eq!(b.next(), Duration::from_secs(2));
assert_eq!(b.next(), Duration::from_secs(4));
assert_eq!(b.next(), Duration::from_secs(8));
}
#[test]
fn caps_at_max() {
let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(5));
for _ in 0..10 {
b.next();
}
assert_eq!(b.next(), Duration::from_secs(5));
}
#[test]
fn reset_starts_over() {
let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(30));
b.next();
b.next();
b.next();
b.reset();
assert_eq!(b.next(), Duration::from_secs(1));
}
}
+211
View File
@@ -0,0 +1,211 @@
use std::sync::Arc;
use tokio::sync::{Mutex, oneshot};
#[async_trait::async_trait]
pub trait ChildHandle: Send + 'static {
fn pid(&self) -> u32;
async fn wait(&mut self) -> std::io::Result<Option<i32>>;
fn terminate(&mut self) -> std::io::Result<()>;
fn kill(&mut self) -> std::io::Result<()>;
}
pub struct MockChild {
pid: u32,
exit_rx: Arc<Mutex<oneshot::Receiver<Option<i32>>>>,
terminate_tx: Option<oneshot::Sender<()>>,
kill_tx: Option<oneshot::Sender<()>>,
}
pub struct MockChildController {
pub exit_tx: Option<oneshot::Sender<Option<i32>>>,
pub terminate_rx: oneshot::Receiver<()>,
pub kill_rx: oneshot::Receiver<()>,
}
impl MockChild {
pub fn new(pid: u32) -> (Self, MockChildController) {
let (exit_tx, exit_rx) = oneshot::channel();
let (terminate_tx, terminate_rx) = oneshot::channel();
let (kill_tx, kill_rx) = oneshot::channel();
let child = Self {
pid,
exit_rx: Arc::new(Mutex::new(exit_rx)),
terminate_tx: Some(terminate_tx),
kill_tx: Some(kill_tx),
};
let ctl = MockChildController {
exit_tx: Some(exit_tx),
terminate_rx,
kill_rx,
};
(child, ctl)
}
}
#[async_trait::async_trait]
impl ChildHandle for MockChild {
fn pid(&self) -> u32 {
self.pid
}
async fn wait(&mut self) -> std::io::Result<Option<i32>> {
let mut rx = self.exit_rx.lock().await;
match (&mut *rx).await {
Ok(code) => Ok(code),
Err(_) => Err(std::io::Error::other("exit_tx dropped")),
}
}
fn terminate(&mut self) -> std::io::Result<()> {
if let Some(tx) = self.terminate_tx.take() {
let _ = tx.send(());
}
Ok(())
}
fn kill(&mut self) -> std::io::Result<()> {
if let Some(tx) = self.kill_tx.take() {
let _ = tx.send(());
}
Ok(())
}
}
use crate::logs::LogSink;
use nix::sys::signal::{Signal, kill};
use nix::unistd::Pid;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child as TokioChild, Command};
use xy_protocol::{ServerConfig, rpc::LogStream};
pub struct RealChild {
pid: u32,
pgid: Pid,
child: Option<TokioChild>,
}
impl RealChild {
pub fn pgid(&self) -> Pid {
self.pgid
}
}
#[async_trait::async_trait]
impl ChildHandle for RealChild {
fn pid(&self) -> u32 {
self.pid
}
async fn wait(&mut self) -> std::io::Result<Option<i32>> {
let child = self
.child
.as_mut()
.ok_or_else(|| std::io::Error::other("already waited"))?;
let status = child.wait().await?;
Ok(status.code())
}
fn terminate(&mut self) -> std::io::Result<()> {
kill(Pid::from_raw(-self.pgid.as_raw()), Signal::SIGTERM)
.map_err(|err| std::io::Error::other(err.to_string()))
}
fn kill(&mut self) -> std::io::Result<()> {
kill(Pid::from_raw(-self.pgid.as_raw()), Signal::SIGKILL)
.map_err(|err| std::io::Error::other(err.to_string()))
}
}
pub fn spawn_with_logs(cfg: &ServerConfig, sink: LogSink) -> std::io::Result<RealChild> {
let mut cmd = Command::new(&cfg.command);
cmd.args(&cfg.args);
for (k, v) in &cfg.env {
cmd.env(k, v);
}
if let Some(dir) = &cfg.working_dir {
cmd.current_dir(dir);
}
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
cmd.kill_on_drop(true);
// Own process group so signals reach the whole tree.
unsafe {
cmd.pre_exec(|| {
nix::unistd::setpgid(Pid::from_raw(0), Pid::from_raw(0))
.map_err(|err| std::io::Error::other(err.to_string()))
});
}
let mut child = cmd.spawn()?;
let pid = child.id().ok_or_else(|| std::io::Error::other("no pid"))?;
let pgid = Pid::from_raw(pid as i32);
if let Some(out) = child.stdout.take() {
spawn_pump(out, sink.clone(), LogStream::Stdout);
}
if let Some(err) = child.stderr.take() {
spawn_pump(err, sink.clone(), LogStream::Stderr);
}
Ok(RealChild {
pid,
pgid,
child: Some(child),
})
}
fn spawn_pump<R: tokio::io::AsyncRead + Unpin + Send + 'static>(
reader: R,
sink: LogSink,
stream: LogStream,
) {
tokio::spawn(async move {
let mut lines = BufReader::new(reader).lines();
loop {
match lines.next_line().await {
Ok(Some(line)) => sink.record(stream, line),
Ok(None) => break,
Err(err) => {
tracing::warn!(
server = %sink.server_name,
error = %err,
?stream,
"log pump read error"
);
break;
}
}
}
});
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn mock_child_exit() {
let (mut child, mut ctl) = MockChild::new(123);
assert_eq!(child.pid(), 123);
ctl.exit_tx.take().unwrap().send(Some(0)).unwrap();
assert_eq!(child.wait().await.unwrap(), Some(0));
}
#[tokio::test]
async fn mock_child_terminate() {
let (mut child, mut ctl) = MockChild::new(1);
child.terminate().unwrap();
ctl.terminate_rx.try_recv().unwrap();
}
}
+18
View File
@@ -0,0 +1,18 @@
//! Process-supervision primitives for the xy daemon.
pub mod backoff;
pub mod child;
pub mod logs;
pub mod policy;
pub mod retry_window;
pub mod supervisor;
pub use backoff::Backoff;
pub use child::{ChildHandle, MockChild, MockChildController, RealChild, spawn_with_logs};
pub use logs::{LogSink, RecordedLine, RingBuffer, RotatingLogWriter};
pub use policy::{RestartDecision, decide};
pub use retry_window::RetryWindow;
pub use supervisor::{
RealSpawner, Spawner, StartAck, Status, StopAck, SupervisorCmd, SupervisorHandle,
SupervisorTask,
};
+263
View File
@@ -0,0 +1,263 @@
use std::collections::VecDeque;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use xy_protocol::rpc::{LogLine, LogStream};
pub struct RotatingLogWriter {
base: PathBuf,
max_bytes: u64,
keep: usize,
file: File,
written: u64,
}
impl RotatingLogWriter {
pub fn open(base: &Path, max_bytes: u64, keep: usize) -> std::io::Result<Self> {
if let Some(parent) = base.parent() {
std::fs::create_dir_all(parent)?;
}
let file = OpenOptions::new().create(true).append(true).open(base)?;
let written = file.metadata()?.len();
Ok(Self {
base: base.to_path_buf(),
max_bytes,
keep,
file,
written,
})
}
pub fn write_line(&mut self, tag: &str, line: &str) -> std::io::Result<()> {
let bytes = format!("{tag} {line}\n");
self.file.write_all(bytes.as_bytes())?;
self.written += bytes.len() as u64;
if self.written >= self.max_bytes {
self.rotate()?;
}
Ok(())
}
fn rotate(&mut self) -> std::io::Result<()> {
// Drop the current handle by replacing with /dev/null briefly.
self.file = OpenOptions::new().read(true).open("/dev/null")?;
for i in (1..self.keep).rev() {
let src = self.gen_path(i);
let dst = self.gen_path(i + 1);
if src.exists() {
let _ = std::fs::rename(&src, &dst);
}
}
if self.base.exists() {
let _ = std::fs::rename(&self.base, self.gen_path(1));
}
self.file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.base)?;
self.written = 0;
Ok(())
}
fn gen_path(&self, n: usize) -> PathBuf {
let mut s = self.base.as_os_str().to_os_string();
s.push(format!(".{n}"));
PathBuf::from(s)
}
}
#[derive(Clone)]
pub struct RingBuffer {
inner: Arc<Mutex<RingBufferInner>>,
capacity_bytes: usize,
}
struct RingBufferInner {
lines: VecDeque<RecordedLine>,
bytes: usize,
}
#[derive(Debug, Clone)]
pub struct RecordedLine {
pub stream: xy_protocol::rpc::LogStream,
pub line: String,
pub ts_unix_ms: u64,
}
impl RingBuffer {
pub fn new(capacity_bytes: usize) -> Self {
Self {
inner: Arc::new(Mutex::new(RingBufferInner {
lines: VecDeque::new(),
bytes: 0,
})),
capacity_bytes,
}
}
pub fn push(&self, line: RecordedLine) {
let mut g = self.inner.lock().unwrap();
g.bytes += line.line.len();
g.lines.push_back(line);
while g.bytes > self.capacity_bytes {
if let Some(removed) = g.lines.pop_front() {
g.bytes -= removed.line.len();
} else {
break;
}
}
}
pub fn snapshot_tail(&self, n: Option<u32>) -> Vec<RecordedLine> {
let g = self.inner.lock().unwrap();
match n {
None => g.lines.iter().cloned().collect(),
Some(n) => {
let take = (n as usize).min(g.lines.len());
let start = g.lines.len() - take;
g.lines.iter().skip(start).cloned().collect()
}
}
}
}
const LOG_BROADCAST_CAP: usize = 256;
#[derive(Clone)]
pub struct LogSink {
pub server_name: String,
writer: Arc<Mutex<RotatingLogWriter>>,
pub ring: RingBuffer,
pub broadcast: broadcast::Sender<LogLine>,
}
impl LogSink {
pub fn new(server_name: String, writer: RotatingLogWriter, ring_capacity_bytes: usize) -> Self {
let (tx, _) = broadcast::channel(LOG_BROADCAST_CAP);
Self {
server_name,
writer: Arc::new(Mutex::new(writer)),
ring: RingBuffer::new(ring_capacity_bytes),
broadcast: tx,
}
}
pub fn record(&self, stream: LogStream, line: String) {
let ts = now_unix_ms();
let tag = match stream {
LogStream::Stdout => "[out]",
LogStream::Stderr => "[err]",
};
if let Err(e) = self.writer.lock().unwrap().write_line(tag, &line) {
tracing::warn!(server = %self.server_name, error = %e, "log file write failed");
}
self.ring.push(RecordedLine {
stream,
line: line.clone(),
ts_unix_ms: ts,
});
let _ = self.broadcast.send(LogLine {
subscription_id: 0,
name: self.server_name.clone(),
stream,
line,
ts_unix_ms: ts,
});
}
}
fn now_unix_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Read;
use tempfile::tempdir;
#[test]
fn writes_lines_with_tags() {
let dir = tempdir().unwrap();
let base = dir.path().join("x.log");
let mut w = RotatingLogWriter::open(&base, 1024, 3).unwrap();
w.write_line("[out]", "hello").unwrap();
w.write_line("[err]", "boom").unwrap();
let mut s = String::new();
File::open(&base).unwrap().read_to_string(&mut s).unwrap();
assert_eq!(s, "[out] hello\n[err] boom\n");
}
#[test]
fn rotates_at_threshold() {
let dir = tempdir().unwrap();
let base = dir.path().join("x.log");
let mut w = RotatingLogWriter::open(&base, 20, 3).unwrap();
for _ in 0..5 {
w.write_line("[out]", "0123456789").unwrap();
}
assert!(base.exists());
let rotated = dir.path().join("x.log.1");
assert!(
rotated.exists(),
"expected rotated file at {}",
rotated.display()
);
}
use xy_protocol::rpc::LogStream;
fn recorded(s: &str) -> RecordedLine {
RecordedLine {
stream: LogStream::Stdout,
line: s.to_string(),
ts_unix_ms: 0,
}
}
#[test]
fn ring_buffer_drops_oldest_when_full() {
let rb = RingBuffer::new(10);
rb.push(recorded("aaaaa"));
rb.push(recorded("bbbbb"));
rb.push(recorded("ccc"));
let snap = rb.snapshot_tail(None);
assert_eq!(snap.len(), 2);
assert_eq!(snap[0].line, "bbbbb");
assert_eq!(snap[1].line, "ccc");
}
#[test]
fn ring_buffer_tail_n() {
let rb = RingBuffer::new(1024);
for i in 0..5 {
rb.push(recorded(&format!("line{i}")));
}
let snap = rb.snapshot_tail(Some(2));
assert_eq!(snap.len(), 2);
assert_eq!(snap[0].line, "line3");
assert_eq!(snap[1].line, "line4");
}
#[tokio::test]
async fn log_sink_records_and_broadcasts() {
let dir = tempdir().unwrap();
let writer = RotatingLogWriter::open(&dir.path().join("s.log"), 1024, 3).unwrap();
let sink = LogSink::new("s".to_string(), writer, 1024);
let mut rx = sink.broadcast.subscribe();
sink.record(LogStream::Stdout, "hello".to_string());
let got = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
.await
.unwrap()
.unwrap();
assert_eq!(got.line, "hello");
assert_eq!(got.stream, LogStream::Stdout);
assert_eq!(sink.ring.snapshot_tail(None).len(), 1);
}
}
+102
View File
@@ -0,0 +1,102 @@
use xy_protocol::RestartPolicy;
#[derive(Debug, PartialEq, Eq)]
pub enum RestartDecision {
Restart,
StayStopped,
MarkFailed,
}
pub fn decide(
policy: RestartPolicy,
exit_code: Option<i32>,
retry_cap_reached: bool,
) -> RestartDecision {
let clean = matches!(exit_code, Some(0));
let want = match policy {
RestartPolicy::Never => false,
RestartPolicy::OnFailure => !clean,
RestartPolicy::Always => true,
};
if !want {
return RestartDecision::StayStopped;
}
if retry_cap_reached {
RestartDecision::MarkFailed
} else {
RestartDecision::Restart
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn never_never_restarts() {
assert_eq!(
decide(RestartPolicy::Never, Some(0), false),
RestartDecision::StayStopped
);
assert_eq!(
decide(RestartPolicy::Never, Some(1), false),
RestartDecision::StayStopped
);
assert_eq!(
decide(RestartPolicy::Never, None, false),
RestartDecision::StayStopped
);
}
#[test]
fn on_failure_skips_clean() {
assert_eq!(
decide(RestartPolicy::OnFailure, Some(0), false),
RestartDecision::StayStopped
);
}
#[test]
fn on_failure_restarts_nonzero() {
assert_eq!(
decide(RestartPolicy::OnFailure, Some(1), false),
RestartDecision::Restart
);
}
#[test]
fn on_failure_restarts_signal() {
assert_eq!(
decide(RestartPolicy::OnFailure, None, false),
RestartDecision::Restart
);
}
#[test]
fn always_restarts_on_clean() {
assert_eq!(
decide(RestartPolicy::Always, Some(0), false),
RestartDecision::Restart
);
}
#[test]
fn cap_reached_marks_failed() {
assert_eq!(
decide(RestartPolicy::Always, Some(0), true),
RestartDecision::MarkFailed
);
assert_eq!(
decide(RestartPolicy::OnFailure, Some(1), true),
RestartDecision::MarkFailed
);
}
#[test]
fn cap_reached_never_still_stopped() {
assert_eq!(
decide(RestartPolicy::Never, Some(1), true),
RestartDecision::StayStopped
);
}
}
+80
View File
@@ -0,0 +1,80 @@
use std::collections::VecDeque;
use std::time::{Duration, Instant};
#[derive(Debug, Clone)]
pub struct RetryWindow {
window: Duration,
cap: u32,
events: VecDeque<Instant>,
}
impl RetryWindow {
pub fn new(window: Duration, cap: u32) -> Self {
Self {
window,
cap,
events: VecDeque::new(),
}
}
pub fn record(&mut self, now: Instant) {
self.events.push_back(now);
self.prune(now);
}
pub fn cap_reached(&mut self, now: Instant) -> bool {
self.prune(now);
self.events.len() as u32 >= self.cap
}
pub fn count(&mut self, now: Instant) -> u32 {
self.prune(now);
self.events.len() as u32
}
fn prune(&mut self, now: Instant) {
while let Some(&front) = self.events.front() {
if now.duration_since(front) > self.window {
self.events.pop_front();
} else {
break;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn below_cap_not_reached() {
let mut w = RetryWindow::new(Duration::from_secs(60), 3);
let t = Instant::now();
w.record(t);
w.record(t);
assert!(!w.cap_reached(t));
}
#[test]
fn at_cap_reached() {
let mut w = RetryWindow::new(Duration::from_secs(60), 3);
let t = Instant::now();
w.record(t);
w.record(t);
w.record(t);
assert!(w.cap_reached(t));
}
#[test]
fn old_events_pruned() {
let mut w = RetryWindow::new(Duration::from_secs(60), 3);
let t0 = Instant::now();
w.record(t0);
w.record(t0);
w.record(t0);
let t1 = t0 + Duration::from_secs(61);
assert_eq!(w.count(t1), 0);
assert!(!w.cap_reached(t1));
}
}
+451
View File
@@ -0,0 +1,451 @@
use crate::{
backoff::Backoff,
child::ChildHandle,
logs::LogSink,
policy::{RestartDecision, decide},
retry_window::RetryWindow,
};
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::sleep;
use tracing::{debug, info, warn};
use xy_protocol::{ServerConfig, ServerState};
pub enum SupervisorCmd {
Start {
ack: oneshot::Sender<StartAck>,
},
Stop {
ack: oneshot::Sender<StopAck>,
},
Restart {
ack: oneshot::Sender<()>,
},
Reconfigure {
new: ServerConfig,
ack: oneshot::Sender<()>,
},
Shutdown {
ack: oneshot::Sender<()>,
},
}
#[derive(Debug, PartialEq, Eq)]
pub enum StartAck {
Started,
AlreadyRunning,
SpawnFailed(String),
}
#[derive(Debug, PartialEq, Eq)]
pub enum StopAck {
Stopped,
NotRunning,
}
#[derive(Debug, Clone)]
pub struct Status {
pub state: ServerState,
pub pid: Option<u32>,
pub port: u16,
pub uptime_secs: Option<u64>,
pub restart_count: u32,
pub last_exit: Option<i32>,
}
#[derive(Clone)]
pub struct SupervisorHandle {
pub name: String,
pub tx: mpsc::Sender<SupervisorCmd>,
pub status: watch::Receiver<Status>,
pub log_sink: LogSink,
}
#[async_trait::async_trait]
pub trait Spawner: Send + 'static {
type Child: ChildHandle;
async fn spawn(&self, cfg: &ServerConfig, sink: LogSink) -> std::io::Result<Self::Child>;
}
pub struct SupervisorTask<S: Spawner> {
cfg: ServerConfig,
log_sink: LogSink,
spawner: S,
status_tx: watch::Sender<Status>,
cmd_rx: mpsc::Receiver<SupervisorCmd>,
backoff: Backoff,
retry_window: RetryWindow,
restart_count: u32,
last_exit: Option<i32>,
started_at: Option<Instant>,
current_pid: Option<u32>,
}
impl<S: Spawner> SupervisorTask<S> {
pub fn new(
cfg: ServerConfig,
log_sink: LogSink,
spawner: S,
status_tx: watch::Sender<Status>,
cmd_rx: mpsc::Receiver<SupervisorCmd>,
) -> Self {
let backoff = Backoff::new(cfg.restart.backoff_initial, cfg.restart.backoff_max);
let retry_window =
RetryWindow::new(Duration::from_secs(60), cfg.restart.max_retries_per_minute);
Self {
cfg,
log_sink,
spawner,
status_tx,
cmd_rx,
backoff,
retry_window,
restart_count: 0,
last_exit: None,
started_at: None,
current_pid: None,
}
}
fn set_state(&mut self, s: ServerState) {
let uptime_secs = self.started_at.map(|t| t.elapsed().as_secs());
let _ = self.status_tx.send(Status {
state: s,
pid: self.current_pid,
port: self.cfg.port,
uptime_secs,
restart_count: self.restart_count,
last_exit: self.last_exit,
});
}
pub async fn run(mut self) {
let mut child: Option<S::Child> = None;
loop {
tokio::select! {
cmd = self.cmd_rx.recv() => {
let Some(cmd) = cmd else { break; };
match cmd {
SupervisorCmd::Start { ack } => {
if child.is_some() {
let _ = ack.send(StartAck::AlreadyRunning);
} else {
match self.do_start().await {
Ok(c) => {
child = Some(c);
let _ = ack.send(StartAck::Started);
}
Err(err) => {
warn!(name = %self.cfg.name, error = %err, "spawn failed");
self.set_state(ServerState::Failed);
let _ = ack.send(StartAck::SpawnFailed(err.to_string()));
}
}
}
}
SupervisorCmd::Stop { ack } => {
if let Some(c) = child.take() {
self.do_stop(c).await;
let _ = ack.send(StopAck::Stopped);
} else {
let _ = ack.send(StopAck::NotRunning);
}
}
SupervisorCmd::Restart { ack } => {
if let Some(c) = child.take() {
self.set_state(ServerState::Restarting);
self.do_stop(c).await;
}
match self.do_start().await {
Ok(c) => child = Some(c),
Err(err) => {
warn!(name = %self.cfg.name, error = %err, "restart spawn failed");
self.set_state(ServerState::Failed);
}
}
let _ = ack.send(());
}
SupervisorCmd::Reconfigure { new, ack } => {
self.cfg = new;
self.backoff =
Backoff::new(self.cfg.restart.backoff_initial, self.cfg.restart.backoff_max);
self.retry_window = RetryWindow::new(
Duration::from_secs(60),
self.cfg.restart.max_retries_per_minute,
);
let _ = ack.send(());
}
SupervisorCmd::Shutdown { ack } => {
if let Some(c) = child.take() {
self.do_stop(c).await;
}
let _ = ack.send(());
return;
}
}
}
code = wait_child(&mut child) => {
child = None;
self.last_exit = code;
self.current_pid = None;
let now = Instant::now();
self.retry_window.record(now);
let cap = self.retry_window.cap_reached(now);
let decision = decide(self.cfg.restart.policy, code, cap);
debug!(name = %self.cfg.name, ?code, ?decision, "child exited");
match decision {
RestartDecision::StayStopped => {
self.started_at = None;
self.set_state(ServerState::Stopped);
}
RestartDecision::MarkFailed => {
self.started_at = None;
self.set_state(ServerState::Failed);
}
RestartDecision::Restart => {
self.set_state(ServerState::Restarting);
let delay = self.backoff.next();
enum Action {
RetryNow,
Cancel,
Exit,
}
let mut delay_fut = std::pin::pin!(sleep(delay));
let action = tokio::select! {
_ = &mut delay_fut => Action::RetryNow,
cmd = self.cmd_rx.recv() => match cmd {
None => Action::Exit,
Some(SupervisorCmd::Stop { ack }) => {
let _ = ack.send(StopAck::NotRunning);
Action::Cancel
}
Some(SupervisorCmd::Shutdown { ack }) => {
let _ = ack.send(());
return;
}
Some(SupervisorCmd::Start { ack }) => {
let _ = ack.send(StartAck::Started);
Action::RetryNow
}
Some(SupervisorCmd::Restart { ack }) => {
let _ = ack.send(());
Action::RetryNow
}
Some(SupervisorCmd::Reconfigure { new, ack }) => {
self.cfg = new;
self.backoff = Backoff::new(
self.cfg.restart.backoff_initial,
self.cfg.restart.backoff_max,
);
self.retry_window = RetryWindow::new(
Duration::from_secs(60),
self.cfg.restart.max_retries_per_minute,
);
let _ = ack.send(());
Action::RetryNow
}
},
};
match action {
Action::RetryNow => {
match self.do_start().await {
Ok(c) => child = Some(c),
Err(err) => {
warn!(name = %self.cfg.name, error = %err, "restart spawn failed");
self.set_state(ServerState::Failed);
}
}
}
Action::Cancel => {
self.started_at = None;
self.set_state(ServerState::Stopped);
}
Action::Exit => return,
}
}
}
}
}
}
}
async fn do_start(&mut self) -> std::io::Result<S::Child> {
self.set_state(ServerState::Starting);
let c = self.spawner.spawn(&self.cfg, self.log_sink.clone()).await?;
self.restart_count = self.restart_count.saturating_add(1);
self.started_at = Some(Instant::now());
self.current_pid = Some(c.pid());
self.backoff.reset();
self.set_state(ServerState::Running);
info!(name = %self.cfg.name, pid = c.pid(), "started");
Ok(c)
}
async fn do_stop(&mut self, mut c: S::Child) {
self.set_state(ServerState::Stopping);
let _ = c.terminate();
let grace = self.cfg.stop.grace;
match tokio::time::timeout(grace, c.wait()).await {
Ok(_) => {}
Err(_) => {
let _ = c.kill();
let _ = c.wait().await;
}
}
self.current_pid = None;
self.started_at = None;
self.set_state(ServerState::Stopped);
}
}
async fn wait_child<C: ChildHandle>(slot: &mut Option<C>) -> Option<i32> {
match slot.as_mut() {
Some(c) => c.wait().await.ok().flatten(),
None => std::future::pending().await,
}
}
pub struct RealSpawner;
#[async_trait::async_trait]
impl Spawner for RealSpawner {
type Child = crate::child::RealChild;
async fn spawn(&self, cfg: &ServerConfig, sink: LogSink) -> std::io::Result<Self::Child> {
crate::child::spawn_with_logs(cfg, sink)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::child::MockChild;
use crate::logs::{LogSink, RotatingLogWriter};
use std::sync::{Arc, Mutex};
use tempfile::tempdir;
use xy_protocol::{RestartConfig, RestartPolicy, StopConfig};
struct QueueSpawner {
queue: Arc<Mutex<Vec<MockChild>>>,
}
#[async_trait::async_trait]
impl Spawner for QueueSpawner {
type Child = MockChild;
async fn spawn(&self, _cfg: &ServerConfig, _sink: LogSink) -> std::io::Result<MockChild> {
let mut q = self.queue.lock().unwrap();
Ok(q.remove(0))
}
}
fn cfg(name: &str, policy: RestartPolicy, max_retries: u32) -> ServerConfig {
ServerConfig {
name: name.to_string(),
command: "/bin/true".into(),
args: vec![],
port: 1,
env: Default::default(),
working_dir: None,
restart: RestartConfig {
policy,
backoff_initial: Duration::from_millis(1),
backoff_max: Duration::from_millis(1),
max_retries_per_minute: max_retries,
},
stop: StopConfig {
grace: Duration::from_millis(50),
},
}
}
fn sink(name: &str) -> LogSink {
let dir = tempdir().unwrap();
let writer = RotatingLogWriter::open(&dir.path().join("s.log"), 1024, 3).unwrap();
std::mem::forget(dir);
LogSink::new(name.to_string(), writer, 1024)
}
fn initial_status(cfg: &ServerConfig) -> Status {
Status {
state: ServerState::Stopped,
pid: None,
port: cfg.port,
uptime_secs: None,
restart_count: 0,
last_exit: None,
}
}
async fn wait_for(rx: &mut watch::Receiver<Status>, want: ServerState) {
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
loop {
if rx.borrow().state == want {
return;
}
tokio::select! {
_ = rx.changed() => {}
_ = tokio::time::sleep_until(deadline) => panic!("never reached {want:?}, last={:?}", rx.borrow().state),
}
}
}
#[tokio::test]
async fn start_runs_to_running_and_stop_to_stopped() {
let cfg = cfg("x", RestartPolicy::Never, 5);
let (mock, mut ctl) = MockChild::new(1);
let queue = Arc::new(Mutex::new(vec![mock]));
let spawner = QueueSpawner { queue };
let (status_tx, mut status_rx) = watch::channel(initial_status(&cfg));
let (cmd_tx, cmd_rx) = mpsc::channel(8);
let task = SupervisorTask::new(cfg, sink("x"), spawner, status_tx, cmd_rx);
let h = tokio::spawn(task.run());
let (ack_tx, ack_rx) = oneshot::channel();
cmd_tx
.send(SupervisorCmd::Start { ack: ack_tx })
.await
.unwrap();
assert_eq!(ack_rx.await.unwrap(), StartAck::Started);
wait_for(&mut status_rx, ServerState::Running).await;
ctl.exit_tx.take().unwrap().send(Some(0)).unwrap();
wait_for(&mut status_rx, ServerState::Stopped).await;
let (ack_tx, ack_rx) = oneshot::channel();
cmd_tx
.send(SupervisorCmd::Shutdown { ack: ack_tx })
.await
.unwrap();
ack_rx.await.unwrap();
h.await.unwrap();
}
}
+35
View File
@@ -0,0 +1,35 @@
[package]
name = "xy"
edition.workspace = true
version.workspace = true
license.workspace = true
[[bin]]
name = "xy"
path = "src/main.rs"
[[bin]]
name = "xy-test-sleep-server"
path = "src/bin/xy_test_sleep_server.rs"
[[bin]]
name = "xy-test-exit-failure"
path = "src/bin/xy_test_exit_failure.rs"
[dependencies]
xy-protocol.workspace = true
xy-supervisor.workspace = true
xy-ipc.workspace = true
tokio.workspace = true
clap.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
anyhow.workspace = true
etcetera.workspace = true
nix.workspace = true
humantime.workspace = true
[dev-dependencies]
tempfile.workspace = true
@@ -0,0 +1,4 @@
fn main() {
eprintln!("exit_failure dying immediately");
std::process::exit(7);
}
+12
View File
@@ -0,0 +1,12 @@
fn main() {
use std::io::Write;
let pid = std::process::id();
eprintln!("sleep_server start pid={pid}");
println!("ready");
std::io::stdout().flush().ok();
loop {
std::thread::sleep(std::time::Duration::from_secs(60));
println!("tick");
std::io::stdout().flush().ok();
}
}
+27
View File
@@ -0,0 +1,27 @@
use xy_protocol::rpc::ServerSummary;
pub fn list_table(rows: &[ServerSummary]) -> String {
let mut out = String::new();
out.push_str("NAME STATE PID PORT UPTIME RESTARTS\n");
for r in rows {
let pid = r.pid.map(|p| p.to_string()).unwrap_or_else(|| "-".into());
let up = r
.uptime_secs
.map(|s| format!("{}s", s))
.unwrap_or_else(|| "-".into());
out.push_str(&format!(
"{:<20}{:<12}{:<8}{:<8}{:<10}{}\n",
r.name,
format!("{:?}", r.state).to_lowercase(),
pid,
r.port,
up,
r.restart_count
));
}
out
}
+194
View File
@@ -0,0 +1,194 @@
use crate::paths::Paths;
use anyhow::Result;
use serde_json::json;
use xy_ipc::{Client, ClientError};
use xy_protocol::rpc::{
LogLine, LogStream, LogsParams, LogsSubscribed, ReloadResult, RestartResult, ServerSummary,
StartResult, StatusDetail, StopResult, methods, notifications,
};
mod format;
async fn connect(paths: &Paths) -> Result<Client> {
match Client::connect(&paths.socket).await {
Ok(c) => Ok(c),
Err(err) => {
eprintln!(
"xy: cannot reach daemon at {}: {err}",
paths.socket.display()
);
std::process::exit(2);
}
}
}
fn rpc_to_exit(err: &ClientError) -> i32 {
match err {
ClientError::Unreachable(_) => 2,
ClientError::Rpc { .. } => 1,
_ => 1,
}
}
pub async fn list(paths: Paths) -> Result<i32> {
let mut c = connect(&paths).await?;
let rows: Vec<ServerSummary> = match c.call_no_params(methods::LIST).await {
Ok(v) => v,
Err(err) => {
eprintln!("xy: {err}");
return Ok(rpc_to_exit(&err));
}
};
print!("{}", format::list_table(&rows));
Ok(0)
}
pub async fn status(paths: Paths, name: String) -> Result<i32> {
let mut c = connect(&paths).await?;
let d: StatusDetail = match c.call(methods::STATUS, &json!({"name": name})).await {
Ok(v) => v,
Err(err) => {
eprintln!("xy: {err}");
return Ok(rpc_to_exit(&err));
}
};
println!("{:#?}", d);
Ok(0)
}
fn name_or_all(all: bool, name: Option<String>) -> serde_json::Value {
if all {
json!({"all": true})
} else {
json!({"name": name.unwrap()})
}
}
pub async fn start(paths: Paths, all: bool, name: Option<String>) -> Result<i32> {
let mut c = connect(&paths).await?;
let r: StartResult = match c.call(methods::START, &name_or_all(all, name)).await {
Ok(v) => v,
Err(err) => {
eprintln!("xy: {err}");
return Ok(rpc_to_exit(&err));
}
};
if !r.started.is_empty() {
println!("started: {}", r.started.join(", "));
}
if !r.already_running.is_empty() {
println!("already running: {}", r.already_running.join(", "));
}
Ok(0)
}
pub async fn stop(paths: Paths, all: bool, name: Option<String>) -> Result<i32> {
let mut c = connect(&paths).await?;
let r: StopResult = match c.call(methods::STOP, &name_or_all(all, name)).await {
Ok(v) => v,
Err(err) => {
eprintln!("xy: {err}");
return Ok(rpc_to_exit(&err));
}
};
if !r.stopped.is_empty() {
println!("stopped: {}", r.stopped.join(", "));
}
if !r.not_running.is_empty() {
println!("not running: {}", r.not_running.join(", "));
}
Ok(0)
}
pub async fn restart(paths: Paths, all: bool, name: Option<String>) -> Result<i32> {
let mut c = connect(&paths).await?;
let r: RestartResult = match c.call(methods::RESTART, &name_or_all(all, name)).await {
Ok(v) => v,
Err(err) => {
eprintln!("xy: {err}");
return Ok(rpc_to_exit(&err));
}
};
println!("restarted: {}", r.restarted.join(", "));
Ok(0)
}
pub async fn reload(paths: Paths) -> Result<i32> {
let mut c = connect(&paths).await?;
let r: ReloadResult = match c.call_no_params(methods::RELOAD).await {
Ok(v) => v,
Err(err) => {
eprintln!("xy: {err}");
return Ok(rpc_to_exit(&err));
}
};
println!("added: {}", r.added.join(", "));
println!("removed: {}", r.removed.join(", "));
println!("changed: {}", r.changed.join(", "));
println!("unchanged: {}", r.unchanged.join(", "));
Ok(0)
}
pub async fn logs(paths: Paths, name: String, tail: Option<u32>, follow: bool) -> Result<i32> {
let mut c = connect(&paths).await?;
let p = LogsParams {
name: name.clone(),
tail,
follow,
};
let _sub: LogsSubscribed = match c.call(methods::LOGS, &p).await {
Ok(v) => v,
Err(err) => {
eprintln!("xy: {err}");
return Ok(rpc_to_exit(&err));
}
};
loop {
match c.read_notification().await {
Ok(None) => return Ok(0),
Ok(Some(n)) => match n.method.as_str() {
notifications::LOG => {
if let Some(params) = n.params
&& let Ok(line) = serde_json::from_value::<LogLine>(params)
{
let tag = match line.stream {
LogStream::Stdout => "out",
LogStream::Stderr => "err",
};
println!("[{tag}] {}", line.line);
}
}
notifications::LOG_END => return Ok(0),
_ => {}
},
Err(err) => {
eprintln!("xy: {err}");
return Ok(rpc_to_exit(&err));
}
}
}
}
+546
View File
@@ -0,0 +1,546 @@
use crate::daemon::registry::Registry;
use crate::paths::Paths;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use xy_ipc::Connection;
use xy_ipc::envelope::{Incoming, Request, Response, err_response, ok_response};
use xy_protocol::RpcErrorCode;
use xy_protocol::rpc::{
LogEnd, LogLine, LogsCancelParams, LogsParams, LogsSubscribed, NameOrAll, RestartResult,
ServerSummary, StartResult, StatusDetail, StopResult, methods, notifications,
};
use xy_supervisor::supervisor::{StartAck, StopAck, SupervisorCmd};
pub struct ConnState {
pub subs: Mutex<HashMap<u64, JoinHandle<()>>>,
pub next: AtomicU64,
}
impl ConnState {
pub fn new() -> Self {
Self {
subs: Mutex::new(HashMap::new()),
next: AtomicU64::new(1),
}
}
}
pub async fn serve(conn: Arc<Connection>, reg: Registry, _paths: Paths) -> std::io::Result<()> {
let state = Arc::new(ConnState::new());
loop {
let Some(incoming) = conn.read_incoming().await? else {
let mut subs = state.subs.lock().await;
for (_, h) in subs.drain() {
h.abort();
}
return Ok(());
};
if let Incoming::Request(req) = incoming {
let (resp, log_ready) = handle_request(req, &reg, &conn, &state).await;
conn.write_response(&resp).await?;
if let Some(tx) = log_ready {
let _ = tx.send(());
}
}
}
}
struct ApiError {
code: i32,
message: String,
}
impl ApiError {
fn rpc(code: RpcErrorCode, msg: impl Into<String>) -> Self {
Self {
code: code.as_i32(),
message: msg.into(),
}
}
}
type LogReadyTx = tokio::sync::oneshot::Sender<()>;
async fn handle_request(
req: Request,
reg: &Registry,
conn: &Arc<Connection>,
state: &Arc<ConnState>,
) -> (Response, Option<LogReadyTx>) {
let id = req.id.clone();
let method = req.method.as_str();
let params = req.params.unwrap_or(serde_json::Value::Null);
let resp = match method {
methods::LIST => match list(reg).await {
Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
Err(err) => err_response(id, err.code, err.message),
},
methods::STATUS => {
let p: xy_protocol::rpc::StatusParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(err) => {
return (
err_response(id, -32602, format!("invalid params: {err}")),
None,
);
}
};
match status(reg, &p.name).await {
Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
Err(err) => err_response(id, err.code, err.message),
}
}
methods::START => dispatch_lifecycle(id, params, reg, Op::Start).await,
methods::STOP => dispatch_lifecycle(id, params, reg, Op::Stop).await,
methods::RESTART => dispatch_lifecycle(id, params, reg, Op::Restart).await,
methods::RELOAD => match reload(reg).await {
Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
Err(e) => err_response(id, e.code, e.message),
},
methods::LOGS => {
let p: LogsParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(err) => {
return (
err_response(id, -32602, format!("invalid params: {err}")),
None,
);
}
};
match start_log_stream(reg, conn.clone(), state.clone(), p).await {
Ok((sub_id, ready_tx)) => {
let resp = ok_response(
id,
serde_json::to_value(LogsSubscribed {
subscription_id: sub_id,
})
.unwrap(),
);
return (resp, Some(ready_tx));
}
Err(err) => err_response(id, err.code, err.message),
}
}
methods::LOGS_CANCEL => {
let p: LogsCancelParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(err) => {
return (
err_response(id, -32602, format!("invalid params: {err}")),
None,
);
}
};
let mut subs = state.subs.lock().await;
if let Some(h) = subs.remove(&p.subscription_id) {
h.abort();
}
ok_response(id, serde_json::json!({}))
}
other => err_response(id, -32601, format!("unknown method `{other}`")),
};
(resp, None)
}
async fn list(reg: &Registry) -> Result<Vec<ServerSummary>, ApiError> {
let mut out = Vec::new();
for (name, entry) in reg.snapshot().await {
let s = entry.handle.status.borrow();
out.push(ServerSummary {
name,
state: s.state,
pid: s.pid,
port: s.port,
uptime_secs: s.uptime_secs,
restart_count: s.restart_count,
last_exit: s.last_exit,
});
}
Ok(out)
}
async fn status(reg: &Registry, name: &str) -> Result<StatusDetail, ApiError> {
let Some(entry) = reg.get(name).await else {
return Err(ApiError::rpc(
RpcErrorCode::ServerNotFound,
format!("no such server `{name}`"),
));
};
let s = entry.handle.status.borrow();
Ok(StatusDetail {
summary: ServerSummary {
name: entry.handle.name.clone(),
state: s.state,
pid: s.pid,
port: s.port,
uptime_secs: s.uptime_secs,
restart_count: s.restart_count,
last_exit: s.last_exit,
},
recent_transitions: Vec::new(),
})
}
enum Op {
Start,
Stop,
Restart,
}
async fn dispatch_lifecycle(
id: serde_json::Value,
params: serde_json::Value,
reg: &Registry,
op: Op,
) -> Response {
let p: NameOrAll = match serde_json::from_value(params) {
Ok(p) => p,
Err(err) => return err_response(id, -32602, format!("invalid params: {err}")),
};
let targets: Vec<String> = match p {
NameOrAll::All { all } if all => reg.names().await,
NameOrAll::Name { name } => vec![name],
NameOrAll::All { .. } => return err_response(id, -32602, "must set all=true".into()),
};
match op {
Op::Start => {
let mut started = Vec::new();
let mut already = Vec::new();
for name in targets {
let Some(entry) = reg.get(&name).await else {
return err_response(
id,
RpcErrorCode::ServerNotFound.as_i32(),
format!("no such server `{name}`"),
);
};
let (tx, rx) = oneshot::channel();
let _ = entry.handle.tx.send(SupervisorCmd::Start { ack: tx }).await;
match rx.await {
Ok(StartAck::Started) => started.push(name),
Ok(StartAck::AlreadyRunning) => already.push(name),
Ok(StartAck::SpawnFailed(msg)) => {
return err_response(
id,
RpcErrorCode::SpawnFailed.as_i32(),
format!("failed to start `{name}`: {msg}"),
);
}
Err(_) => {
return err_response(
id,
RpcErrorCode::SpawnFailed.as_i32(),
format!("supervisor for `{name}` dropped"),
);
}
}
}
ok_response(
id,
serde_json::to_value(StartResult {
started,
already_running: already,
})
.unwrap(),
)
}
Op::Stop => {
let mut stopped = Vec::new();
let mut not_running = Vec::new();
for name in targets {
let Some(entry) = reg.get(&name).await else {
return err_response(
id,
RpcErrorCode::ServerNotFound.as_i32(),
format!("no such server `{name}`"),
);
};
let (tx, rx) = oneshot::channel();
let _ = entry.handle.tx.send(SupervisorCmd::Stop { ack: tx }).await;
match rx.await {
Ok(StopAck::Stopped) => stopped.push(name),
Ok(StopAck::NotRunning) => not_running.push(name),
Err(_) => {
return err_response(
id,
RpcErrorCode::SpawnFailed.as_i32(),
format!("supervisor for `{name}` dropped"),
);
}
}
}
ok_response(
id,
serde_json::to_value(StopResult {
stopped,
not_running,
})
.unwrap(),
)
}
Op::Restart => {
let mut restarted = Vec::new();
for name in targets {
let Some(entry) = reg.get(&name).await else {
return err_response(
id,
RpcErrorCode::ServerNotFound.as_i32(),
format!("no such server `{name}`"),
);
};
let (tx, rx) = oneshot::channel();
let _ = entry
.handle
.tx
.send(SupervisorCmd::Restart { ack: tx })
.await;
let _ = rx.await;
restarted.push(name);
}
ok_response(
id,
serde_json::to_value(RestartResult { restarted }).unwrap(),
)
}
}
}
use xy_protocol::rpc::ReloadResult;
async fn reload(reg: &Registry) -> Result<ReloadResult, ApiError> {
let paths = crate::daemon::PATHS.get().ok_or_else(|| {
ApiError::rpc(RpcErrorCode::ConfigInvalid, "daemon paths not initialized")
})?;
let new_configs = xy_protocol::kdl_parse::load_all_configs(&paths.config_dir)
.map_err(|err| ApiError::rpc(RpcErrorCode::ConfigInvalid, err.to_string()))?;
use std::collections::HashMap;
let new_by_name: HashMap<String, xy_protocol::ServerConfig> = new_configs
.into_iter()
.map(|c| (c.name.clone(), c))
.collect();
let existing_names: Vec<String> = reg.names().await;
let mut added = Vec::new();
let mut removed = Vec::new();
let mut changed = Vec::new();
let mut unchanged = Vec::new();
for name in &existing_names {
if !new_by_name.contains_key(name)
&& let Some(entry) = reg.remove(name).await
{
let (tx, rx) = oneshot::channel();
let _ = entry
.handle
.tx
.send(SupervisorCmd::Shutdown { ack: tx })
.await;
let _ = rx.await;
removed.push(name.clone());
}
}
for (name, cfg) in new_by_name {
let new_hash = crate::daemon::config_hash(&cfg);
match reg.get(&name).await {
None => {
let handle = crate::daemon::spawn_supervisor(paths, cfg)
.map_err(|err| ApiError::rpc(RpcErrorCode::SpawnFailed, err.to_string()))?;
reg.insert(
name.clone(),
crate::daemon::registry::Entry {
handle: handle.clone(),
config_hash: new_hash,
},
)
.await;
let (tx, rx) = oneshot::channel();
let _ = handle.tx.send(SupervisorCmd::Start { ack: tx }).await;
let _ = rx.await;
added.push(name);
}
Some(entry) if entry.config_hash != new_hash => {
let (tx, rx) = oneshot::channel();
let _ = entry
.handle
.tx
.send(SupervisorCmd::Shutdown { ack: tx })
.await;
let _ = rx.await;
reg.remove(&name).await;
let handle = crate::daemon::spawn_supervisor(paths, cfg)
.map_err(|err| ApiError::rpc(RpcErrorCode::SpawnFailed, err.to_string()))?;
reg.insert(
name.clone(),
crate::daemon::registry::Entry {
handle: handle.clone(),
config_hash: new_hash,
},
)
.await;
let (tx, rx) = oneshot::channel();
let _ = handle.tx.send(SupervisorCmd::Start { ack: tx }).await;
let _ = rx.await;
changed.push(name);
}
Some(_) => unchanged.push(name),
}
}
Ok(ReloadResult {
added,
removed,
changed,
unchanged,
})
}
async fn start_log_stream(
reg: &Registry,
conn: Arc<Connection>,
state: Arc<ConnState>,
p: LogsParams,
) -> Result<(u64, LogReadyTx), ApiError> {
let Some(entry) = reg.get(&p.name).await else {
return Err(ApiError::rpc(
RpcErrorCode::ServerNotFound,
format!("no such server `{}`", p.name),
));
};
let sub_id = state.next.fetch_add(1, Ordering::Relaxed);
let sink = entry.handle.log_sink.clone();
let conn2 = conn.clone();
let state2 = state.clone();
let follow = p.follow;
let tail = p.tail;
let name = p.name.clone();
let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
let task = tokio::spawn(async move {
// Wait until `serve` has written the LOGS response before sending any
// LOG notifications. Without this the spawned task can race ahead of
// `conn.write_response` and emit notifications that the client
// discards while still awaiting the response.
let _ = ready_rx.await;
for line in sink.ring.snapshot_tail(tail) {
let n = xy_ipc::envelope::notification(
notifications::LOG,
Some(
serde_json::to_value(LogLine {
subscription_id: sub_id,
name: name.clone(),
stream: line.stream,
line: line.line,
ts_unix_ms: line.ts_unix_ms,
})
.unwrap(),
),
);
if conn2.write_notification(&n).await.is_err() {
return;
}
}
if !follow {
let end = xy_ipc::envelope::notification(
notifications::LOG_END,
Some(
serde_json::to_value(LogEnd {
subscription_id: sub_id,
})
.unwrap(),
),
);
let _ = conn2.write_notification(&end).await;
state2.subs.lock().await.remove(&sub_id);
return;
}
let mut rx = sink.broadcast.subscribe();
while let Ok(mut line) = rx.recv().await {
line.subscription_id = sub_id;
let n = xy_ipc::envelope::notification(
notifications::LOG,
Some(serde_json::to_value(&line).unwrap()),
);
if conn2.write_notification(&n).await.is_err() {
break;
}
}
});
state.subs.lock().await.insert(sub_id, task);
Ok((sub_id, ready_tx))
}
+147
View File
@@ -0,0 +1,147 @@
use crate::paths::Paths;
use crate::pidfile::PidFile;
use anyhow::{Context, Result};
use std::sync::{Arc, OnceLock};
use tokio::sync::{mpsc, oneshot, watch};
use tracing::{error, info};
use xy_ipc::{Connection, bind};
use xy_protocol::{ServerConfig, ServerState, kdl_parse::load_all_configs};
use xy_supervisor::{
logs::{LogSink, RotatingLogWriter},
supervisor::{RealSpawner, Status, SupervisorCmd, SupervisorHandle, SupervisorTask},
};
pub mod handlers;
pub mod registry;
pub mod shutdown;
const LOG_FILE_MAX_BYTES: u64 = 10 * 1024 * 1024;
const LOG_FILE_KEEP: usize = 5;
const RING_BUFFER_BYTES: usize = 1024 * 1024;
pub static PATHS: OnceLock<Paths> = OnceLock::new();
pub fn config_hash(cfg: &ServerConfig) -> u64 {
use std::hash::{Hash, Hasher};
let mut h = std::collections::hash_map::DefaultHasher::new();
serde_json::to_string(cfg).unwrap().hash(&mut h);
h.finish()
}
pub fn spawn_supervisor(paths: &Paths, cfg: ServerConfig) -> Result<SupervisorHandle> {
let log_path = paths.log_dir.join(format!("{}.log", cfg.name));
let writer = RotatingLogWriter::open(&log_path, LOG_FILE_MAX_BYTES, LOG_FILE_KEEP)
.with_context(|| format!("open log file {}", log_path.display()))?;
let sink = LogSink::new(cfg.name.clone(), writer, RING_BUFFER_BYTES);
let initial_status = Status {
state: ServerState::Stopped,
pid: None,
port: cfg.port,
uptime_secs: None,
restart_count: 0,
last_exit: None,
};
let (status_tx, status_rx) = watch::channel(initial_status);
let (cmd_tx, cmd_rx) = mpsc::channel(16);
let name = cfg.name.clone();
let task = SupervisorTask::new(cfg, sink.clone(), RealSpawner, status_tx, cmd_rx);
tokio::spawn(task.run());
Ok(SupervisorHandle {
name,
tx: cmd_tx,
status: status_rx,
log_sink: sink,
})
}
pub async fn run(paths: Paths) -> Result<()> {
paths.ensure_dirs().context("create state dirs")?;
let _pid =
PidFile::acquire(&paths.pidfile).context("another xy daemon appears to be running")?;
let listener = bind(&paths.socket).context("bind unix socket")?;
let _ = PATHS.set(paths.clone());
info!(socket = %paths.socket.display(), "daemon listening");
let configs = load_all_configs(&paths.config_dir).context("load configs")?;
let registry = registry::Registry::new();
for cfg in configs {
let hash = config_hash(&cfg);
let handle = spawn_supervisor(&paths, cfg)?;
let name = handle.name.clone();
registry
.insert(
name.clone(),
registry::Entry {
handle: handle.clone(),
config_hash: hash,
},
)
.await;
let (ack_tx, ack_rx) = oneshot::channel();
if handle
.tx
.send(SupervisorCmd::Start { ack: ack_tx })
.await
.is_ok()
{
let _ = ack_rx.await;
}
}
let registry_for_shutdown = registry.clone();
let shutdown_signal = shutdown::install();
let accept = async {
loop {
let (stream, _addr) = match listener.accept().await {
Ok(p) => p,
Err(err) => {
error!(error = %err, "accept failed");
continue;
}
};
let conn = Arc::new(Connection::new(stream));
let reg = registry.clone();
let paths_clone = paths.clone();
tokio::spawn(async move {
if let Err(err) = handlers::serve(conn, reg, paths_clone).await {
error!(error = %err, "connection ended with error");
}
});
}
};
tokio::select! {
_ = accept => {}
_ = shutdown_signal => { info!("shutdown signal received"); }
}
shutdown::shutdown_all(registry_for_shutdown).await;
Ok(())
}
+42
View File
@@ -0,0 +1,42 @@
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use xy_supervisor::SupervisorHandle;
#[derive(Clone)]
pub struct Entry {
pub handle: SupervisorHandle,
pub config_hash: u64,
}
#[derive(Clone, Default)]
pub struct Registry {
inner: Arc<RwLock<HashMap<String, Entry>>>,
}
impl Registry {
pub fn new() -> Self {
Self::default()
}
pub async fn insert(&self, name: String, entry: Entry) {
self.inner.write().await.insert(name, entry);
}
pub async fn remove(&self, name: &str) -> Option<Entry> {
self.inner.write().await.remove(name)
}
pub async fn get(&self, name: &str) -> Option<Entry> {
self.inner.read().await.get(name).cloned()
}
pub async fn names(&self) -> Vec<String> {
let g = self.inner.read().await;
let mut v: Vec<String> = g.keys().cloned().collect();
v.sort();
v
}
pub async fn snapshot(&self) -> Vec<(String, Entry)> {
let g = self.inner.read().await;
let mut v: Vec<_> = g.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
v.sort_by(|a, b| a.0.cmp(&b.0));
v
}
}
+45
View File
@@ -0,0 +1,45 @@
use crate::daemon::registry::Registry;
use tokio::signal::unix::{SignalKind, signal};
use tokio::sync::oneshot;
use xy_supervisor::supervisor::SupervisorCmd;
pub fn install() -> impl std::future::Future<Output = ()> {
let mut term = signal(SignalKind::terminate()).expect("install SIGTERM handler");
let mut int = signal(SignalKind::interrupt()).expect("install SIGINT handler");
async move {
tokio::select! {
_ = term.recv() => {}
_ = int.recv() => {}
}
}
}
pub async fn shutdown_all(reg: Registry) {
let snapshot = reg.snapshot().await;
let mut acks = Vec::new();
for (_name, entry) in &snapshot {
let (tx, rx) = oneshot::channel();
if entry
.handle
.tx
.send(SupervisorCmd::Shutdown { ack: tx })
.await
.is_ok()
{
acks.push(rx);
}
}
let deadline = tokio::time::Duration::from_secs(30);
let _ = tokio::time::timeout(deadline, async {
for rx in acks {
let _ = rx.await;
}
})
.await;
}
+93
View File
@@ -0,0 +1,93 @@
use clap::{Parser, Subcommand};
mod cli;
mod daemon;
mod paths;
mod pidfile;
#[derive(Debug, Parser)]
#[command(name = "xy", version, about = "HTTP MCP server supervisor")]
struct Cli {
#[command(subcommand)]
cmd: Cmd,
}
#[derive(Debug, Subcommand)]
enum Cmd {
/// Run the daemon in the foreground.
Daemon,
/// List all configured servers with state.
List,
/// Show detailed status for a single server.
Status { name: String },
/// Start a server (or all configured servers with --all).
Start {
#[arg(long, conflicts_with = "name")]
all: bool,
#[arg(required_unless_present = "all")]
name: Option<String>,
},
/// Stop a server (or --all).
Stop {
#[arg(long, conflicts_with = "name")]
all: bool,
#[arg(required_unless_present = "all")]
name: Option<String>,
},
/// Restart a server (or --all).
Restart {
#[arg(long, conflicts_with = "name")]
all: bool,
#[arg(required_unless_present = "all")]
name: Option<String>,
},
/// Re-read config dir and reconcile running servers.
Reload,
/// Stream a server's log.
Logs {
name: String,
#[arg(long)]
tail: Option<u32>,
#[arg(short = 'f', long)]
follow: bool,
},
}
#[tokio::main]
async fn main() -> std::process::ExitCode {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.with_writer(std::io::stderr)
.init();
let cli = Cli::parse();
let paths = match paths::Paths::resolve() {
Ok(p) => p,
Err(e) => {
eprintln!("xy: failed to resolve XDG paths: {e}");
return std::process::ExitCode::from(3);
}
};
let result: anyhow::Result<i32> = match cli.cmd {
Cmd::Daemon => daemon::run(paths).await.map(|_| 0),
Cmd::List => cli::list(paths).await,
Cmd::Status { name } => cli::status(paths, name).await,
Cmd::Start { all, name } => cli::start(paths, all, name).await,
Cmd::Stop { all, name } => cli::stop(paths, all, name).await,
Cmd::Restart { all, name } => cli::restart(paths, all, name).await,
Cmd::Reload => cli::reload(paths).await,
Cmd::Logs { name, tail, follow } => cli::logs(paths, name, tail, follow).await,
};
match result {
Ok(code) => std::process::ExitCode::from(code as u8),
Err(e) => {
eprintln!("xy: {e:#}");
std::process::ExitCode::from(1)
}
}
}
+69
View File
@@ -0,0 +1,69 @@
use etcetera::base_strategy::{BaseStrategy, Xdg};
use std::path::PathBuf;
#[derive(Debug, Clone)]
pub struct Paths {
pub config_dir: PathBuf,
pub state_dir: PathBuf,
pub log_dir: PathBuf,
pub socket: PathBuf,
pub pidfile: PathBuf,
}
impl Paths {
pub fn resolve() -> std::io::Result<Self> {
let xdg = Xdg::new().map_err(std::io::Error::other)?;
let config_dir = xdg.config_dir().join("xy").join("servers");
let state_dir = xdg.state_dir().unwrap_or_else(|| xdg.data_dir()).join("xy");
let log_dir = state_dir.join("logs");
let socket = std::env::var_os("XDG_RUNTIME_DIR")
.map(|p| PathBuf::from(p).join("xy.sock"))
.unwrap_or_else(|| state_dir.join("xy.sock"));
let pidfile = state_dir.join("xy.pid");
Ok(Self {
config_dir,
state_dir,
log_dir,
socket,
pidfile,
})
}
pub fn ensure_dirs(&self) -> std::io::Result<()> {
std::fs::create_dir_all(&self.config_dir)?;
std::fs::create_dir_all(&self.state_dir)?;
std::fs::create_dir_all(&self.log_dir)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn resolves_without_panicking() {
let p = Paths::resolve().unwrap();
assert!(p.pidfile.starts_with(&p.state_dir));
}
#[test]
fn ensure_dirs_creates_all_paths() {
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let paths = Paths {
config_dir: root.join("config/xy/servers"),
state_dir: root.join("state/xy"),
log_dir: root.join("state/xy/logs"),
socket: root.join("run/xy.sock"),
pidfile: root.join("state/xy/xy.pid"),
};
paths.ensure_dirs().unwrap();
assert!(paths.config_dir.is_dir());
assert!(paths.state_dir.is_dir());
assert!(paths.log_dir.is_dir());
}
}
+149
View File
@@ -0,0 +1,149 @@
use std::fs::{File, OpenOptions};
use std::io::{ErrorKind, Write};
use std::os::unix::fs::OpenOptionsExt;
use std::path::{Path, PathBuf};
use nix::{errno::Errno, sys::signal, unistd::Pid};
#[derive(Debug)]
pub struct PidFile {
path: PathBuf,
_file: File,
}
impl PidFile {
pub fn acquire(path: &Path) -> std::io::Result<Self> {
for _ in 0..3 {
match Self::create(path) {
Err(err) if err.kind() == ErrorKind::AlreadyExists => match read_live_pid(path)? {
Some(pid) => {
return Err(std::io::Error::new(
ErrorKind::AlreadyExists,
format!("pidfile {} held by live process {pid}", path.display()),
));
}
None => match std::fs::remove_file(path) {
Ok(()) => continue,
Err(err) if err.kind() == ErrorKind::NotFound => continue,
Err(err) => return Err(err),
},
},
result => return result,
}
}
Err(std::io::Error::new(
ErrorKind::AlreadyExists,
format!("pidfile {} keeps reappearing", path.display()),
))
}
fn create(path: &Path) -> std::io::Result<Self> {
let mut f = OpenOptions::new()
.write(true)
.create_new(true)
.mode(0o600)
.open(path)?;
writeln!(f, "{}", std::process::id())?;
Ok(Self {
path: path.to_path_buf(),
_file: f,
})
}
}
fn read_live_pid(path: &Path) -> std::io::Result<Option<u32>> {
let content = match std::fs::read_to_string(path) {
Ok(content) => content,
Err(err) if err.kind() == ErrorKind::NotFound => return Ok(None),
Err(err) => return Err(err),
};
let Ok(pid) = content.trim().parse::<u32>() else {
return Ok(None);
};
match signal::kill(Pid::from_raw(pid as i32), None) {
Err(Errno::ESRCH) => Ok(None),
_ => Ok(Some(pid)),
}
}
impl Drop for PidFile {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.path);
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn acquire_then_second_fails() {
let dir = tempdir().unwrap();
let p = dir.path().join("x.pid");
let _g = PidFile::acquire(&p).unwrap();
let err = PidFile::acquire(&p).unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::AlreadyExists);
}
#[test]
fn drop_removes_file() {
let dir = tempdir().unwrap();
let p = dir.path().join("x.pid");
{
let _g = PidFile::acquire(&p).unwrap();
assert!(p.exists());
}
assert!(!p.exists());
}
fn dead_pid() -> u32 {
let mut child = std::process::Command::new("true").spawn().unwrap();
child.wait().unwrap();
child.id()
}
#[test]
fn acquire_replaces_stale_pidfile() {
let dir = tempdir().unwrap();
let p = dir.path().join("x.pid");
std::fs::write(&p, format!("{}\n", dead_pid())).unwrap();
let _g = PidFile::acquire(&p).unwrap();
let content = std::fs::read_to_string(&p).unwrap();
assert_eq!(content.trim(), std::process::id().to_string());
}
#[test]
fn acquire_replaces_garbage_pidfile() {
let dir = tempdir().unwrap();
let p = dir.path().join("x.pid");
std::fs::write(&p, "not a pid\n").unwrap();
let _g = PidFile::acquire(&p).unwrap();
let content = std::fs::read_to_string(&p).unwrap();
assert_eq!(content.trim(), std::process::id().to_string());
}
#[test]
fn acquire_fails_when_pid_alive() {
let dir = tempdir().unwrap();
let p = dir.path().join("x.pid");
std::fs::write(&p, format!("{}\n", std::process::id())).unwrap();
let err = PidFile::acquire(&p).unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::AlreadyExists);
assert!(p.exists());
}
}
+102
View File
@@ -0,0 +1,102 @@
#![allow(dead_code)] // not all helpers are used by every test file
use std::path::PathBuf;
use std::process::Stdio;
use std::time::Duration;
use tempfile::TempDir;
use tokio::process::{Child, Command};
pub struct Harness {
pub tmp: TempDir,
pub config_dir: PathBuf,
pub state_dir: PathBuf,
pub socket: PathBuf,
pub daemon: Option<Child>,
}
impl Harness {
pub fn new() -> Self {
let tmp = tempfile::tempdir().expect("tempdir");
std::fs::create_dir_all(tmp.path().join("config")).unwrap();
std::fs::create_dir_all(tmp.path().join("state")).unwrap();
std::fs::create_dir_all(tmp.path().join("run")).unwrap();
let config_dir = tmp.path().join("config/xy/servers");
let state_dir = tmp.path().join("state/xy");
std::fs::create_dir_all(&config_dir).unwrap();
std::fs::create_dir_all(&state_dir).unwrap();
let socket = tmp.path().join("run/xy.sock");
Self {
tmp,
config_dir,
state_dir,
socket,
daemon: None,
}
}
pub fn write_server(&self, name: &str, command: &str, port: u16, restart_policy: &str) {
let body = format!(
"command \"{command}\"\nport {port}\nrestart {{ policy \"{restart_policy}\" backoff-initial \"10ms\" backoff-max \"50ms\" max-retries-per-minute 3 }}\nstop {{ grace \"500ms\" }}\n"
);
std::fs::write(self.config_dir.join(format!("{name}.kdl")), body).unwrap();
}
pub async fn start_daemon(&mut self, xy_bin: &PathBuf) {
let child = Command::new(xy_bin)
.arg("daemon")
.env("XDG_CONFIG_HOME", self.tmp.path().join("config"))
.env("XDG_STATE_HOME", self.tmp.path().join("state"))
.env("XDG_RUNTIME_DIR", self.tmp.path().join("run"))
.stdout(Stdio::null())
.stderr(Stdio::inherit())
.kill_on_drop(true)
.spawn()
.expect("spawn daemon");
self.daemon = Some(child);
let deadline = std::time::Instant::now() + Duration::from_secs(5);
while !self.socket.exists() {
if std::time::Instant::now() > deadline {
panic!("daemon socket never appeared");
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
}
pub async fn run_cli(&self, xy_bin: &PathBuf, args: &[&str]) -> (i32, String, String) {
let out = Command::new(xy_bin)
.args(args)
.env("XDG_CONFIG_HOME", self.tmp.path().join("config"))
.env("XDG_STATE_HOME", self.tmp.path().join("state"))
.env("XDG_RUNTIME_DIR", self.tmp.path().join("run"))
.output()
.await
.expect("run cli");
let code = out.status.code().unwrap_or(-1);
let stdout = String::from_utf8_lossy(&out.stdout).to_string();
let stderr = String::from_utf8_lossy(&out.stderr).to_string();
(code, stdout, stderr)
}
}
pub fn xy_bin() -> PathBuf {
artifact("xy")
}
pub fn sleep_server_bin() -> PathBuf {
artifact("xy-test-sleep-server")
}
pub fn exit_failure_bin() -> PathBuf {
artifact("xy-test-exit-failure")
}
fn artifact(name: &str) -> PathBuf {
let mut p = std::env::current_exe().unwrap();
p.pop();
if p.ends_with("deps") {
p.pop();
}
p.push(name);
if !p.exists() {
panic!("artifact `{}` not found at {}", name, p.display());
}
p
}
+31
View File
@@ -0,0 +1,31 @@
mod common;
use common::*;
#[tokio::test]
async fn auto_starts_on_boot_then_stop_and_start() {
let xy = xy_bin();
let sleeper = sleep_server_bin();
let mut h = Harness::new();
h.write_server("alpha", sleeper.to_str().unwrap(), 19_001, "always");
h.start_daemon(&xy).await;
let mut last_stdout = String::new();
for _ in 0..40 {
let (_c, out, _e) = h.run_cli(&xy, &["list"]).await;
last_stdout = out;
if last_stdout.contains("alpha") && last_stdout.contains("running") {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
assert!(last_stdout.contains("alpha"), "stdout: {last_stdout}");
assert!(last_stdout.contains("running"), "stdout: {last_stdout}");
let (code, out, _e) = h.run_cli(&xy, &["stop", "alpha"]).await;
assert_eq!(code, 0);
assert!(out.contains("stopped: alpha"), "stdout: {out}");
let (code, out, _e) = h.run_cli(&xy, &["start", "alpha"]).await;
assert_eq!(code, 0);
assert!(out.contains("started: alpha"), "stdout: {out}");
}
+49
View File
@@ -0,0 +1,49 @@
mod common;
use common::*;
use std::process::Stdio;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
#[tokio::test]
async fn logs_tail_prints_existing_lines() {
let xy = xy_bin();
let sleeper = sleep_server_bin();
let mut h = Harness::new();
h.write_server("svc", sleeper.to_str().unwrap(), 19_030, "always");
h.start_daemon(&xy).await;
tokio::time::sleep(Duration::from_millis(500)).await;
let (code, out, _e) = h.run_cli(&xy, &["logs", "svc", "--tail", "10"]).await;
assert_eq!(code, 0);
assert!(out.contains("ready"), "stdout: {out}");
}
#[tokio::test]
async fn logs_follow_streams_new_lines() {
let xy = xy_bin();
let sleeper = sleep_server_bin();
let mut h = Harness::new();
h.write_server("svc", sleeper.to_str().unwrap(), 19_031, "always");
h.start_daemon(&xy).await;
let mut child = Command::new(&xy)
.args(["logs", "svc", "--follow"])
.env("XDG_CONFIG_HOME", h.tmp.path().join("config"))
.env("XDG_STATE_HOME", h.tmp.path().join("state"))
.env("XDG_RUNTIME_DIR", h.tmp.path().join("run"))
.stdout(Stdio::piped())
.stderr(Stdio::null())
.kill_on_drop(true)
.spawn()
.unwrap();
let stdout = child.stdout.take().unwrap();
let mut lines = BufReader::new(stdout).lines();
let first = tokio::time::timeout(Duration::from_secs(2), lines.next_line())
.await
.expect("timeout waiting for first log line")
.unwrap();
assert!(first.is_some());
let _ = tokio::time::timeout(Duration::from_secs(2), child.kill()).await;
}
+37
View File
@@ -0,0 +1,37 @@
mod common;
use common::*;
#[tokio::test]
async fn reload_adds_removes_and_changes() {
let xy = xy_bin();
let sleeper = sleep_server_bin();
let mut h = Harness::new();
h.write_server("a", sleeper.to_str().unwrap(), 19_020, "always");
h.start_daemon(&xy).await;
for _ in 0..40 {
let (_c, out, _e) = h.run_cli(&xy, &["list"]).await;
if out.contains("a") && out.contains("running") {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
h.write_server("a", sleeper.to_str().unwrap(), 19_021, "always");
h.write_server("b", sleeper.to_str().unwrap(), 19_022, "always");
let (code, out, _e) = h.run_cli(&xy, &["reload"]).await;
assert_eq!(code, 0);
assert!(out.contains("added:") && out.contains("b"), "stdout: {out}");
assert!(
out.contains("changed:") && out.contains("a"),
"stdout: {out}"
);
std::fs::remove_file(h.config_dir.join("a.kdl")).unwrap();
let (code, out, _e) = h.run_cli(&xy, &["reload"]).await;
assert_eq!(code, 0);
assert!(
out.contains("removed:") && out.contains("a"),
"stdout: {out}"
);
}
+22
View File
@@ -0,0 +1,22 @@
mod common;
use common::*;
#[tokio::test]
async fn restart_cap_marks_failed() {
let xy = xy_bin();
let bad = exit_failure_bin();
let mut h = Harness::new();
h.write_server("flaky", bad.to_str().unwrap(), 19_010, "always");
h.start_daemon(&xy).await;
let mut saw_failed = false;
for _ in 0..60 {
let (_c, out, _e) = h.run_cli(&xy, &["list"]).await;
if out.contains("flaky") && out.contains("failed") {
saw_failed = true;
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
assert!(saw_failed, "flaky never reached failed state");
}
+15
View File
@@ -0,0 +1,15 @@
command "/Users/you/.cargo/bin/insikt-mcp"
args "--http" "--port" "8421"
port 8421
env {
RUST_LOG "info"
}
restart {
policy "on-failure"
}
stop {
grace "10s"
}
-3
View File
@@ -1,3 +0,0 @@
fn main() {
println!("Hello, world!");
}