Merge feat/mcp-supervisor: HTTP MCP server supervisor MVP

37 planned tasks plus 3 follow-up fixes from final code review.

Architecture:
- Cargo workspace: xy-protocol, xy-supervisor, xy-ipc, xy (single binary)
- Unix socket + newline-delimited JSON-RPC 2.0
- Per-server KDL configs at XDG paths (XDG on macOS via etcetera)
- One supervisor task per managed server, owning all state
- Per-server log capture: rotating disk + ring buffer + broadcast stream

Features:
- Daemon auto-launches all configured servers on boot
- start/stop/restart (single or --all), reload (diff added/removed/changed),
  list/status, logs (--tail / --follow)
- Per-server restart policy (always/on-failure/never) with exponential
  backoff, sliding 60s retry window, and Failed state on cap
- Graceful shutdown via SIGTERM/SIGINT, SIGKILL escalation after grace
- 51 tests: unit (state machine via MockChild, KDL parser, framing) +
  integration (real daemon + helper bins exercising lifecycle/reload/
  restart-cap/logs)

Bugs found and fixed during execution:
- Connection deadlock from single shared read/write mutex (split into
  separate reader/writer halves)
- LOGS response vs notification ordering race (oneshot gate)
- StartAck::Started returned even on spawn failure (added SpawnFailed)
- Backoff sleep blocked the supervisor's command channel (interruptible
  select)
- list/status returned zeroed fields (now publish full Status via watch)
This commit is contained in:
2026-05-25 13:22:28 +02:00
44 changed files with 5189 additions and 8 deletions
+1
View File
@@ -1 +1,2 @@
/target
*.rlib
Generated
+1140
View File
File diff suppressed because it is too large Load Diff
+34 -5
View File
@@ -1,6 +1,35 @@
[package]
name = "xy"
version = "0.1.0"
edition = "2024"
[workspace]
resolver = "3"
members = [
"crates/xy-protocol",
"crates/xy-supervisor",
"crates/xy-ipc",
"crates/xy",
]
[dependencies]
[workspace.package]
edition = "2024"
version = "0.1.0"
license = "MIT OR Apache-2.0"
[workspace.dependencies]
xy-protocol = { path = "crates/xy-protocol" }
xy-supervisor = { path = "crates/xy-supervisor" }
xy-ipc = { path = "crates/xy-ipc" }
tokio = { version = "1", features = ["rt-multi-thread", "net", "process", "signal", "sync", "fs", "io-util", "macros", "time"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "2"
anyhow = "1"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
clap = { version = "4", features = ["derive"] }
kdl = "6"
etcetera = "0.10"
nix = { version = "0.30", features = ["signal", "process"] }
humantime = "2"
humantime-serde = "1"
async-trait = "0.1"
tempfile = "3"
tokio-test = "0.4"
+26
View File
@@ -0,0 +1,26 @@
# xy — HTTP MCP server supervisor
Daemon + CLI that launches and supervises HTTP-based MCP servers.
## Build
cargo build --release
## Run
target/release/xy daemon # foreground
Drop a server definition into `$XDG_CONFIG_HOME/xy/servers/<name>.kdl`
(see `examples/insikt.kdl`) and `xy reload`.
Commands:
xy list
xy status <name>
xy start <name|--all>
xy stop <name|--all>
xy restart <name|--all>
xy reload
xy logs <name> [--tail N] [--follow]
Exit codes: 0 success, 1 operational error, 2 daemon unreachable, 3 config invalid.
+16
View File
@@ -0,0 +1,16 @@
[package]
name = "xy-ipc"
edition.workspace = true
version.workspace = true
license.workspace = true
[dependencies]
xy-protocol.workspace = true
tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
thiserror.workspace = true
[dev-dependencies]
tempfile.workspace = true
+127
View File
@@ -0,0 +1,127 @@
use crate::envelope::{Incoming, Notification, Request};
use crate::framing::JsonFramed;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::path::Path;
use thiserror::Error;
use tokio::net::UnixStream;
#[derive(Debug, Error)]
pub enum ClientError {
#[error("io: {0}")]
Io(#[from] std::io::Error),
#[error("rpc error {code}: {message}")]
Rpc { code: i32, message: String },
#[error("unexpected message kind from daemon")]
Unexpected,
#[error("daemon unreachable: {0}")]
Unreachable(std::io::Error),
#[error("serialization: {0}")]
Serde(#[from] serde_json::Error),
}
pub struct Client {
framed: JsonFramed,
next_id: u64,
}
impl Client {
pub async fn connect(socket_path: &Path) -> Result<Self, ClientError> {
let stream = UnixStream::connect(socket_path)
.await
.map_err(ClientError::Unreachable)?;
Ok(Self {
framed: JsonFramed::new(stream),
next_id: 1,
})
}
pub async fn call<P: Serialize, R: DeserializeOwned>(
&mut self,
method: &str,
params: &P,
) -> Result<R, ClientError> {
let id = self.next_id;
self.next_id += 1;
let params_val = serde_json::to_value(params)?;
let req = crate::envelope::request(id, method, Some(params_val));
self.framed.write(&req).await?;
loop {
let msg: Option<Incoming> = self.framed.read().await?;
let Some(msg) = msg else {
return Err(ClientError::Unreachable(std::io::Error::from(
std::io::ErrorKind::UnexpectedEof,
)));
};
match msg {
Incoming::Response(r) => {
if r.id != serde_json::json!(id) {
return Err(ClientError::Unexpected);
}
if let Some(err) = r.error {
return Err(ClientError::Rpc {
code: err.code,
message: err.message,
});
}
let result = r.result.unwrap_or(serde_json::Value::Null);
return Ok(serde_json::from_value(result)?);
}
Incoming::Notification(_) => continue,
Incoming::Request(_) => return Err(ClientError::Unexpected),
}
}
}
pub async fn call_no_params<R: DeserializeOwned>(
&mut self,
method: &str,
) -> Result<R, ClientError> {
let id = self.next_id;
self.next_id += 1;
let req = Request {
jsonrpc: "2.0".into(),
id: serde_json::json!(id),
method: method.into(),
params: None,
};
self.framed.write(&req).await?;
let msg: Option<Incoming> = self.framed.read().await?;
let Some(Incoming::Response(r)) = msg else {
return Err(ClientError::Unexpected);
};
if let Some(err) = r.error {
return Err(ClientError::Rpc {
code: err.code,
message: err.message,
});
}
Ok(serde_json::from_value(
r.result.unwrap_or(serde_json::Value::Null),
)?)
}
pub async fn read_notification(&mut self) -> Result<Option<Notification>, ClientError> {
loop {
let msg: Option<Incoming> = self.framed.read().await?;
match msg {
None => return Ok(None),
Some(Incoming::Notification(n)) => return Ok(Some(n)),
Some(Incoming::Response(_)) => continue,
Some(Incoming::Request(_)) => return Err(ClientError::Unexpected),
}
}
}
pub async fn send_notification(&mut self, n: &Notification) -> Result<(), ClientError> {
self.framed.write(n).await?;
Ok(())
}
}
+114
View File
@@ -0,0 +1,114 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Request {
pub jsonrpc: String,
pub id: serde_json::Value,
pub method: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub params: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Notification {
pub jsonrpc: String,
pub method: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub params: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Response {
pub jsonrpc: String,
pub id: serde_json::Value,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub result: Option<Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<RpcError>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcError {
pub code: i32,
pub message: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub data: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Incoming {
Request(Request),
Response(Response),
Notification(Notification),
}
pub const JSONRPC_VERSION: &str = "2.0";
pub fn request(id: u64, method: &str, params: Option<Value>) -> Request {
Request {
jsonrpc: JSONRPC_VERSION.into(),
id: serde_json::json!(id),
method: method.into(),
params,
}
}
pub fn notification(method: &str, params: Option<Value>) -> Notification {
Notification {
jsonrpc: JSONRPC_VERSION.into(),
method: method.into(),
params,
}
}
pub fn ok_response(id: serde_json::Value, result: Value) -> Response {
Response {
jsonrpc: JSONRPC_VERSION.into(),
id,
result: Some(result),
error: None,
}
}
pub fn err_response(id: serde_json::Value, code: i32, message: String) -> Response {
Response {
jsonrpc: JSONRPC_VERSION.into(),
id,
result: None,
error: Some(RpcError {
code,
message,
data: None,
}),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn request_round_trip() {
let r = request(7, "list", None);
let s = serde_json::to_string(&r).unwrap();
let back: Request = serde_json::from_str(&s).unwrap();
assert_eq!(back.method, "list");
assert_eq!(back.id, serde_json::json!(7));
}
#[test]
fn incoming_is_response() {
let s = r#"{"jsonrpc":"2.0","id":1,"result":{"ok":true}}"#;
let i: Incoming = serde_json::from_str(s).unwrap();
assert!(matches!(i, Incoming::Response(_)));
}
#[test]
fn incoming_is_notification() {
let s = r#"{"jsonrpc":"2.0","method":"log","params":{}}"#;
let i: Incoming = serde_json::from_str(s).unwrap();
assert!(matches!(i, Incoming::Notification(_)));
}
}
+120
View File
@@ -0,0 +1,120 @@
use serde::Serialize;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
pub struct JsonFramed {
reader: BufReader<tokio::net::unix::OwnedReadHalf>,
writer: tokio::net::unix::OwnedWriteHalf,
}
impl JsonFramed {
pub fn new(stream: UnixStream) -> Self {
let (r, w) = stream.into_split();
Self {
reader: BufReader::new(r),
writer: w,
}
}
pub async fn read<T: serde::de::DeserializeOwned>(&mut self) -> std::io::Result<Option<T>> {
let mut buf = String::new();
let n = self.reader.read_line(&mut buf).await?;
if n == 0 {
return Ok(None);
}
let v: T = serde_json::from_str(buf.trim_end())
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
Ok(Some(v))
}
pub async fn write<T: Serialize>(&mut self, value: &T) -> std::io::Result<()> {
let mut bytes = serde_json::to_vec(value)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
bytes.push(b'\n');
self.writer.write_all(&bytes).await?;
self.writer.flush().await
}
}
pub struct JsonFramedReader {
inner: BufReader<tokio::net::unix::OwnedReadHalf>,
}
impl JsonFramedReader {
pub async fn read<T: serde::de::DeserializeOwned>(&mut self) -> std::io::Result<Option<T>> {
let mut buf = String::new();
let n = self.inner.read_line(&mut buf).await?;
if n == 0 {
return Ok(None);
}
let v: T = serde_json::from_str(buf.trim_end())
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
Ok(Some(v))
}
}
pub struct JsonFramedWriter {
inner: tokio::net::unix::OwnedWriteHalf,
}
impl JsonFramedWriter {
pub async fn write<T: Serialize>(&mut self, value: &T) -> std::io::Result<()> {
let mut bytes = serde_json::to_vec(value)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
bytes.push(b'\n');
self.inner.write_all(&bytes).await?;
self.inner.flush().await
}
}
pub fn split(stream: UnixStream) -> (JsonFramedReader, JsonFramedWriter) {
let (r, w) = stream.into_split();
(
JsonFramedReader {
inner: BufReader::new(r),
},
JsonFramedWriter { inner: w },
)
}
#[cfg(test)]
mod tests {
use super::*;
use serde::Deserialize;
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct M {
x: u32,
name: String,
}
#[tokio::test]
async fn round_trip_over_socket_pair() {
let (a, b) = UnixStream::pair().unwrap();
let mut sa = JsonFramed::new(a);
let mut sb = JsonFramed::new(b);
sa.write(&M {
x: 1,
name: "hi".into(),
})
.await
.unwrap();
let got: Option<M> = sb.read().await.unwrap();
assert_eq!(
got,
Some(M {
x: 1,
name: "hi".into()
})
);
}
#[tokio::test]
async fn eof_returns_none() {
let (a, b) = UnixStream::pair().unwrap();
drop(a);
let mut sb = JsonFramed::new(b);
let got: Option<M> = sb.read().await.unwrap();
assert!(got.is_none());
}
}
+14
View File
@@ -0,0 +1,14 @@
//! JSON-RPC 2.0 over newline-delimited JSON on a Unix socket.
pub mod client;
pub mod envelope;
pub mod framing;
pub mod server;
pub use client::{Client, ClientError};
pub use envelope::{
Incoming, Notification, Request, Response, RpcError, err_response, notification, ok_response,
request,
};
pub use framing::JsonFramed;
pub use server::{Connection, bind};
+70
View File
@@ -0,0 +1,70 @@
use crate::envelope::{Incoming, Notification, Response};
use crate::framing::{JsonFramedReader, JsonFramedWriter};
use std::path::Path;
use std::sync::Arc;
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::Mutex;
pub struct Connection {
reader: Mutex<JsonFramedReader>,
writer: Arc<Mutex<JsonFramedWriter>>,
}
impl Connection {
pub fn new(stream: UnixStream) -> Self {
let (reader, writer) = crate::framing::split(stream);
Self {
reader: Mutex::new(reader),
writer: Arc::new(Mutex::new(writer)),
}
}
pub async fn read_incoming(&self) -> std::io::Result<Option<Incoming>> {
let mut g = self.reader.lock().await;
g.read::<Incoming>().await
}
pub async fn write_response(&self, r: &Response) -> std::io::Result<()> {
let mut g = self.writer.lock().await;
g.write(r).await
}
pub async fn write_notification(&self, n: &Notification) -> std::io::Result<()> {
let mut g = self.writer.lock().await;
g.write(n).await
}
}
pub fn bind(socket_path: &Path) -> std::io::Result<UnixListener> {
if socket_path.exists() {
std::fs::remove_file(socket_path)?;
}
if let Some(parent) = socket_path.parent() {
std::fs::create_dir_all(parent)?;
}
let listener = UnixListener::bind(socket_path)?;
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(socket_path, std::fs::Permissions::from_mode(0o600))?;
Ok(listener)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[tokio::test]
async fn bind_creates_socket_with_0600() {
let dir = tempdir().unwrap();
let path = dir.path().join("x.sock");
let _listener = bind(&path).unwrap();
use std::os::unix::fs::PermissionsExt;
let mode = std::fs::metadata(&path).unwrap().permissions().mode() & 0o777;
assert_eq!(mode, 0o600);
}
}
+16
View File
@@ -0,0 +1,16 @@
[package]
name = "xy-protocol"
edition.workspace = true
version.workspace = true
license.workspace = true
[dependencies]
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
kdl.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
[dev-dependencies]
tempfile.workspace = true
+103
View File
@@ -0,0 +1,103 @@
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum RestartPolicy {
Always,
#[default]
OnFailure,
Never,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RestartConfig {
#[serde(default)]
pub policy: RestartPolicy,
#[serde(default = "default_backoff_initial", with = "humantime_serde")]
pub backoff_initial: Duration,
#[serde(default = "default_backoff_max", with = "humantime_serde")]
pub backoff_max: Duration,
#[serde(default = "default_max_retries_per_minute")]
pub max_retries_per_minute: u32,
}
fn default_backoff_initial() -> Duration {
Duration::from_secs(1)
}
fn default_backoff_max() -> Duration {
Duration::from_secs(30)
}
fn default_max_retries_per_minute() -> u32 {
5
}
impl Default for RestartConfig {
fn default() -> Self {
Self {
policy: RestartPolicy::default(),
backoff_initial: default_backoff_initial(),
backoff_max: default_backoff_max(),
max_retries_per_minute: default_max_retries_per_minute(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StopConfig {
#[serde(default = "default_grace", with = "humantime_serde")]
pub grace: Duration,
}
fn default_grace() -> Duration {
Duration::from_secs(10)
}
impl Default for StopConfig {
fn default() -> Self {
Self {
grace: default_grace(),
}
}
}
use std::collections::BTreeMap;
use std::path::PathBuf;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ServerConfig {
pub name: String,
pub command: PathBuf,
#[serde(default)]
pub args: Vec<String>,
pub port: u16,
#[serde(default)]
pub env: BTreeMap<String, String>,
#[serde(default)]
pub working_dir: Option<PathBuf>,
#[serde(default)]
pub restart: RestartConfig,
#[serde(default)]
pub stop: StopConfig,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn restart_config_defaults() {
let c = RestartConfig::default();
assert_eq!(c.policy, RestartPolicy::OnFailure);
assert_eq!(c.backoff_initial, Duration::from_secs(1));
assert_eq!(c.backoff_max, Duration::from_secs(30));
assert_eq!(c.max_retries_per_minute, 5);
}
#[test]
fn stop_config_defaults() {
assert_eq!(StopConfig::default().grace, Duration::from_secs(10));
}
}
+51
View File
@@ -0,0 +1,51 @@
use std::path::PathBuf;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum ConfigError {
#[error("failed to read {path}: {source}")]
Io {
path: PathBuf,
#[source]
source: std::io::Error,
},
#[error("failed to parse KDL in {path}: {message}")]
Parse { path: PathBuf, message: String },
#[error("missing required field `{field}` in {path}")]
MissingField { path: PathBuf, field: &'static str },
#[error("invalid value for `{field}` in {path}: {message}")]
InvalidValue {
path: PathBuf,
field: &'static str,
message: String,
},
#[error("duplicate port {port} declared by both `{name_a}` and `{name_b}`")]
DuplicatePort {
name_a: String,
name_b: String,
port: u16,
},
#[error("server name `{name}` contains invalid characters (allowed: a-z, 0-9, '-', '_')")]
InvalidName { name: String },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcErrorCode {
ServerNotFound = -32001,
PortConflict = -32002,
ConfigInvalid = -32003,
AlreadyRunning = -32004,
NotRunning = -32005,
SpawnFailed = -32006,
}
impl RpcErrorCode {
pub fn as_i32(self) -> i32 {
self as i32
}
}
+447
View File
@@ -0,0 +1,447 @@
use crate::{ConfigError, RestartConfig, RestartPolicy, ServerConfig, StopConfig};
use kdl::{KdlDocument, KdlNode};
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::time::Duration;
pub fn parse_server_config(
name: &str,
text: &str,
source_path: &Path,
) -> Result<ServerConfig, ConfigError> {
validate_name(name).map_err(|_| ConfigError::InvalidName {
name: name.to_string(),
})?;
let doc: KdlDocument = text
.parse()
.map_err(|err: kdl::KdlError| ConfigError::Parse {
path: source_path.to_path_buf(),
message: err.to_string(),
})?;
let command = require_string_arg(&doc, "command", source_path)?;
let args = optional_string_args(&doc, "args");
let port = require_u16_arg(&doc, "port", source_path)?;
let env = optional_string_map(&doc, "env");
let working_dir = optional_string_arg(&doc, "working-dir").map(PathBuf::from);
let restart = parse_restart(&doc, source_path)?;
let stop = parse_stop(&doc, source_path)?;
Ok(ServerConfig {
name: name.to_string(),
command: PathBuf::from(command),
args,
port,
env,
working_dir,
restart,
stop,
})
}
fn validate_name(name: &str) -> Result<(), ()> {
if name.is_empty() {
return Err(());
}
if name
.chars()
.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '_')
{
Ok(())
} else {
Err(())
}
}
fn require_string_arg(
doc: &KdlDocument,
name: &'static str,
path: &Path,
) -> Result<String, ConfigError> {
let node = find_node(doc, name).ok_or(ConfigError::MissingField {
path: path.to_path_buf(),
field: name,
})?;
node.entries()
.first()
.and_then(|e| e.value().as_string().map(str::to_string))
.ok_or(ConfigError::InvalidValue {
path: path.to_path_buf(),
field: name,
message: "expected string argument".into(),
})
}
fn require_u16_arg(doc: &KdlDocument, name: &'static str, path: &Path) -> Result<u16, ConfigError> {
let node = find_node(doc, name).ok_or(ConfigError::MissingField {
path: path.to_path_buf(),
field: name,
})?;
let v = node
.entries()
.first()
.and_then(|e| e.value().as_integer())
.ok_or(ConfigError::InvalidValue {
path: path.to_path_buf(),
field: name,
message: "expected integer".into(),
})?;
u16::try_from(v).map_err(|_| ConfigError::InvalidValue {
path: path.to_path_buf(),
field: name,
message: format!("port {v} out of u16 range"),
})
}
fn optional_string_arg(doc: &KdlDocument, name: &str) -> Option<String> {
find_node(doc, name)
.and_then(|n| n.entries().first())
.and_then(|e| e.value().as_string().map(str::to_string))
}
fn optional_string_args(doc: &KdlDocument, name: &str) -> Vec<String> {
find_node(doc, name)
.map(|n| {
n.entries()
.iter()
.filter_map(|e| e.value().as_string().map(str::to_string))
.collect()
})
.unwrap_or_default()
}
fn optional_string_map(doc: &KdlDocument, name: &str) -> BTreeMap<String, String> {
let Some(node) = find_node(doc, name) else {
return BTreeMap::new();
};
let mut out = BTreeMap::new();
if let Some(children) = node.children() {
for child in children.nodes() {
let key = child.name().value().to_string();
if let Some(val) = child.entries().first().and_then(|e| e.value().as_string()) {
out.insert(key, val.to_string());
}
}
}
out
}
fn parse_restart(doc: &KdlDocument, path: &Path) -> Result<RestartConfig, ConfigError> {
let Some(node) = find_node(doc, "restart") else {
return Ok(RestartConfig::default());
};
let Some(children) = node.children() else {
return Ok(RestartConfig::default());
};
let mut out = RestartConfig::default();
for child in children.nodes() {
match child.name().value() {
"policy" => {
let s = string_arg(child, "policy", path)?;
out.policy = match s.as_str() {
"always" => RestartPolicy::Always,
"on-failure" => RestartPolicy::OnFailure,
"never" => RestartPolicy::Never,
other => {
return Err(ConfigError::InvalidValue {
path: path.to_path_buf(),
field: "restart.policy",
message: format!("unknown policy `{other}`"),
});
}
};
}
"backoff-initial" => {
out.backoff_initial = parse_duration_arg(child, "restart.backoff-initial", path)?;
}
"backoff-max" => {
out.backoff_max = parse_duration_arg(child, "restart.backoff-max", path)?;
}
"max-retries-per-minute" => {
let v = child
.entries()
.first()
.and_then(|e| e.value().as_integer())
.ok_or(ConfigError::InvalidValue {
path: path.to_path_buf(),
field: "restart.max-retries-per-minute",
message: "expected integer".into(),
})?;
out.max_retries_per_minute =
u32::try_from(v).map_err(|_| ConfigError::InvalidValue {
path: path.to_path_buf(),
field: "restart.max-retries-per-minute",
message: format!("out of u32 range: {v}"),
})?;
}
other => {
return Err(ConfigError::InvalidValue {
path: path.to_path_buf(),
field: "restart",
message: format!("unknown key `{other}`"),
});
}
}
}
Ok(out)
}
fn parse_stop(doc: &KdlDocument, path: &Path) -> Result<StopConfig, ConfigError> {
let Some(node) = find_node(doc, "stop") else {
return Ok(StopConfig::default());
};
let Some(children) = node.children() else {
return Ok(StopConfig::default());
};
let mut out = StopConfig::default();
for child in children.nodes() {
match child.name().value() {
"grace" => {
out.grace = parse_duration_arg(child, "stop.grace", path)?;
}
other => {
return Err(ConfigError::InvalidValue {
path: path.to_path_buf(),
field: "stop",
message: format!("unknown key `{other}`"),
});
}
}
}
Ok(out)
}
fn string_arg(node: &KdlNode, field: &'static str, path: &Path) -> Result<String, ConfigError> {
node.entries()
.first()
.and_then(|e| e.value().as_string().map(str::to_string))
.ok_or(ConfigError::InvalidValue {
path: path.to_path_buf(),
field,
message: "expected string".into(),
})
}
fn parse_duration_arg(
node: &KdlNode,
field: &'static str,
path: &Path,
) -> Result<Duration, ConfigError> {
let s = string_arg(node, field, path)?;
humantime::parse_duration(&s).map_err(|err| ConfigError::InvalidValue {
path: path.to_path_buf(),
field,
message: format!("invalid duration `{s}`: {err}"),
})
}
fn find_node<'a>(doc: &'a KdlDocument, name: &str) -> Option<&'a KdlNode> {
doc.nodes().iter().find(|n| n.name().value() == name)
}
pub fn load_all_configs(dir: &Path) -> Result<Vec<ServerConfig>, ConfigError> {
if !dir.exists() {
return Ok(Vec::new());
}
let entries = std::fs::read_dir(dir).map_err(|e| ConfigError::Io {
path: dir.to_path_buf(),
source: e,
})?;
let mut configs = Vec::new();
for entry in entries {
let entry = entry.map_err(|e| ConfigError::Io {
path: dir.to_path_buf(),
source: e,
})?;
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) != Some("kdl") {
continue;
}
let name = path
.file_stem()
.and_then(|s| s.to_str())
.ok_or(ConfigError::InvalidName {
name: path.display().to_string(),
})?
.to_string();
let text = std::fs::read_to_string(&path).map_err(|e| ConfigError::Io {
path: path.clone(),
source: e,
})?;
configs.push(parse_server_config(&name, &text, &path)?);
}
check_duplicate_ports(&configs)?;
Ok(configs)
}
fn check_duplicate_ports(configs: &[ServerConfig]) -> Result<(), ConfigError> {
let mut seen: std::collections::HashMap<u16, String> = std::collections::HashMap::new();
for c in configs {
if let Some(other) = seen.insert(c.port, c.name.clone()) {
return Err(ConfigError::DuplicatePort {
name_a: other,
name_b: c.name.clone(),
port: c.port,
});
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::Path;
fn p() -> &'static Path {
Path::new("/tmp/test.kdl")
}
#[test]
fn parses_minimal_config() {
let text = "command \"/usr/local/bin/foo\"\nport 8421\n";
let cfg = parse_server_config("foo", text, p()).unwrap();
assert_eq!(cfg.name, "foo");
assert_eq!(cfg.command, PathBuf::from("/usr/local/bin/foo"));
assert_eq!(cfg.port, 8421);
assert!(cfg.args.is_empty());
assert!(cfg.env.is_empty());
}
#[test]
fn parses_full_config() {
let text = r#"
command "/usr/local/bin/foo"
args "--http" "--port" "8421"
port 8421
env {
RUST_LOG "info"
FOO_BAR "baz"
}
working-dir "/tmp/work"
restart {
policy "always"
backoff-initial "2s"
backoff-max "1m"
max-retries-per-minute 10
}
stop {
grace "30s"
}
"#;
let cfg = parse_server_config("foo", text, p()).unwrap();
assert_eq!(cfg.args, vec!["--http", "--port", "8421"]);
assert_eq!(cfg.env.get("RUST_LOG").map(String::as_str), Some("info"));
assert_eq!(cfg.working_dir, Some(PathBuf::from("/tmp/work")));
assert_eq!(cfg.restart.policy, RestartPolicy::Always);
assert_eq!(cfg.restart.backoff_initial, Duration::from_secs(2));
assert_eq!(cfg.restart.backoff_max, Duration::from_secs(60));
assert_eq!(cfg.restart.max_retries_per_minute, 10);
assert_eq!(cfg.stop.grace, Duration::from_secs(30));
}
#[test]
fn missing_command_fails() {
let err = parse_server_config("foo", "port 8421", p()).unwrap_err();
assert!(matches!(
err,
ConfigError::MissingField {
field: "command",
..
}
));
}
#[test]
fn missing_port_fails() {
let err = parse_server_config("foo", "command \"/bin/x\"", p()).unwrap_err();
assert!(matches!(
err,
ConfigError::MissingField { field: "port", .. }
));
}
#[test]
fn unknown_restart_policy_fails() {
let text = "command \"/bin/x\"\nport 1\nrestart { policy \"maybe\" }";
let err = parse_server_config("foo", text, p()).unwrap_err();
assert!(matches!(
err,
ConfigError::InvalidValue {
field: "restart.policy",
..
}
));
}
#[test]
fn invalid_name_rejected() {
let text = "command \"/bin/x\"\nport 1";
let err = parse_server_config("Foo Bar", text, p()).unwrap_err();
assert!(matches!(err, ConfigError::InvalidName { .. }));
}
use std::fs;
use tempfile::tempdir;
#[test]
fn load_all_finds_and_parses_files() {
let dir = tempdir().unwrap();
fs::write(dir.path().join("a.kdl"), "command \"/bin/a\"\nport 8001").unwrap();
fs::write(dir.path().join("b.kdl"), "command \"/bin/b\"\nport 8002").unwrap();
fs::write(dir.path().join("ignored.txt"), "not a config").unwrap();
let mut configs = load_all_configs(dir.path()).unwrap();
configs.sort_by(|x, y| x.name.cmp(&y.name));
assert_eq!(configs.len(), 2);
assert_eq!(configs[0].name, "a");
assert_eq!(configs[1].port, 8002);
}
#[test]
fn load_all_returns_empty_for_missing_dir() {
let dir = tempdir().unwrap();
let configs = load_all_configs(&dir.path().join("does-not-exist")).unwrap();
assert!(configs.is_empty());
}
#[test]
fn duplicate_ports_detected() {
let dir = tempdir().unwrap();
fs::write(dir.path().join("a.kdl"), "command \"/bin/a\"\nport 8001").unwrap();
fs::write(dir.path().join("b.kdl"), "command \"/bin/b\"\nport 8001").unwrap();
let err = load_all_configs(dir.path()).unwrap_err();
match err {
ConfigError::DuplicatePort { port, .. } => assert_eq!(port, 8001),
other => panic!("unexpected error: {other:?}"),
}
}
}
+12
View File
@@ -0,0 +1,12 @@
//! Wire types and config schema shared between the xy daemon and CLI.
pub mod config;
pub mod error;
pub mod kdl_parse;
pub mod rpc;
pub mod state;
pub use config::{RestartConfig, RestartPolicy, ServerConfig, StopConfig};
pub use error::{ConfigError, RpcErrorCode};
pub use kdl_parse::{load_all_configs, parse_server_config};
pub use state::ServerState;
+148
View File
@@ -0,0 +1,148 @@
use crate::ServerState;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerSummary {
pub name: String,
pub state: ServerState,
#[serde(skip_serializing_if = "Option::is_none")]
pub pid: Option<u32>,
pub port: u16,
#[serde(skip_serializing_if = "Option::is_none")]
pub uptime_secs: Option<u64>,
pub restart_count: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_exit: Option<i32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusDetail {
#[serde(flatten)]
pub summary: ServerSummary,
pub recent_transitions: Vec<StateTransition>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateTransition {
pub from: ServerState,
pub to: ServerState,
pub at_unix_ms: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum NameOrAll {
All { all: bool },
Name { name: String },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusParams {
pub name: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartResult {
pub started: Vec<String>,
pub already_running: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StopResult {
pub stopped: Vec<String>,
pub not_running: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RestartResult {
pub restarted: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReloadResult {
pub added: Vec<String>,
pub removed: Vec<String>,
pub changed: Vec<String>,
pub unchanged: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogsParams {
pub name: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tail: Option<u32>,
#[serde(default)]
pub follow: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogsSubscribed {
pub subscription_id: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogsCancelParams {
pub subscription_id: u64,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum LogStream {
Stdout,
Stderr,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogLine {
pub subscription_id: u64,
pub name: String,
pub stream: LogStream,
pub line: String,
pub ts_unix_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEnd {
pub subscription_id: u64,
}
pub mod methods {
pub const LIST: &str = "list";
pub const STATUS: &str = "status";
pub const START: &str = "start";
pub const STOP: &str = "stop";
pub const RESTART: &str = "restart";
pub const RELOAD: &str = "reload";
pub const LOGS: &str = "logs";
pub const LOGS_CANCEL: &str = "logs_cancel";
}
pub mod notifications {
pub const LOG: &str = "log";
pub const LOG_END: &str = "log_end";
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn name_or_all_round_trips_name() {
let v: NameOrAll = serde_json::from_str(r#"{"name":"foo"}"#).unwrap();
match v {
NameOrAll::Name { name } => assert_eq!(name, "foo"),
_ => panic!("expected Name variant"),
}
}
#[test]
fn name_or_all_round_trips_all() {
let v: NameOrAll = serde_json::from_str(r#"{"all":true}"#).unwrap();
match v {
NameOrAll::All { all } => assert!(all),
_ => panic!("expected All variant"),
}
}
}
+31
View File
@@ -0,0 +1,31 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum ServerState {
Stopped,
Starting,
Running,
Restarting,
Failed,
Stopping,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn serializes_to_kebab_case() {
assert_eq!(
serde_json::to_string(&ServerState::Restarting).unwrap(),
"\"restarting\""
);
}
#[test]
fn deserializes_from_kebab_case() {
let s: ServerState = serde_json::from_str("\"failed\"").unwrap();
assert_eq!(s, ServerState::Failed);
}
}
+17
View File
@@ -0,0 +1,17 @@
[package]
name = "xy-supervisor"
edition.workspace = true
version.workspace = true
license.workspace = true
[dependencies]
xy-protocol.workspace = true
tokio.workspace = true
tracing.workspace = true
thiserror.workspace = true
nix.workspace = true
async-trait.workspace = true
[dev-dependencies]
tokio-test.workspace = true
tempfile.workspace = true
+71
View File
@@ -0,0 +1,71 @@
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct Backoff {
initial: Duration,
max: Duration,
current: Option<Duration>,
}
impl Backoff {
pub fn new(initial: Duration, max: Duration) -> Self {
Self {
initial,
max,
current: None,
}
}
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> Duration {
let next = match self.current {
None => self.initial,
Some(d) => (d * 2).min(self.max),
};
self.current = Some(next);
next
}
pub fn reset(&mut self) {
self.current = None;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn starts_at_initial() {
let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(30));
assert_eq!(b.next(), Duration::from_secs(1));
}
#[test]
fn doubles_each_call() {
let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(30));
assert_eq!(b.next(), Duration::from_secs(1));
assert_eq!(b.next(), Duration::from_secs(2));
assert_eq!(b.next(), Duration::from_secs(4));
assert_eq!(b.next(), Duration::from_secs(8));
}
#[test]
fn caps_at_max() {
let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(5));
for _ in 0..10 {
b.next();
}
assert_eq!(b.next(), Duration::from_secs(5));
}
#[test]
fn reset_starts_over() {
let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(30));
b.next();
b.next();
b.next();
b.reset();
assert_eq!(b.next(), Duration::from_secs(1));
}
}
+211
View File
@@ -0,0 +1,211 @@
use std::sync::Arc;
use tokio::sync::{Mutex, oneshot};
#[async_trait::async_trait]
pub trait ChildHandle: Send + 'static {
fn pid(&self) -> u32;
async fn wait(&mut self) -> std::io::Result<Option<i32>>;
fn terminate(&mut self) -> std::io::Result<()>;
fn kill(&mut self) -> std::io::Result<()>;
}
pub struct MockChild {
pid: u32,
exit_rx: Arc<Mutex<oneshot::Receiver<Option<i32>>>>,
terminate_tx: Option<oneshot::Sender<()>>,
kill_tx: Option<oneshot::Sender<()>>,
}
pub struct MockChildController {
pub exit_tx: Option<oneshot::Sender<Option<i32>>>,
pub terminate_rx: oneshot::Receiver<()>,
pub kill_rx: oneshot::Receiver<()>,
}
impl MockChild {
pub fn new(pid: u32) -> (Self, MockChildController) {
let (exit_tx, exit_rx) = oneshot::channel();
let (terminate_tx, terminate_rx) = oneshot::channel();
let (kill_tx, kill_rx) = oneshot::channel();
let child = Self {
pid,
exit_rx: Arc::new(Mutex::new(exit_rx)),
terminate_tx: Some(terminate_tx),
kill_tx: Some(kill_tx),
};
let ctl = MockChildController {
exit_tx: Some(exit_tx),
terminate_rx,
kill_rx,
};
(child, ctl)
}
}
#[async_trait::async_trait]
impl ChildHandle for MockChild {
fn pid(&self) -> u32 {
self.pid
}
async fn wait(&mut self) -> std::io::Result<Option<i32>> {
let mut rx = self.exit_rx.lock().await;
match (&mut *rx).await {
Ok(code) => Ok(code),
Err(_) => Err(std::io::Error::other("exit_tx dropped")),
}
}
fn terminate(&mut self) -> std::io::Result<()> {
if let Some(tx) = self.terminate_tx.take() {
let _ = tx.send(());
}
Ok(())
}
fn kill(&mut self) -> std::io::Result<()> {
if let Some(tx) = self.kill_tx.take() {
let _ = tx.send(());
}
Ok(())
}
}
use crate::logs::LogSink;
use nix::sys::signal::{Signal, kill};
use nix::unistd::Pid;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child as TokioChild, Command};
use xy_protocol::{ServerConfig, rpc::LogStream};
pub struct RealChild {
pid: u32,
pgid: Pid,
child: Option<TokioChild>,
}
impl RealChild {
pub fn pgid(&self) -> Pid {
self.pgid
}
}
#[async_trait::async_trait]
impl ChildHandle for RealChild {
fn pid(&self) -> u32 {
self.pid
}
async fn wait(&mut self) -> std::io::Result<Option<i32>> {
let child = self
.child
.as_mut()
.ok_or_else(|| std::io::Error::other("already waited"))?;
let status = child.wait().await?;
Ok(status.code())
}
fn terminate(&mut self) -> std::io::Result<()> {
kill(Pid::from_raw(-self.pgid.as_raw()), Signal::SIGTERM)
.map_err(|err| std::io::Error::other(err.to_string()))
}
fn kill(&mut self) -> std::io::Result<()> {
kill(Pid::from_raw(-self.pgid.as_raw()), Signal::SIGKILL)
.map_err(|err| std::io::Error::other(err.to_string()))
}
}
pub fn spawn_with_logs(cfg: &ServerConfig, sink: LogSink) -> std::io::Result<RealChild> {
let mut cmd = Command::new(&cfg.command);
cmd.args(&cfg.args);
for (k, v) in &cfg.env {
cmd.env(k, v);
}
if let Some(dir) = &cfg.working_dir {
cmd.current_dir(dir);
}
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
cmd.kill_on_drop(true);
// Own process group so signals reach the whole tree.
unsafe {
cmd.pre_exec(|| {
nix::unistd::setpgid(Pid::from_raw(0), Pid::from_raw(0))
.map_err(|err| std::io::Error::other(err.to_string()))
});
}
let mut child = cmd.spawn()?;
let pid = child.id().ok_or_else(|| std::io::Error::other("no pid"))?;
let pgid = Pid::from_raw(pid as i32);
if let Some(out) = child.stdout.take() {
spawn_pump(out, sink.clone(), LogStream::Stdout);
}
if let Some(err) = child.stderr.take() {
spawn_pump(err, sink.clone(), LogStream::Stderr);
}
Ok(RealChild {
pid,
pgid,
child: Some(child),
})
}
fn spawn_pump<R: tokio::io::AsyncRead + Unpin + Send + 'static>(
reader: R,
sink: LogSink,
stream: LogStream,
) {
tokio::spawn(async move {
let mut lines = BufReader::new(reader).lines();
loop {
match lines.next_line().await {
Ok(Some(line)) => sink.record(stream, line),
Ok(None) => break,
Err(err) => {
tracing::warn!(
server = %sink.server_name,
error = %err,
?stream,
"log pump read error"
);
break;
}
}
}
});
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn mock_child_exit() {
let (mut child, mut ctl) = MockChild::new(123);
assert_eq!(child.pid(), 123);
ctl.exit_tx.take().unwrap().send(Some(0)).unwrap();
assert_eq!(child.wait().await.unwrap(), Some(0));
}
#[tokio::test]
async fn mock_child_terminate() {
let (mut child, mut ctl) = MockChild::new(1);
child.terminate().unwrap();
ctl.terminate_rx.try_recv().unwrap();
}
}
+18
View File
@@ -0,0 +1,18 @@
//! Process-supervision primitives for the xy daemon.
pub mod backoff;
pub mod child;
pub mod logs;
pub mod policy;
pub mod retry_window;
pub mod supervisor;
pub use backoff::Backoff;
pub use child::{ChildHandle, MockChild, MockChildController, RealChild, spawn_with_logs};
pub use logs::{LogSink, RecordedLine, RingBuffer, RotatingLogWriter};
pub use policy::{RestartDecision, decide};
pub use retry_window::RetryWindow;
pub use supervisor::{
RealSpawner, Spawner, StartAck, Status, StopAck, SupervisorCmd, SupervisorHandle,
SupervisorTask,
};
+263
View File
@@ -0,0 +1,263 @@
use std::collections::VecDeque;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use xy_protocol::rpc::{LogLine, LogStream};
pub struct RotatingLogWriter {
base: PathBuf,
max_bytes: u64,
keep: usize,
file: File,
written: u64,
}
impl RotatingLogWriter {
pub fn open(base: &Path, max_bytes: u64, keep: usize) -> std::io::Result<Self> {
if let Some(parent) = base.parent() {
std::fs::create_dir_all(parent)?;
}
let file = OpenOptions::new().create(true).append(true).open(base)?;
let written = file.metadata()?.len();
Ok(Self {
base: base.to_path_buf(),
max_bytes,
keep,
file,
written,
})
}
pub fn write_line(&mut self, tag: &str, line: &str) -> std::io::Result<()> {
let bytes = format!("{tag} {line}\n");
self.file.write_all(bytes.as_bytes())?;
self.written += bytes.len() as u64;
if self.written >= self.max_bytes {
self.rotate()?;
}
Ok(())
}
fn rotate(&mut self) -> std::io::Result<()> {
// Drop the current handle by replacing with /dev/null briefly.
self.file = OpenOptions::new().read(true).open("/dev/null")?;
for i in (1..self.keep).rev() {
let src = self.gen_path(i);
let dst = self.gen_path(i + 1);
if src.exists() {
let _ = std::fs::rename(&src, &dst);
}
}
if self.base.exists() {
let _ = std::fs::rename(&self.base, self.gen_path(1));
}
self.file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.base)?;
self.written = 0;
Ok(())
}
fn gen_path(&self, n: usize) -> PathBuf {
let mut s = self.base.as_os_str().to_os_string();
s.push(format!(".{n}"));
PathBuf::from(s)
}
}
#[derive(Clone)]
pub struct RingBuffer {
inner: Arc<Mutex<RingBufferInner>>,
capacity_bytes: usize,
}
struct RingBufferInner {
lines: VecDeque<RecordedLine>,
bytes: usize,
}
#[derive(Debug, Clone)]
pub struct RecordedLine {
pub stream: xy_protocol::rpc::LogStream,
pub line: String,
pub ts_unix_ms: u64,
}
impl RingBuffer {
pub fn new(capacity_bytes: usize) -> Self {
Self {
inner: Arc::new(Mutex::new(RingBufferInner {
lines: VecDeque::new(),
bytes: 0,
})),
capacity_bytes,
}
}
pub fn push(&self, line: RecordedLine) {
let mut g = self.inner.lock().unwrap();
g.bytes += line.line.len();
g.lines.push_back(line);
while g.bytes > self.capacity_bytes {
if let Some(removed) = g.lines.pop_front() {
g.bytes -= removed.line.len();
} else {
break;
}
}
}
pub fn snapshot_tail(&self, n: Option<u32>) -> Vec<RecordedLine> {
let g = self.inner.lock().unwrap();
match n {
None => g.lines.iter().cloned().collect(),
Some(n) => {
let take = (n as usize).min(g.lines.len());
let start = g.lines.len() - take;
g.lines.iter().skip(start).cloned().collect()
}
}
}
}
const LOG_BROADCAST_CAP: usize = 256;
#[derive(Clone)]
pub struct LogSink {
pub server_name: String,
writer: Arc<Mutex<RotatingLogWriter>>,
pub ring: RingBuffer,
pub broadcast: broadcast::Sender<LogLine>,
}
impl LogSink {
pub fn new(server_name: String, writer: RotatingLogWriter, ring_capacity_bytes: usize) -> Self {
let (tx, _) = broadcast::channel(LOG_BROADCAST_CAP);
Self {
server_name,
writer: Arc::new(Mutex::new(writer)),
ring: RingBuffer::new(ring_capacity_bytes),
broadcast: tx,
}
}
pub fn record(&self, stream: LogStream, line: String) {
let ts = now_unix_ms();
let tag = match stream {
LogStream::Stdout => "[out]",
LogStream::Stderr => "[err]",
};
if let Err(e) = self.writer.lock().unwrap().write_line(tag, &line) {
tracing::warn!(server = %self.server_name, error = %e, "log file write failed");
}
self.ring.push(RecordedLine {
stream,
line: line.clone(),
ts_unix_ms: ts,
});
let _ = self.broadcast.send(LogLine {
subscription_id: 0,
name: self.server_name.clone(),
stream,
line,
ts_unix_ms: ts,
});
}
}
fn now_unix_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Read;
use tempfile::tempdir;
#[test]
fn writes_lines_with_tags() {
let dir = tempdir().unwrap();
let base = dir.path().join("x.log");
let mut w = RotatingLogWriter::open(&base, 1024, 3).unwrap();
w.write_line("[out]", "hello").unwrap();
w.write_line("[err]", "boom").unwrap();
let mut s = String::new();
File::open(&base).unwrap().read_to_string(&mut s).unwrap();
assert_eq!(s, "[out] hello\n[err] boom\n");
}
#[test]
fn rotates_at_threshold() {
let dir = tempdir().unwrap();
let base = dir.path().join("x.log");
let mut w = RotatingLogWriter::open(&base, 20, 3).unwrap();
for _ in 0..5 {
w.write_line("[out]", "0123456789").unwrap();
}
assert!(base.exists());
let rotated = dir.path().join("x.log.1");
assert!(
rotated.exists(),
"expected rotated file at {}",
rotated.display()
);
}
use xy_protocol::rpc::LogStream;
fn recorded(s: &str) -> RecordedLine {
RecordedLine {
stream: LogStream::Stdout,
line: s.to_string(),
ts_unix_ms: 0,
}
}
#[test]
fn ring_buffer_drops_oldest_when_full() {
let rb = RingBuffer::new(10);
rb.push(recorded("aaaaa"));
rb.push(recorded("bbbbb"));
rb.push(recorded("ccc"));
let snap = rb.snapshot_tail(None);
assert_eq!(snap.len(), 2);
assert_eq!(snap[0].line, "bbbbb");
assert_eq!(snap[1].line, "ccc");
}
#[test]
fn ring_buffer_tail_n() {
let rb = RingBuffer::new(1024);
for i in 0..5 {
rb.push(recorded(&format!("line{i}")));
}
let snap = rb.snapshot_tail(Some(2));
assert_eq!(snap.len(), 2);
assert_eq!(snap[0].line, "line3");
assert_eq!(snap[1].line, "line4");
}
#[tokio::test]
async fn log_sink_records_and_broadcasts() {
let dir = tempdir().unwrap();
let writer = RotatingLogWriter::open(&dir.path().join("s.log"), 1024, 3).unwrap();
let sink = LogSink::new("s".to_string(), writer, 1024);
let mut rx = sink.broadcast.subscribe();
sink.record(LogStream::Stdout, "hello".to_string());
let got = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
.await
.unwrap()
.unwrap();
assert_eq!(got.line, "hello");
assert_eq!(got.stream, LogStream::Stdout);
assert_eq!(sink.ring.snapshot_tail(None).len(), 1);
}
}
+102
View File
@@ -0,0 +1,102 @@
use xy_protocol::RestartPolicy;
#[derive(Debug, PartialEq, Eq)]
pub enum RestartDecision {
Restart,
StayStopped,
MarkFailed,
}
pub fn decide(
policy: RestartPolicy,
exit_code: Option<i32>,
retry_cap_reached: bool,
) -> RestartDecision {
let clean = matches!(exit_code, Some(0));
let want = match policy {
RestartPolicy::Never => false,
RestartPolicy::OnFailure => !clean,
RestartPolicy::Always => true,
};
if !want {
return RestartDecision::StayStopped;
}
if retry_cap_reached {
RestartDecision::MarkFailed
} else {
RestartDecision::Restart
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn never_never_restarts() {
assert_eq!(
decide(RestartPolicy::Never, Some(0), false),
RestartDecision::StayStopped
);
assert_eq!(
decide(RestartPolicy::Never, Some(1), false),
RestartDecision::StayStopped
);
assert_eq!(
decide(RestartPolicy::Never, None, false),
RestartDecision::StayStopped
);
}
#[test]
fn on_failure_skips_clean() {
assert_eq!(
decide(RestartPolicy::OnFailure, Some(0), false),
RestartDecision::StayStopped
);
}
#[test]
fn on_failure_restarts_nonzero() {
assert_eq!(
decide(RestartPolicy::OnFailure, Some(1), false),
RestartDecision::Restart
);
}
#[test]
fn on_failure_restarts_signal() {
assert_eq!(
decide(RestartPolicy::OnFailure, None, false),
RestartDecision::Restart
);
}
#[test]
fn always_restarts_on_clean() {
assert_eq!(
decide(RestartPolicy::Always, Some(0), false),
RestartDecision::Restart
);
}
#[test]
fn cap_reached_marks_failed() {
assert_eq!(
decide(RestartPolicy::Always, Some(0), true),
RestartDecision::MarkFailed
);
assert_eq!(
decide(RestartPolicy::OnFailure, Some(1), true),
RestartDecision::MarkFailed
);
}
#[test]
fn cap_reached_never_still_stopped() {
assert_eq!(
decide(RestartPolicy::Never, Some(1), true),
RestartDecision::StayStopped
);
}
}
+80
View File
@@ -0,0 +1,80 @@
use std::collections::VecDeque;
use std::time::{Duration, Instant};
#[derive(Debug, Clone)]
pub struct RetryWindow {
window: Duration,
cap: u32,
events: VecDeque<Instant>,
}
impl RetryWindow {
pub fn new(window: Duration, cap: u32) -> Self {
Self {
window,
cap,
events: VecDeque::new(),
}
}
pub fn record(&mut self, now: Instant) {
self.events.push_back(now);
self.prune(now);
}
pub fn cap_reached(&mut self, now: Instant) -> bool {
self.prune(now);
self.events.len() as u32 >= self.cap
}
pub fn count(&mut self, now: Instant) -> u32 {
self.prune(now);
self.events.len() as u32
}
fn prune(&mut self, now: Instant) {
while let Some(&front) = self.events.front() {
if now.duration_since(front) > self.window {
self.events.pop_front();
} else {
break;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn below_cap_not_reached() {
let mut w = RetryWindow::new(Duration::from_secs(60), 3);
let t = Instant::now();
w.record(t);
w.record(t);
assert!(!w.cap_reached(t));
}
#[test]
fn at_cap_reached() {
let mut w = RetryWindow::new(Duration::from_secs(60), 3);
let t = Instant::now();
w.record(t);
w.record(t);
w.record(t);
assert!(w.cap_reached(t));
}
#[test]
fn old_events_pruned() {
let mut w = RetryWindow::new(Duration::from_secs(60), 3);
let t0 = Instant::now();
w.record(t0);
w.record(t0);
w.record(t0);
let t1 = t0 + Duration::from_secs(61);
assert_eq!(w.count(t1), 0);
assert!(!w.cap_reached(t1));
}
}
+451
View File
@@ -0,0 +1,451 @@
use crate::{
backoff::Backoff,
child::ChildHandle,
logs::LogSink,
policy::{RestartDecision, decide},
retry_window::RetryWindow,
};
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::sleep;
use tracing::{debug, info, warn};
use xy_protocol::{ServerConfig, ServerState};
pub enum SupervisorCmd {
Start {
ack: oneshot::Sender<StartAck>,
},
Stop {
ack: oneshot::Sender<StopAck>,
},
Restart {
ack: oneshot::Sender<()>,
},
Reconfigure {
new: ServerConfig,
ack: oneshot::Sender<()>,
},
Shutdown {
ack: oneshot::Sender<()>,
},
}
#[derive(Debug, PartialEq, Eq)]
pub enum StartAck {
Started,
AlreadyRunning,
SpawnFailed(String),
}
#[derive(Debug, PartialEq, Eq)]
pub enum StopAck {
Stopped,
NotRunning,
}
#[derive(Debug, Clone)]
pub struct Status {
pub state: ServerState,
pub pid: Option<u32>,
pub port: u16,
pub uptime_secs: Option<u64>,
pub restart_count: u32,
pub last_exit: Option<i32>,
}
#[derive(Clone)]
pub struct SupervisorHandle {
pub name: String,
pub tx: mpsc::Sender<SupervisorCmd>,
pub status: watch::Receiver<Status>,
pub log_sink: LogSink,
}
#[async_trait::async_trait]
pub trait Spawner: Send + 'static {
type Child: ChildHandle;
async fn spawn(&self, cfg: &ServerConfig, sink: LogSink) -> std::io::Result<Self::Child>;
}
pub struct SupervisorTask<S: Spawner> {
cfg: ServerConfig,
log_sink: LogSink,
spawner: S,
status_tx: watch::Sender<Status>,
cmd_rx: mpsc::Receiver<SupervisorCmd>,
backoff: Backoff,
retry_window: RetryWindow,
restart_count: u32,
last_exit: Option<i32>,
started_at: Option<Instant>,
current_pid: Option<u32>,
}
impl<S: Spawner> SupervisorTask<S> {
pub fn new(
cfg: ServerConfig,
log_sink: LogSink,
spawner: S,
status_tx: watch::Sender<Status>,
cmd_rx: mpsc::Receiver<SupervisorCmd>,
) -> Self {
let backoff = Backoff::new(cfg.restart.backoff_initial, cfg.restart.backoff_max);
let retry_window =
RetryWindow::new(Duration::from_secs(60), cfg.restart.max_retries_per_minute);
Self {
cfg,
log_sink,
spawner,
status_tx,
cmd_rx,
backoff,
retry_window,
restart_count: 0,
last_exit: None,
started_at: None,
current_pid: None,
}
}
fn set_state(&mut self, s: ServerState) {
let uptime_secs = self.started_at.map(|t| t.elapsed().as_secs());
let _ = self.status_tx.send(Status {
state: s,
pid: self.current_pid,
port: self.cfg.port,
uptime_secs,
restart_count: self.restart_count,
last_exit: self.last_exit,
});
}
pub async fn run(mut self) {
let mut child: Option<S::Child> = None;
loop {
tokio::select! {
cmd = self.cmd_rx.recv() => {
let Some(cmd) = cmd else { break; };
match cmd {
SupervisorCmd::Start { ack } => {
if child.is_some() {
let _ = ack.send(StartAck::AlreadyRunning);
} else {
match self.do_start().await {
Ok(c) => {
child = Some(c);
let _ = ack.send(StartAck::Started);
}
Err(err) => {
warn!(name = %self.cfg.name, error = %err, "spawn failed");
self.set_state(ServerState::Failed);
let _ = ack.send(StartAck::SpawnFailed(err.to_string()));
}
}
}
}
SupervisorCmd::Stop { ack } => {
if let Some(c) = child.take() {
self.do_stop(c).await;
let _ = ack.send(StopAck::Stopped);
} else {
let _ = ack.send(StopAck::NotRunning);
}
}
SupervisorCmd::Restart { ack } => {
if let Some(c) = child.take() {
self.set_state(ServerState::Restarting);
self.do_stop(c).await;
}
match self.do_start().await {
Ok(c) => child = Some(c),
Err(err) => {
warn!(name = %self.cfg.name, error = %err, "restart spawn failed");
self.set_state(ServerState::Failed);
}
}
let _ = ack.send(());
}
SupervisorCmd::Reconfigure { new, ack } => {
self.cfg = new;
self.backoff =
Backoff::new(self.cfg.restart.backoff_initial, self.cfg.restart.backoff_max);
self.retry_window = RetryWindow::new(
Duration::from_secs(60),
self.cfg.restart.max_retries_per_minute,
);
let _ = ack.send(());
}
SupervisorCmd::Shutdown { ack } => {
if let Some(c) = child.take() {
self.do_stop(c).await;
}
let _ = ack.send(());
return;
}
}
}
code = wait_child(&mut child) => {
child = None;
self.last_exit = code;
self.current_pid = None;
let now = Instant::now();
self.retry_window.record(now);
let cap = self.retry_window.cap_reached(now);
let decision = decide(self.cfg.restart.policy, code, cap);
debug!(name = %self.cfg.name, ?code, ?decision, "child exited");
match decision {
RestartDecision::StayStopped => {
self.started_at = None;
self.set_state(ServerState::Stopped);
}
RestartDecision::MarkFailed => {
self.started_at = None;
self.set_state(ServerState::Failed);
}
RestartDecision::Restart => {
self.set_state(ServerState::Restarting);
let delay = self.backoff.next();
enum Action {
RetryNow,
Cancel,
Exit,
}
let mut delay_fut = std::pin::pin!(sleep(delay));
let action = tokio::select! {
_ = &mut delay_fut => Action::RetryNow,
cmd = self.cmd_rx.recv() => match cmd {
None => Action::Exit,
Some(SupervisorCmd::Stop { ack }) => {
let _ = ack.send(StopAck::NotRunning);
Action::Cancel
}
Some(SupervisorCmd::Shutdown { ack }) => {
let _ = ack.send(());
return;
}
Some(SupervisorCmd::Start { ack }) => {
let _ = ack.send(StartAck::Started);
Action::RetryNow
}
Some(SupervisorCmd::Restart { ack }) => {
let _ = ack.send(());
Action::RetryNow
}
Some(SupervisorCmd::Reconfigure { new, ack }) => {
self.cfg = new;
self.backoff = Backoff::new(
self.cfg.restart.backoff_initial,
self.cfg.restart.backoff_max,
);
self.retry_window = RetryWindow::new(
Duration::from_secs(60),
self.cfg.restart.max_retries_per_minute,
);
let _ = ack.send(());
Action::RetryNow
}
},
};
match action {
Action::RetryNow => {
match self.do_start().await {
Ok(c) => child = Some(c),
Err(err) => {
warn!(name = %self.cfg.name, error = %err, "restart spawn failed");
self.set_state(ServerState::Failed);
}
}
}
Action::Cancel => {
self.started_at = None;
self.set_state(ServerState::Stopped);
}
Action::Exit => return,
}
}
}
}
}
}
}
async fn do_start(&mut self) -> std::io::Result<S::Child> {
self.set_state(ServerState::Starting);
let c = self.spawner.spawn(&self.cfg, self.log_sink.clone()).await?;
self.restart_count = self.restart_count.saturating_add(1);
self.started_at = Some(Instant::now());
self.current_pid = Some(c.pid());
self.backoff.reset();
self.set_state(ServerState::Running);
info!(name = %self.cfg.name, pid = c.pid(), "started");
Ok(c)
}
async fn do_stop(&mut self, mut c: S::Child) {
self.set_state(ServerState::Stopping);
let _ = c.terminate();
let grace = self.cfg.stop.grace;
match tokio::time::timeout(grace, c.wait()).await {
Ok(_) => {}
Err(_) => {
let _ = c.kill();
let _ = c.wait().await;
}
}
self.current_pid = None;
self.started_at = None;
self.set_state(ServerState::Stopped);
}
}
async fn wait_child<C: ChildHandle>(slot: &mut Option<C>) -> Option<i32> {
match slot.as_mut() {
Some(c) => c.wait().await.ok().flatten(),
None => std::future::pending().await,
}
}
pub struct RealSpawner;
#[async_trait::async_trait]
impl Spawner for RealSpawner {
type Child = crate::child::RealChild;
async fn spawn(&self, cfg: &ServerConfig, sink: LogSink) -> std::io::Result<Self::Child> {
crate::child::spawn_with_logs(cfg, sink)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::child::MockChild;
use crate::logs::{LogSink, RotatingLogWriter};
use std::sync::{Arc, Mutex};
use tempfile::tempdir;
use xy_protocol::{RestartConfig, RestartPolicy, StopConfig};
struct QueueSpawner {
queue: Arc<Mutex<Vec<MockChild>>>,
}
#[async_trait::async_trait]
impl Spawner for QueueSpawner {
type Child = MockChild;
async fn spawn(&self, _cfg: &ServerConfig, _sink: LogSink) -> std::io::Result<MockChild> {
let mut q = self.queue.lock().unwrap();
Ok(q.remove(0))
}
}
fn cfg(name: &str, policy: RestartPolicy, max_retries: u32) -> ServerConfig {
ServerConfig {
name: name.to_string(),
command: "/bin/true".into(),
args: vec![],
port: 1,
env: Default::default(),
working_dir: None,
restart: RestartConfig {
policy,
backoff_initial: Duration::from_millis(1),
backoff_max: Duration::from_millis(1),
max_retries_per_minute: max_retries,
},
stop: StopConfig {
grace: Duration::from_millis(50),
},
}
}
fn sink(name: &str) -> LogSink {
let dir = tempdir().unwrap();
let writer = RotatingLogWriter::open(&dir.path().join("s.log"), 1024, 3).unwrap();
std::mem::forget(dir);
LogSink::new(name.to_string(), writer, 1024)
}
fn initial_status(cfg: &ServerConfig) -> Status {
Status {
state: ServerState::Stopped,
pid: None,
port: cfg.port,
uptime_secs: None,
restart_count: 0,
last_exit: None,
}
}
async fn wait_for(rx: &mut watch::Receiver<Status>, want: ServerState) {
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
loop {
if rx.borrow().state == want {
return;
}
tokio::select! {
_ = rx.changed() => {}
_ = tokio::time::sleep_until(deadline) => panic!("never reached {want:?}, last={:?}", rx.borrow().state),
}
}
}
#[tokio::test]
async fn start_runs_to_running_and_stop_to_stopped() {
let cfg = cfg("x", RestartPolicy::Never, 5);
let (mock, mut ctl) = MockChild::new(1);
let queue = Arc::new(Mutex::new(vec![mock]));
let spawner = QueueSpawner { queue };
let (status_tx, mut status_rx) = watch::channel(initial_status(&cfg));
let (cmd_tx, cmd_rx) = mpsc::channel(8);
let task = SupervisorTask::new(cfg, sink("x"), spawner, status_tx, cmd_rx);
let h = tokio::spawn(task.run());
let (ack_tx, ack_rx) = oneshot::channel();
cmd_tx
.send(SupervisorCmd::Start { ack: ack_tx })
.await
.unwrap();
assert_eq!(ack_rx.await.unwrap(), StartAck::Started);
wait_for(&mut status_rx, ServerState::Running).await;
ctl.exit_tx.take().unwrap().send(Some(0)).unwrap();
wait_for(&mut status_rx, ServerState::Stopped).await;
let (ack_tx, ack_rx) = oneshot::channel();
cmd_tx
.send(SupervisorCmd::Shutdown { ack: ack_tx })
.await
.unwrap();
ack_rx.await.unwrap();
h.await.unwrap();
}
}
+35
View File
@@ -0,0 +1,35 @@
[package]
name = "xy"
edition.workspace = true
version.workspace = true
license.workspace = true
[[bin]]
name = "xy"
path = "src/main.rs"
[[bin]]
name = "xy-test-sleep-server"
path = "src/bin/xy_test_sleep_server.rs"
[[bin]]
name = "xy-test-exit-failure"
path = "src/bin/xy_test_exit_failure.rs"
[dependencies]
xy-protocol.workspace = true
xy-supervisor.workspace = true
xy-ipc.workspace = true
tokio.workspace = true
clap.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
anyhow.workspace = true
etcetera.workspace = true
nix.workspace = true
humantime.workspace = true
[dev-dependencies]
tempfile.workspace = true
@@ -0,0 +1,4 @@
fn main() {
eprintln!("exit_failure dying immediately");
std::process::exit(7);
}
+12
View File
@@ -0,0 +1,12 @@
fn main() {
use std::io::Write;
let pid = std::process::id();
eprintln!("sleep_server start pid={pid}");
println!("ready");
std::io::stdout().flush().ok();
loop {
std::thread::sleep(std::time::Duration::from_secs(60));
println!("tick");
std::io::stdout().flush().ok();
}
}
+27
View File
@@ -0,0 +1,27 @@
use xy_protocol::rpc::ServerSummary;
pub fn list_table(rows: &[ServerSummary]) -> String {
let mut out = String::new();
out.push_str("NAME STATE PID PORT UPTIME RESTARTS\n");
for r in rows {
let pid = r.pid.map(|p| p.to_string()).unwrap_or_else(|| "-".into());
let up = r
.uptime_secs
.map(|s| format!("{}s", s))
.unwrap_or_else(|| "-".into());
out.push_str(&format!(
"{:<20}{:<12}{:<8}{:<8}{:<10}{}\n",
r.name,
format!("{:?}", r.state).to_lowercase(),
pid,
r.port,
up,
r.restart_count
));
}
out
}
+194
View File
@@ -0,0 +1,194 @@
use crate::paths::Paths;
use anyhow::Result;
use serde_json::json;
use xy_ipc::{Client, ClientError};
use xy_protocol::rpc::{
LogLine, LogStream, LogsParams, LogsSubscribed, ReloadResult, RestartResult, ServerSummary,
StartResult, StatusDetail, StopResult, methods, notifications,
};
mod format;
async fn connect(paths: &Paths) -> Result<Client> {
match Client::connect(&paths.socket).await {
Ok(c) => Ok(c),
Err(err) => {
eprintln!(
"xy: cannot reach daemon at {}: {err}",
paths.socket.display()
);
std::process::exit(2);
}
}
}
fn rpc_to_exit(err: &ClientError) -> i32 {
match err {
ClientError::Unreachable(_) => 2,
ClientError::Rpc { .. } => 1,
_ => 1,
}
}
pub async fn list(paths: Paths) -> Result<i32> {
let mut c = connect(&paths).await?;
let rows: Vec<ServerSummary> = match c.call_no_params(methods::LIST).await {
Ok(v) => v,
Err(err) => {
eprintln!("xy: {err}");
return Ok(rpc_to_exit(&err));
}
};
print!("{}", format::list_table(&rows));
Ok(0)
}
pub async fn status(paths: Paths, name: String) -> Result<i32> {
let mut c = connect(&paths).await?;
let d: StatusDetail = match c.call(methods::STATUS, &json!({"name": name})).await {
Ok(v) => v,
Err(err) => {
eprintln!("xy: {err}");
return Ok(rpc_to_exit(&err));
}
};
println!("{:#?}", d);
Ok(0)
}
fn name_or_all(all: bool, name: Option<String>) -> serde_json::Value {
if all {
json!({"all": true})
} else {
json!({"name": name.unwrap()})
}
}
pub async fn start(paths: Paths, all: bool, name: Option<String>) -> Result<i32> {
let mut c = connect(&paths).await?;
let r: StartResult = match c.call(methods::START, &name_or_all(all, name)).await {
Ok(v) => v,
Err(err) => {
eprintln!("xy: {err}");
return Ok(rpc_to_exit(&err));
}
};
if !r.started.is_empty() {
println!("started: {}", r.started.join(", "));
}
if !r.already_running.is_empty() {
println!("already running: {}", r.already_running.join(", "));
}
Ok(0)
}
pub async fn stop(paths: Paths, all: bool, name: Option<String>) -> Result<i32> {
let mut c = connect(&paths).await?;
let r: StopResult = match c.call(methods::STOP, &name_or_all(all, name)).await {
Ok(v) => v,
Err(err) => {
eprintln!("xy: {err}");
return Ok(rpc_to_exit(&err));
}
};
if !r.stopped.is_empty() {
println!("stopped: {}", r.stopped.join(", "));
}
if !r.not_running.is_empty() {
println!("not running: {}", r.not_running.join(", "));
}
Ok(0)
}
pub async fn restart(paths: Paths, all: bool, name: Option<String>) -> Result<i32> {
let mut c = connect(&paths).await?;
let r: RestartResult = match c.call(methods::RESTART, &name_or_all(all, name)).await {
Ok(v) => v,
Err(err) => {
eprintln!("xy: {err}");
return Ok(rpc_to_exit(&err));
}
};
println!("restarted: {}", r.restarted.join(", "));
Ok(0)
}
pub async fn reload(paths: Paths) -> Result<i32> {
let mut c = connect(&paths).await?;
let r: ReloadResult = match c.call_no_params(methods::RELOAD).await {
Ok(v) => v,
Err(err) => {
eprintln!("xy: {err}");
return Ok(rpc_to_exit(&err));
}
};
println!("added: {}", r.added.join(", "));
println!("removed: {}", r.removed.join(", "));
println!("changed: {}", r.changed.join(", "));
println!("unchanged: {}", r.unchanged.join(", "));
Ok(0)
}
pub async fn logs(paths: Paths, name: String, tail: Option<u32>, follow: bool) -> Result<i32> {
let mut c = connect(&paths).await?;
let p = LogsParams {
name: name.clone(),
tail,
follow,
};
let _sub: LogsSubscribed = match c.call(methods::LOGS, &p).await {
Ok(v) => v,
Err(err) => {
eprintln!("xy: {err}");
return Ok(rpc_to_exit(&err));
}
};
loop {
match c.read_notification().await {
Ok(None) => return Ok(0),
Ok(Some(n)) => match n.method.as_str() {
notifications::LOG => {
if let Some(params) = n.params
&& let Ok(line) = serde_json::from_value::<LogLine>(params)
{
let tag = match line.stream {
LogStream::Stdout => "out",
LogStream::Stderr => "err",
};
println!("[{tag}] {}", line.line);
}
}
notifications::LOG_END => return Ok(0),
_ => {}
},
Err(err) => {
eprintln!("xy: {err}");
return Ok(rpc_to_exit(&err));
}
}
}
}
+546
View File
@@ -0,0 +1,546 @@
use crate::daemon::registry::Registry;
use crate::paths::Paths;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use xy_ipc::Connection;
use xy_ipc::envelope::{Incoming, Request, Response, err_response, ok_response};
use xy_protocol::RpcErrorCode;
use xy_protocol::rpc::{
LogEnd, LogLine, LogsCancelParams, LogsParams, LogsSubscribed, NameOrAll, RestartResult,
ServerSummary, StartResult, StatusDetail, StopResult, methods, notifications,
};
use xy_supervisor::supervisor::{StartAck, StopAck, SupervisorCmd};
pub struct ConnState {
pub subs: Mutex<HashMap<u64, JoinHandle<()>>>,
pub next: AtomicU64,
}
impl ConnState {
pub fn new() -> Self {
Self {
subs: Mutex::new(HashMap::new()),
next: AtomicU64::new(1),
}
}
}
pub async fn serve(conn: Arc<Connection>, reg: Registry, _paths: Paths) -> std::io::Result<()> {
let state = Arc::new(ConnState::new());
loop {
let Some(incoming) = conn.read_incoming().await? else {
let mut subs = state.subs.lock().await;
for (_, h) in subs.drain() {
h.abort();
}
return Ok(());
};
if let Incoming::Request(req) = incoming {
let (resp, log_ready) = handle_request(req, &reg, &conn, &state).await;
conn.write_response(&resp).await?;
if let Some(tx) = log_ready {
let _ = tx.send(());
}
}
}
}
struct ApiError {
code: i32,
message: String,
}
impl ApiError {
fn rpc(code: RpcErrorCode, msg: impl Into<String>) -> Self {
Self {
code: code.as_i32(),
message: msg.into(),
}
}
}
type LogReadyTx = tokio::sync::oneshot::Sender<()>;
async fn handle_request(
req: Request,
reg: &Registry,
conn: &Arc<Connection>,
state: &Arc<ConnState>,
) -> (Response, Option<LogReadyTx>) {
let id = req.id.clone();
let method = req.method.as_str();
let params = req.params.unwrap_or(serde_json::Value::Null);
let resp = match method {
methods::LIST => match list(reg).await {
Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
Err(err) => err_response(id, err.code, err.message),
},
methods::STATUS => {
let p: xy_protocol::rpc::StatusParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(err) => {
return (
err_response(id, -32602, format!("invalid params: {err}")),
None,
);
}
};
match status(reg, &p.name).await {
Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
Err(err) => err_response(id, err.code, err.message),
}
}
methods::START => dispatch_lifecycle(id, params, reg, Op::Start).await,
methods::STOP => dispatch_lifecycle(id, params, reg, Op::Stop).await,
methods::RESTART => dispatch_lifecycle(id, params, reg, Op::Restart).await,
methods::RELOAD => match reload(reg).await {
Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
Err(e) => err_response(id, e.code, e.message),
},
methods::LOGS => {
let p: LogsParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(err) => {
return (
err_response(id, -32602, format!("invalid params: {err}")),
None,
);
}
};
match start_log_stream(reg, conn.clone(), state.clone(), p).await {
Ok((sub_id, ready_tx)) => {
let resp = ok_response(
id,
serde_json::to_value(LogsSubscribed {
subscription_id: sub_id,
})
.unwrap(),
);
return (resp, Some(ready_tx));
}
Err(err) => err_response(id, err.code, err.message),
}
}
methods::LOGS_CANCEL => {
let p: LogsCancelParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(err) => {
return (
err_response(id, -32602, format!("invalid params: {err}")),
None,
);
}
};
let mut subs = state.subs.lock().await;
if let Some(h) = subs.remove(&p.subscription_id) {
h.abort();
}
ok_response(id, serde_json::json!({}))
}
other => err_response(id, -32601, format!("unknown method `{other}`")),
};
(resp, None)
}
async fn list(reg: &Registry) -> Result<Vec<ServerSummary>, ApiError> {
let mut out = Vec::new();
for (name, entry) in reg.snapshot().await {
let s = entry.handle.status.borrow();
out.push(ServerSummary {
name,
state: s.state,
pid: s.pid,
port: s.port,
uptime_secs: s.uptime_secs,
restart_count: s.restart_count,
last_exit: s.last_exit,
});
}
Ok(out)
}
async fn status(reg: &Registry, name: &str) -> Result<StatusDetail, ApiError> {
let Some(entry) = reg.get(name).await else {
return Err(ApiError::rpc(
RpcErrorCode::ServerNotFound,
format!("no such server `{name}`"),
));
};
let s = entry.handle.status.borrow();
Ok(StatusDetail {
summary: ServerSummary {
name: entry.handle.name.clone(),
state: s.state,
pid: s.pid,
port: s.port,
uptime_secs: s.uptime_secs,
restart_count: s.restart_count,
last_exit: s.last_exit,
},
recent_transitions: Vec::new(),
})
}
enum Op {
Start,
Stop,
Restart,
}
async fn dispatch_lifecycle(
id: serde_json::Value,
params: serde_json::Value,
reg: &Registry,
op: Op,
) -> Response {
let p: NameOrAll = match serde_json::from_value(params) {
Ok(p) => p,
Err(err) => return err_response(id, -32602, format!("invalid params: {err}")),
};
let targets: Vec<String> = match p {
NameOrAll::All { all } if all => reg.names().await,
NameOrAll::Name { name } => vec![name],
NameOrAll::All { .. } => return err_response(id, -32602, "must set all=true".into()),
};
match op {
Op::Start => {
let mut started = Vec::new();
let mut already = Vec::new();
for name in targets {
let Some(entry) = reg.get(&name).await else {
return err_response(
id,
RpcErrorCode::ServerNotFound.as_i32(),
format!("no such server `{name}`"),
);
};
let (tx, rx) = oneshot::channel();
let _ = entry.handle.tx.send(SupervisorCmd::Start { ack: tx }).await;
match rx.await {
Ok(StartAck::Started) => started.push(name),
Ok(StartAck::AlreadyRunning) => already.push(name),
Ok(StartAck::SpawnFailed(msg)) => {
return err_response(
id,
RpcErrorCode::SpawnFailed.as_i32(),
format!("failed to start `{name}`: {msg}"),
);
}
Err(_) => {
return err_response(
id,
RpcErrorCode::SpawnFailed.as_i32(),
format!("supervisor for `{name}` dropped"),
);
}
}
}
ok_response(
id,
serde_json::to_value(StartResult {
started,
already_running: already,
})
.unwrap(),
)
}
Op::Stop => {
let mut stopped = Vec::new();
let mut not_running = Vec::new();
for name in targets {
let Some(entry) = reg.get(&name).await else {
return err_response(
id,
RpcErrorCode::ServerNotFound.as_i32(),
format!("no such server `{name}`"),
);
};
let (tx, rx) = oneshot::channel();
let _ = entry.handle.tx.send(SupervisorCmd::Stop { ack: tx }).await;
match rx.await {
Ok(StopAck::Stopped) => stopped.push(name),
Ok(StopAck::NotRunning) => not_running.push(name),
Err(_) => {
return err_response(
id,
RpcErrorCode::SpawnFailed.as_i32(),
format!("supervisor for `{name}` dropped"),
);
}
}
}
ok_response(
id,
serde_json::to_value(StopResult {
stopped,
not_running,
})
.unwrap(),
)
}
Op::Restart => {
let mut restarted = Vec::new();
for name in targets {
let Some(entry) = reg.get(&name).await else {
return err_response(
id,
RpcErrorCode::ServerNotFound.as_i32(),
format!("no such server `{name}`"),
);
};
let (tx, rx) = oneshot::channel();
let _ = entry
.handle
.tx
.send(SupervisorCmd::Restart { ack: tx })
.await;
let _ = rx.await;
restarted.push(name);
}
ok_response(
id,
serde_json::to_value(RestartResult { restarted }).unwrap(),
)
}
}
}
use xy_protocol::rpc::ReloadResult;
async fn reload(reg: &Registry) -> Result<ReloadResult, ApiError> {
let paths = crate::daemon::PATHS.get().ok_or_else(|| {
ApiError::rpc(RpcErrorCode::ConfigInvalid, "daemon paths not initialized")
})?;
let new_configs = xy_protocol::kdl_parse::load_all_configs(&paths.config_dir)
.map_err(|err| ApiError::rpc(RpcErrorCode::ConfigInvalid, err.to_string()))?;
use std::collections::HashMap;
let new_by_name: HashMap<String, xy_protocol::ServerConfig> = new_configs
.into_iter()
.map(|c| (c.name.clone(), c))
.collect();
let existing_names: Vec<String> = reg.names().await;
let mut added = Vec::new();
let mut removed = Vec::new();
let mut changed = Vec::new();
let mut unchanged = Vec::new();
for name in &existing_names {
if !new_by_name.contains_key(name)
&& let Some(entry) = reg.remove(name).await
{
let (tx, rx) = oneshot::channel();
let _ = entry
.handle
.tx
.send(SupervisorCmd::Shutdown { ack: tx })
.await;
let _ = rx.await;
removed.push(name.clone());
}
}
for (name, cfg) in new_by_name {
let new_hash = crate::daemon::config_hash(&cfg);
match reg.get(&name).await {
None => {
let handle = crate::daemon::spawn_supervisor(paths, cfg)
.map_err(|err| ApiError::rpc(RpcErrorCode::SpawnFailed, err.to_string()))?;
reg.insert(
name.clone(),
crate::daemon::registry::Entry {
handle: handle.clone(),
config_hash: new_hash,
},
)
.await;
let (tx, rx) = oneshot::channel();
let _ = handle.tx.send(SupervisorCmd::Start { ack: tx }).await;
let _ = rx.await;
added.push(name);
}
Some(entry) if entry.config_hash != new_hash => {
let (tx, rx) = oneshot::channel();
let _ = entry
.handle
.tx
.send(SupervisorCmd::Shutdown { ack: tx })
.await;
let _ = rx.await;
reg.remove(&name).await;
let handle = crate::daemon::spawn_supervisor(paths, cfg)
.map_err(|err| ApiError::rpc(RpcErrorCode::SpawnFailed, err.to_string()))?;
reg.insert(
name.clone(),
crate::daemon::registry::Entry {
handle: handle.clone(),
config_hash: new_hash,
},
)
.await;
let (tx, rx) = oneshot::channel();
let _ = handle.tx.send(SupervisorCmd::Start { ack: tx }).await;
let _ = rx.await;
changed.push(name);
}
Some(_) => unchanged.push(name),
}
}
Ok(ReloadResult {
added,
removed,
changed,
unchanged,
})
}
async fn start_log_stream(
reg: &Registry,
conn: Arc<Connection>,
state: Arc<ConnState>,
p: LogsParams,
) -> Result<(u64, LogReadyTx), ApiError> {
let Some(entry) = reg.get(&p.name).await else {
return Err(ApiError::rpc(
RpcErrorCode::ServerNotFound,
format!("no such server `{}`", p.name),
));
};
let sub_id = state.next.fetch_add(1, Ordering::Relaxed);
let sink = entry.handle.log_sink.clone();
let conn2 = conn.clone();
let state2 = state.clone();
let follow = p.follow;
let tail = p.tail;
let name = p.name.clone();
let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
let task = tokio::spawn(async move {
// Wait until `serve` has written the LOGS response before sending any
// LOG notifications. Without this the spawned task can race ahead of
// `conn.write_response` and emit notifications that the client
// discards while still awaiting the response.
let _ = ready_rx.await;
for line in sink.ring.snapshot_tail(tail) {
let n = xy_ipc::envelope::notification(
notifications::LOG,
Some(
serde_json::to_value(LogLine {
subscription_id: sub_id,
name: name.clone(),
stream: line.stream,
line: line.line,
ts_unix_ms: line.ts_unix_ms,
})
.unwrap(),
),
);
if conn2.write_notification(&n).await.is_err() {
return;
}
}
if !follow {
let end = xy_ipc::envelope::notification(
notifications::LOG_END,
Some(
serde_json::to_value(LogEnd {
subscription_id: sub_id,
})
.unwrap(),
),
);
let _ = conn2.write_notification(&end).await;
state2.subs.lock().await.remove(&sub_id);
return;
}
let mut rx = sink.broadcast.subscribe();
while let Ok(mut line) = rx.recv().await {
line.subscription_id = sub_id;
let n = xy_ipc::envelope::notification(
notifications::LOG,
Some(serde_json::to_value(&line).unwrap()),
);
if conn2.write_notification(&n).await.is_err() {
break;
}
}
});
state.subs.lock().await.insert(sub_id, task);
Ok((sub_id, ready_tx))
}
+147
View File
@@ -0,0 +1,147 @@
use crate::paths::Paths;
use crate::pidfile::PidFile;
use anyhow::{Context, Result};
use std::sync::{Arc, OnceLock};
use tokio::sync::{mpsc, oneshot, watch};
use tracing::{error, info};
use xy_ipc::{Connection, bind};
use xy_protocol::{ServerConfig, ServerState, kdl_parse::load_all_configs};
use xy_supervisor::{
logs::{LogSink, RotatingLogWriter},
supervisor::{RealSpawner, Status, SupervisorCmd, SupervisorHandle, SupervisorTask},
};
pub mod handlers;
pub mod registry;
pub mod shutdown;
const LOG_FILE_MAX_BYTES: u64 = 10 * 1024 * 1024;
const LOG_FILE_KEEP: usize = 5;
const RING_BUFFER_BYTES: usize = 1024 * 1024;
pub static PATHS: OnceLock<Paths> = OnceLock::new();
pub fn config_hash(cfg: &ServerConfig) -> u64 {
use std::hash::{Hash, Hasher};
let mut h = std::collections::hash_map::DefaultHasher::new();
serde_json::to_string(cfg).unwrap().hash(&mut h);
h.finish()
}
pub fn spawn_supervisor(paths: &Paths, cfg: ServerConfig) -> Result<SupervisorHandle> {
let log_path = paths.log_dir.join(format!("{}.log", cfg.name));
let writer = RotatingLogWriter::open(&log_path, LOG_FILE_MAX_BYTES, LOG_FILE_KEEP)
.with_context(|| format!("open log file {}", log_path.display()))?;
let sink = LogSink::new(cfg.name.clone(), writer, RING_BUFFER_BYTES);
let initial_status = Status {
state: ServerState::Stopped,
pid: None,
port: cfg.port,
uptime_secs: None,
restart_count: 0,
last_exit: None,
};
let (status_tx, status_rx) = watch::channel(initial_status);
let (cmd_tx, cmd_rx) = mpsc::channel(16);
let name = cfg.name.clone();
let task = SupervisorTask::new(cfg, sink.clone(), RealSpawner, status_tx, cmd_rx);
tokio::spawn(task.run());
Ok(SupervisorHandle {
name,
tx: cmd_tx,
status: status_rx,
log_sink: sink,
})
}
pub async fn run(paths: Paths) -> Result<()> {
paths.ensure_dirs().context("create state dirs")?;
let _pid =
PidFile::acquire(&paths.pidfile).context("another xy daemon appears to be running")?;
let listener = bind(&paths.socket).context("bind unix socket")?;
let _ = PATHS.set(paths.clone());
info!(socket = %paths.socket.display(), "daemon listening");
let configs = load_all_configs(&paths.config_dir).context("load configs")?;
let registry = registry::Registry::new();
for cfg in configs {
let hash = config_hash(&cfg);
let handle = spawn_supervisor(&paths, cfg)?;
let name = handle.name.clone();
registry
.insert(
name.clone(),
registry::Entry {
handle: handle.clone(),
config_hash: hash,
},
)
.await;
let (ack_tx, ack_rx) = oneshot::channel();
if handle
.tx
.send(SupervisorCmd::Start { ack: ack_tx })
.await
.is_ok()
{
let _ = ack_rx.await;
}
}
let registry_for_shutdown = registry.clone();
let shutdown_signal = shutdown::install();
let accept = async {
loop {
let (stream, _addr) = match listener.accept().await {
Ok(p) => p,
Err(err) => {
error!(error = %err, "accept failed");
continue;
}
};
let conn = Arc::new(Connection::new(stream));
let reg = registry.clone();
let paths_clone = paths.clone();
tokio::spawn(async move {
if let Err(err) = handlers::serve(conn, reg, paths_clone).await {
error!(error = %err, "connection ended with error");
}
});
}
};
tokio::select! {
_ = accept => {}
_ = shutdown_signal => { info!("shutdown signal received"); }
}
shutdown::shutdown_all(registry_for_shutdown).await;
Ok(())
}
+42
View File
@@ -0,0 +1,42 @@
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use xy_supervisor::SupervisorHandle;
#[derive(Clone)]
pub struct Entry {
pub handle: SupervisorHandle,
pub config_hash: u64,
}
#[derive(Clone, Default)]
pub struct Registry {
inner: Arc<RwLock<HashMap<String, Entry>>>,
}
impl Registry {
pub fn new() -> Self {
Self::default()
}
pub async fn insert(&self, name: String, entry: Entry) {
self.inner.write().await.insert(name, entry);
}
pub async fn remove(&self, name: &str) -> Option<Entry> {
self.inner.write().await.remove(name)
}
pub async fn get(&self, name: &str) -> Option<Entry> {
self.inner.read().await.get(name).cloned()
}
pub async fn names(&self) -> Vec<String> {
let g = self.inner.read().await;
let mut v: Vec<String> = g.keys().cloned().collect();
v.sort();
v
}
pub async fn snapshot(&self) -> Vec<(String, Entry)> {
let g = self.inner.read().await;
let mut v: Vec<_> = g.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
v.sort_by(|a, b| a.0.cmp(&b.0));
v
}
}
+45
View File
@@ -0,0 +1,45 @@
use crate::daemon::registry::Registry;
use tokio::signal::unix::{SignalKind, signal};
use tokio::sync::oneshot;
use xy_supervisor::supervisor::SupervisorCmd;
pub fn install() -> impl std::future::Future<Output = ()> {
let mut term = signal(SignalKind::terminate()).expect("install SIGTERM handler");
let mut int = signal(SignalKind::interrupt()).expect("install SIGINT handler");
async move {
tokio::select! {
_ = term.recv() => {}
_ = int.recv() => {}
}
}
}
pub async fn shutdown_all(reg: Registry) {
let snapshot = reg.snapshot().await;
let mut acks = Vec::new();
for (_name, entry) in &snapshot {
let (tx, rx) = oneshot::channel();
if entry
.handle
.tx
.send(SupervisorCmd::Shutdown { ack: tx })
.await
.is_ok()
{
acks.push(rx);
}
}
let deadline = tokio::time::Duration::from_secs(30);
let _ = tokio::time::timeout(deadline, async {
for rx in acks {
let _ = rx.await;
}
})
.await;
}
+93
View File
@@ -0,0 +1,93 @@
use clap::{Parser, Subcommand};
mod cli;
mod daemon;
mod paths;
mod pidfile;
#[derive(Debug, Parser)]
#[command(name = "xy", version, about = "HTTP MCP server supervisor")]
struct Cli {
#[command(subcommand)]
cmd: Cmd,
}
#[derive(Debug, Subcommand)]
enum Cmd {
/// Run the daemon in the foreground.
Daemon,
/// List all configured servers with state.
List,
/// Show detailed status for a single server.
Status { name: String },
/// Start a server (or all configured servers with --all).
Start {
#[arg(long, conflicts_with = "name")]
all: bool,
#[arg(required_unless_present = "all")]
name: Option<String>,
},
/// Stop a server (or --all).
Stop {
#[arg(long, conflicts_with = "name")]
all: bool,
#[arg(required_unless_present = "all")]
name: Option<String>,
},
/// Restart a server (or --all).
Restart {
#[arg(long, conflicts_with = "name")]
all: bool,
#[arg(required_unless_present = "all")]
name: Option<String>,
},
/// Re-read config dir and reconcile running servers.
Reload,
/// Stream a server's log.
Logs {
name: String,
#[arg(long)]
tail: Option<u32>,
#[arg(short = 'f', long)]
follow: bool,
},
}
#[tokio::main]
async fn main() -> std::process::ExitCode {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.with_writer(std::io::stderr)
.init();
let cli = Cli::parse();
let paths = match paths::Paths::resolve() {
Ok(p) => p,
Err(e) => {
eprintln!("xy: failed to resolve XDG paths: {e}");
return std::process::ExitCode::from(3);
}
};
let result: anyhow::Result<i32> = match cli.cmd {
Cmd::Daemon => daemon::run(paths).await.map(|_| 0),
Cmd::List => cli::list(paths).await,
Cmd::Status { name } => cli::status(paths, name).await,
Cmd::Start { all, name } => cli::start(paths, all, name).await,
Cmd::Stop { all, name } => cli::stop(paths, all, name).await,
Cmd::Restart { all, name } => cli::restart(paths, all, name).await,
Cmd::Reload => cli::reload(paths).await,
Cmd::Logs { name, tail, follow } => cli::logs(paths, name, tail, follow).await,
};
match result {
Ok(code) => std::process::ExitCode::from(code as u8),
Err(e) => {
eprintln!("xy: {e:#}");
std::process::ExitCode::from(1)
}
}
}
+48
View File
@@ -0,0 +1,48 @@
use etcetera::base_strategy::{BaseStrategy, Xdg};
use std::path::PathBuf;
#[derive(Debug, Clone)]
pub struct Paths {
pub config_dir: PathBuf,
pub state_dir: PathBuf,
pub log_dir: PathBuf,
pub socket: PathBuf,
pub pidfile: PathBuf,
}
impl Paths {
pub fn resolve() -> std::io::Result<Self> {
let xdg = Xdg::new().map_err(std::io::Error::other)?;
let config_dir = xdg.config_dir().join("xy").join("servers");
let state_dir = xdg.state_dir().unwrap_or_else(|| xdg.data_dir()).join("xy");
let log_dir = state_dir.join("logs");
let socket = std::env::var_os("XDG_RUNTIME_DIR")
.map(|p| PathBuf::from(p).join("xy.sock"))
.unwrap_or_else(|| state_dir.join("xy.sock"));
let pidfile = state_dir.join("xy.pid");
Ok(Self {
config_dir,
state_dir,
log_dir,
socket,
pidfile,
})
}
pub fn ensure_dirs(&self) -> std::io::Result<()> {
std::fs::create_dir_all(&self.state_dir)?;
std::fs::create_dir_all(&self.log_dir)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn resolves_without_panicking() {
let p = Paths::resolve().unwrap();
assert!(p.pidfile.starts_with(&p.state_dir));
}
}
+57
View File
@@ -0,0 +1,57 @@
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::os::unix::fs::OpenOptionsExt;
use std::path::{Path, PathBuf};
#[derive(Debug)]
pub struct PidFile {
path: PathBuf,
_file: File,
}
impl PidFile {
pub fn acquire(path: &Path) -> std::io::Result<Self> {
let mut f = OpenOptions::new()
.write(true)
.create_new(true)
.mode(0o600)
.open(path)?;
writeln!(f, "{}", std::process::id())?;
Ok(Self {
path: path.to_path_buf(),
_file: f,
})
}
}
impl Drop for PidFile {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.path);
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn acquire_then_second_fails() {
let dir = tempdir().unwrap();
let p = dir.path().join("x.pid");
let _g = PidFile::acquire(&p).unwrap();
let err = PidFile::acquire(&p).unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::AlreadyExists);
}
#[test]
fn drop_removes_file() {
let dir = tempdir().unwrap();
let p = dir.path().join("x.pid");
{
let _g = PidFile::acquire(&p).unwrap();
assert!(p.exists());
}
assert!(!p.exists());
}
}
+102
View File
@@ -0,0 +1,102 @@
#![allow(dead_code)] // not all helpers are used by every test file
use std::path::PathBuf;
use std::process::Stdio;
use std::time::Duration;
use tempfile::TempDir;
use tokio::process::{Child, Command};
pub struct Harness {
pub tmp: TempDir,
pub config_dir: PathBuf,
pub state_dir: PathBuf,
pub socket: PathBuf,
pub daemon: Option<Child>,
}
impl Harness {
pub fn new() -> Self {
let tmp = tempfile::tempdir().expect("tempdir");
std::fs::create_dir_all(tmp.path().join("config")).unwrap();
std::fs::create_dir_all(tmp.path().join("state")).unwrap();
std::fs::create_dir_all(tmp.path().join("run")).unwrap();
let config_dir = tmp.path().join("config/xy/servers");
let state_dir = tmp.path().join("state/xy");
std::fs::create_dir_all(&config_dir).unwrap();
std::fs::create_dir_all(&state_dir).unwrap();
let socket = tmp.path().join("run/xy.sock");
Self {
tmp,
config_dir,
state_dir,
socket,
daemon: None,
}
}
pub fn write_server(&self, name: &str, command: &str, port: u16, restart_policy: &str) {
let body = format!(
"command \"{command}\"\nport {port}\nrestart {{ policy \"{restart_policy}\" backoff-initial \"10ms\" backoff-max \"50ms\" max-retries-per-minute 3 }}\nstop {{ grace \"500ms\" }}\n"
);
std::fs::write(self.config_dir.join(format!("{name}.kdl")), body).unwrap();
}
pub async fn start_daemon(&mut self, xy_bin: &PathBuf) {
let child = Command::new(xy_bin)
.arg("daemon")
.env("XDG_CONFIG_HOME", self.tmp.path().join("config"))
.env("XDG_STATE_HOME", self.tmp.path().join("state"))
.env("XDG_RUNTIME_DIR", self.tmp.path().join("run"))
.stdout(Stdio::null())
.stderr(Stdio::inherit())
.kill_on_drop(true)
.spawn()
.expect("spawn daemon");
self.daemon = Some(child);
let deadline = std::time::Instant::now() + Duration::from_secs(5);
while !self.socket.exists() {
if std::time::Instant::now() > deadline {
panic!("daemon socket never appeared");
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
}
pub async fn run_cli(&self, xy_bin: &PathBuf, args: &[&str]) -> (i32, String, String) {
let out = Command::new(xy_bin)
.args(args)
.env("XDG_CONFIG_HOME", self.tmp.path().join("config"))
.env("XDG_STATE_HOME", self.tmp.path().join("state"))
.env("XDG_RUNTIME_DIR", self.tmp.path().join("run"))
.output()
.await
.expect("run cli");
let code = out.status.code().unwrap_or(-1);
let stdout = String::from_utf8_lossy(&out.stdout).to_string();
let stderr = String::from_utf8_lossy(&out.stderr).to_string();
(code, stdout, stderr)
}
}
pub fn xy_bin() -> PathBuf {
artifact("xy")
}
pub fn sleep_server_bin() -> PathBuf {
artifact("xy-test-sleep-server")
}
pub fn exit_failure_bin() -> PathBuf {
artifact("xy-test-exit-failure")
}
fn artifact(name: &str) -> PathBuf {
let mut p = std::env::current_exe().unwrap();
p.pop();
if p.ends_with("deps") {
p.pop();
}
p.push(name);
if !p.exists() {
panic!("artifact `{}` not found at {}", name, p.display());
}
p
}
+31
View File
@@ -0,0 +1,31 @@
mod common;
use common::*;
#[tokio::test]
async fn auto_starts_on_boot_then_stop_and_start() {
let xy = xy_bin();
let sleeper = sleep_server_bin();
let mut h = Harness::new();
h.write_server("alpha", sleeper.to_str().unwrap(), 19_001, "always");
h.start_daemon(&xy).await;
let mut last_stdout = String::new();
for _ in 0..40 {
let (_c, out, _e) = h.run_cli(&xy, &["list"]).await;
last_stdout = out;
if last_stdout.contains("alpha") && last_stdout.contains("running") {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
assert!(last_stdout.contains("alpha"), "stdout: {last_stdout}");
assert!(last_stdout.contains("running"), "stdout: {last_stdout}");
let (code, out, _e) = h.run_cli(&xy, &["stop", "alpha"]).await;
assert_eq!(code, 0);
assert!(out.contains("stopped: alpha"), "stdout: {out}");
let (code, out, _e) = h.run_cli(&xy, &["start", "alpha"]).await;
assert_eq!(code, 0);
assert!(out.contains("started: alpha"), "stdout: {out}");
}
+49
View File
@@ -0,0 +1,49 @@
mod common;
use common::*;
use std::process::Stdio;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
#[tokio::test]
async fn logs_tail_prints_existing_lines() {
let xy = xy_bin();
let sleeper = sleep_server_bin();
let mut h = Harness::new();
h.write_server("svc", sleeper.to_str().unwrap(), 19_030, "always");
h.start_daemon(&xy).await;
tokio::time::sleep(Duration::from_millis(500)).await;
let (code, out, _e) = h.run_cli(&xy, &["logs", "svc", "--tail", "10"]).await;
assert_eq!(code, 0);
assert!(out.contains("ready"), "stdout: {out}");
}
#[tokio::test]
async fn logs_follow_streams_new_lines() {
let xy = xy_bin();
let sleeper = sleep_server_bin();
let mut h = Harness::new();
h.write_server("svc", sleeper.to_str().unwrap(), 19_031, "always");
h.start_daemon(&xy).await;
let mut child = Command::new(&xy)
.args(["logs", "svc", "--follow"])
.env("XDG_CONFIG_HOME", h.tmp.path().join("config"))
.env("XDG_STATE_HOME", h.tmp.path().join("state"))
.env("XDG_RUNTIME_DIR", h.tmp.path().join("run"))
.stdout(Stdio::piped())
.stderr(Stdio::null())
.kill_on_drop(true)
.spawn()
.unwrap();
let stdout = child.stdout.take().unwrap();
let mut lines = BufReader::new(stdout).lines();
let first = tokio::time::timeout(Duration::from_secs(2), lines.next_line())
.await
.expect("timeout waiting for first log line")
.unwrap();
assert!(first.is_some());
let _ = tokio::time::timeout(Duration::from_secs(2), child.kill()).await;
}
+37
View File
@@ -0,0 +1,37 @@
mod common;
use common::*;
#[tokio::test]
async fn reload_adds_removes_and_changes() {
let xy = xy_bin();
let sleeper = sleep_server_bin();
let mut h = Harness::new();
h.write_server("a", sleeper.to_str().unwrap(), 19_020, "always");
h.start_daemon(&xy).await;
for _ in 0..40 {
let (_c, out, _e) = h.run_cli(&xy, &["list"]).await;
if out.contains("a") && out.contains("running") {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
h.write_server("a", sleeper.to_str().unwrap(), 19_021, "always");
h.write_server("b", sleeper.to_str().unwrap(), 19_022, "always");
let (code, out, _e) = h.run_cli(&xy, &["reload"]).await;
assert_eq!(code, 0);
assert!(out.contains("added:") && out.contains("b"), "stdout: {out}");
assert!(
out.contains("changed:") && out.contains("a"),
"stdout: {out}"
);
std::fs::remove_file(h.config_dir.join("a.kdl")).unwrap();
let (code, out, _e) = h.run_cli(&xy, &["reload"]).await;
assert_eq!(code, 0);
assert!(
out.contains("removed:") && out.contains("a"),
"stdout: {out}"
);
}
+22
View File
@@ -0,0 +1,22 @@
mod common;
use common::*;
#[tokio::test]
async fn restart_cap_marks_failed() {
let xy = xy_bin();
let bad = exit_failure_bin();
let mut h = Harness::new();
h.write_server("flaky", bad.to_str().unwrap(), 19_010, "always");
h.start_daemon(&xy).await;
let mut saw_failed = false;
for _ in 0..60 {
let (_c, out, _e) = h.run_cli(&xy, &["list"]).await;
if out.contains("flaky") && out.contains("failed") {
saw_failed = true;
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
assert!(saw_failed, "flaky never reached failed state");
}
+15
View File
@@ -0,0 +1,15 @@
command "/Users/you/.cargo/bin/insikt-mcp"
args "--http" "--port" "8421"
port 8421
env {
RUST_LOG "info"
}
restart {
policy "on-failure"
}
stop {
grace "10s"
}
-3
View File
@@ -1,3 +0,0 @@
fn main() {
println!("Hello, world!");
}