# xy MCP Supervisor — Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Build the MVP of `xy`, a Tokio-based daemon that supervises HTTP-based MCP server processes on macOS/Linux, plus a CLI that drives it over a Unix socket.

**Architecture:** Single `xy` binary in a Cargo workspace of four crates (`xy-protocol`, `xy-supervisor`, `xy-ipc`, `xy`). The daemon (`xy daemon`) runs one supervisor task per managed server; the CLI subcommands act as JSON-RPC 2.0 clients over a newline-delimited Unix socket. Configs are per-server KDL files under the XDG config dir.

**Tech Stack:** Rust (edition 2024), Tokio (rt-multi-thread, net, process, signal, sync, fs, io-util, macros, time), clap (derive), serde + serde_json, kdl, etcetera (XDG), nix (signals/process groups), humantime + humantime-serde (durations), async-trait, tracing + tracing-subscriber, thiserror, anyhow.

**Spec:** `docs/superpowers/specs/2026-05-25-xy-mcp-supervisor-design.md`

---

## File Structure

```
xy/
├── Cargo.toml                          # workspace manifest (no [package])
├── crates/
│   ├── xy-protocol/
│   │   ├── Cargo.toml
│   │   └── src/
│   │       ├── lib.rs                  # re-exports
│   │       ├── config.rs               # ServerConfig, RestartPolicy, parsing
│   │       ├── kdl_parse.rs            # KDL parser, load_all_configs
│   │       ├── state.rs                # ServerState enum
│   │       ├── rpc.rs                  # JSON-RPC method params/results
│   │       └── error.rs                # ConfigError, RpcErrorCode
│   ├── xy-supervisor/
│   │   ├── Cargo.toml
│   │   └── src/
│   │       ├── lib.rs                  # public surface
│   │       ├── child.rs                # ChildHandle trait + RealChild
│   │       ├── policy.rs               # restart decision logic
│   │       ├── backoff.rs              # backoff calculator
│   │       ├── retry_window.rs         # sliding 60s window
│   │       ├── logs.rs                 # log writer, rotation, ring buffer, broadcast
│   │       └── supervisor.rs           # supervisor task + state machine
│   ├── xy-ipc/
│   │   ├── Cargo.toml
│   │   └── src/
│   │       ├── lib.rs
│   │       ├── framing.rs              # newline-delimited JSON codec
│   │       ├── envelope.rs             # JSON-RPC 2.0 request/response/notification
│   │       ├── server.rs               # listen, accept, dispatch
│   │       └── client.rs               # connect, call, subscribe
│   └── xy/
│       ├── Cargo.toml
│       └── src/
│           ├── main.rs                 # clap entry, subcommand dispatch
│           ├── paths.rs                # XDG path resolution (via etcetera)
│           ├── pidfile.rs              # acquire/release exclusive pidfile
│           ├── daemon/
│           │   ├── mod.rs              # daemon entry point
│           │   ├── handlers.rs         # JSON-RPC method handlers → supervisor msgs
│           │   ├── registry.rs         # name → supervisor handle map
│           │   └── shutdown.rs         # SIGTERM/SIGINT graceful shutdown
│           ├── cli/
│           │   ├── mod.rs              # client-side CLI command impls
│           │   └── format.rs           # `list` / `status` output formatting
│           └── bin/
│               ├── xy_test_sleep_server.rs   # integration test helper
│               └── xy_test_exit_failure.rs   # integration test helper
└── crates/xy/tests/
    ├── common/mod.rs                   # tempdir XDG, spawn daemon, drive CLI
    ├── lifecycle.rs                    # start/stop/restart
    ├── reload.rs                       # added/removed/changed
    ├── restart_policy.rs               # crash + cap → failed
    └── logs.rs                         # --tail and --follow
```

Each crate's `lib.rs` re-exports
its public surface so downstream crates use short paths (`xy_protocol::ServerConfig`, not `xy_protocol::config::ServerConfig`).

---

> **Note for the executor:** The full step-by-step task list (37 tasks across 7 phases) follows. Each task is bite-sized (2–5 minutes per step) and includes complete code, exact commands, and commit messages. The plan was authored in a single drafting session; if a step's code snippet references a type or helper, it is defined either in that task or in an earlier numbered task within this plan.

## Phase 1 — Workspace skeleton

### Task 1: Convert single crate to workspace

**Files:**
- Modify: `Cargo.toml`
- Create: `crates/xy-protocol/Cargo.toml`
- Create: `crates/xy-protocol/src/lib.rs`
- Create: `crates/xy-supervisor/Cargo.toml`
- Create: `crates/xy-supervisor/src/lib.rs`
- Create: `crates/xy-ipc/Cargo.toml`
- Create: `crates/xy-ipc/src/lib.rs`
- Create: `crates/xy/Cargo.toml`
- Create: `crates/xy/src/main.rs`
- Delete: `src/main.rs` and `src/` (replaced by `crates/xy/`)

- [ ] **Step 1: Replace root `Cargo.toml` with a workspace manifest**

```toml
[workspace]
resolver = "3"
members = [
    "crates/xy-protocol",
    "crates/xy-supervisor",
    "crates/xy-ipc",
    "crates/xy",
]

[workspace.package]
edition = "2024"
version = "0.1.0"
license = "MIT OR Apache-2.0"

[workspace.dependencies]
xy-protocol = { path = "crates/xy-protocol" }
xy-supervisor = { path = "crates/xy-supervisor" }
xy-ipc = { path = "crates/xy-ipc" }
tokio = { version = "1", features = ["rt-multi-thread", "net", "process", "signal", "sync", "fs", "io-util", "macros", "time"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "2"
anyhow = "1"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
clap = { version = "4", features = ["derive"] }
kdl = "6"
etcetera = "0.10"
nix = { version = "0.30", features = ["signal", "process"] }
humantime = "2"
humantime-serde = "1"
async-trait = "0.1"
tempfile = "3"
tokio-test = "0.4"
```

- [ ] **Step 2: Create `crates/xy-protocol/Cargo.toml`**

```toml
[package]
name = "xy-protocol"
edition.workspace = true
version.workspace = true
license.workspace = true

[dependencies]
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
kdl.workspace = true
humantime.workspace = true
humantime-serde.workspace = true

[dev-dependencies]
tempfile.workspace = true
```

- [ ] **Step 3: Create `crates/xy-protocol/src/lib.rs`**

```rust
//! Wire types and config schema shared between the xy daemon and CLI.
```

- [ ] **Step 4: Create `crates/xy-supervisor/Cargo.toml`**

```toml
[package]
name = "xy-supervisor"
edition.workspace = true
version.workspace = true
license.workspace = true

[dependencies]
xy-protocol.workspace = true
tokio.workspace = true
tracing.workspace = true
thiserror.workspace = true
nix.workspace = true
async-trait.workspace = true

[dev-dependencies]
tokio-test.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread"] }
```

- [ ] **Step 5: Create `crates/xy-supervisor/src/lib.rs`**

```rust
//! Process-supervision primitives for the xy daemon.
```

- [ ] **Step 6: Create `crates/xy-ipc/Cargo.toml`**

```toml
[package]
name = "xy-ipc"
edition.workspace = true
version.workspace = true
license.workspace = true

[dependencies]
xy-protocol.workspace = true
tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
thiserror.workspace = true

[dev-dependencies]
tempfile.workspace = true
```

- [ ] **Step 7: Create `crates/xy-ipc/src/lib.rs`**

```rust
//! JSON-RPC 2.0 over newline-delimited JSON on a Unix socket.
```

- [ ] **Step 8: Create `crates/xy/Cargo.toml`**

```toml
[package]
name = "xy"
edition.workspace = true
version.workspace = true
license.workspace = true

[[bin]]
name = "xy"
path = "src/main.rs"

[dependencies]
xy-protocol.workspace = true
xy-supervisor.workspace = true
xy-ipc.workspace = true
tokio.workspace = true
clap.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
anyhow.workspace = true
etcetera.workspace = true
nix.workspace = true
humantime.workspace = true

[dev-dependencies]
tempfile.workspace = true
```

- [ ] **Step 9: Create `crates/xy/src/main.rs`**

```rust
fn main() {
    println!("xy: not implemented yet");
}
```

- [ ] **Step 10: Remove the old top-level `src/` directory**

Run: `rm -rf src/`

- [ ] **Step 11: Verify the workspace builds**

Run: `cargo build --workspace`
Expected: every crate compiles; one binary `xy` is produced.

- [ ] **Step 12: Commit**

```bash
git add Cargo.toml crates/
git rm -r --cached src/ 2>/dev/null || true
git add -A
git commit -m "chore: convert to cargo workspace with four crates"
```

---

## Phase 2 — `xy-protocol`

### Task 2: `ServerState` enum

**Files:**
- Create: `crates/xy-protocol/src/state.rs`
- Modify: `crates/xy-protocol/src/lib.rs`

- [ ] **Step 1: Write the enum and its tests inline in `state.rs`**

```rust
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum ServerState {
    Stopped,
    Starting,
    Running,
    Restarting,
    Failed,
    Stopping,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn serializes_to_kebab_case() {
        assert_eq!(serde_json::to_string(&ServerState::Restarting).unwrap(), "\"restarting\"");
    }

    #[test]
    fn deserializes_from_kebab_case() {
        let s: ServerState = serde_json::from_str("\"failed\"").unwrap();
        assert_eq!(s, ServerState::Failed);
    }
}
```

- [ ] **Step 2: Expose from `lib.rs`**

```rust
//! Wire types and config schema shared between the xy daemon and CLI.

pub mod state;

pub use state::ServerState;
```

- [ ] **Step 3: Run tests**

Run: `cargo test -p xy-protocol`
Expected: 2 passed.

- [ ] **Step 4: Commit**

```bash
git add crates/xy-protocol/
git commit -m "feat(protocol): ServerState enum"
```

### Task 3: `RestartPolicy` + `RestartConfig` + `StopConfig`

**Files:**
- Create: `crates/xy-protocol/src/config.rs`
- Modify: `crates/xy-protocol/src/lib.rs`

- [ ] **Step 1: Write the file**

```rust
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum RestartPolicy {
    Always,
    OnFailure,
    Never,
}

impl Default for RestartPolicy {
    fn default() -> Self {
        RestartPolicy::OnFailure
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RestartConfig {
    #[serde(default)]
    pub policy: RestartPolicy,
    #[serde(default = "default_backoff_initial", with = "humantime_serde")]
    pub backoff_initial: Duration,
    #[serde(default = "default_backoff_max", with = "humantime_serde")]
    pub backoff_max: Duration,
    #[serde(default = "default_max_retries_per_minute")]
    pub max_retries_per_minute: u32,
}

fn default_backoff_initial() -> Duration { Duration::from_secs(1) }
fn default_backoff_max() -> Duration { Duration::from_secs(30) }
fn default_max_retries_per_minute() -> u32 { 5 }

impl Default for RestartConfig {
    fn default() -> Self {
        Self {
            policy: RestartPolicy::default(),
            backoff_initial: default_backoff_initial(),
            backoff_max: default_backoff_max(),
            max_retries_per_minute: default_max_retries_per_minute(),
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StopConfig {
    #[serde(default = "default_grace", with = "humantime_serde")]
    pub grace: Duration,
}

fn default_grace() -> Duration { Duration::from_secs(10) }

impl Default for StopConfig {
    fn default() -> Self {
        Self { grace: default_grace() }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn restart_config_defaults() {
        let c = RestartConfig::default();
        assert_eq!(c.policy, RestartPolicy::OnFailure);
        assert_eq!(c.backoff_initial, Duration::from_secs(1));
        assert_eq!(c.backoff_max, Duration::from_secs(30));
        assert_eq!(c.max_retries_per_minute, 5);
    }

    #[test]
    fn stop_config_defaults() {
        assert_eq!(StopConfig::default().grace, Duration::from_secs(10));
    }
}
```

- [ ] **Step 2: Expose from `lib.rs`**

```rust
//! Wire types and config schema shared between the xy daemon and CLI.

pub mod config;
pub mod state;

pub use config::{RestartConfig, RestartPolicy, StopConfig};
pub use state::ServerState;
```

- [ ] **Step 3: Run tests**

Run: `cargo test -p xy-protocol`
Expected: 4 passed.

- [ ] **Step 4: Commit**

```bash
git add crates/xy-protocol/
git commit -m "feat(protocol): RestartPolicy/RestartConfig/StopConfig with defaults"
```

### Task 4: `ServerConfig` struct + `ConfigError`

**Files:**
- Modify: `crates/xy-protocol/src/config.rs`
- Create: `crates/xy-protocol/src/error.rs`
- Modify: `crates/xy-protocol/src/lib.rs`

- [ ] **Step 1: Append `ServerConfig` to `config.rs`**

```rust
use std::collections::BTreeMap;
use std::path::PathBuf;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ServerConfig {
    pub name: String,
    pub command: PathBuf,
    #[serde(default)]
    pub args: Vec<String>,
    pub port: u16,
    #[serde(default)]
    pub env: BTreeMap<String, String>,
    #[serde(default)]
    pub working_dir: Option<PathBuf>,
    #[serde(default)]
    pub restart: RestartConfig,
    #[serde(default)]
    pub stop: StopConfig,
}
```

- [ ] **Step 2: Create `crates/xy-protocol/src/error.rs`**

```rust
use std::path::PathBuf;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum ConfigError {
    #[error("failed to read {path}: {source}")]
    Io { path: PathBuf, #[source] source: std::io::Error },
    #[error("failed to parse KDL in {path}: {message}")]
    Parse { path: PathBuf, message: String },
    #[error("missing required field `{field}` in {path}")]
    MissingField { path: PathBuf, field: &'static str },
    #[error("invalid value for `{field}` in {path}: {message}")]
    InvalidValue { path: PathBuf, field: &'static str, message: String },
    #[error("duplicate port {port} declared by both `{name_a}` and `{name_b}`")]
    DuplicatePort { name_a: String, name_b: String, port: u16 },
    #[error("server name `{name}` contains invalid characters (allowed: a-z, 0-9, '-', '_')")]
    InvalidName { name: String },
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcErrorCode {
    ServerNotFound = -32001,
    PortConflict = -32002,
    ConfigInvalid = -32003,
    AlreadyRunning = -32004,
    NotRunning = -32005,
    SpawnFailed = -32006,
}

impl RpcErrorCode {
    pub fn as_i32(self) -> i32 {
        self as i32
    }
}
```

- [ ] **Step 3: Update `lib.rs`**

```rust
//! Wire types and config schema shared between the xy daemon and CLI.

pub mod config;
pub mod error;
pub mod state;

pub use config::{RestartConfig, RestartPolicy, ServerConfig, StopConfig};
pub use error::{ConfigError, RpcErrorCode};
pub use state::ServerState;
```

- [ ] **Step 4: Run tests**

Run: `cargo test -p xy-protocol`
Expected: 4 passed (unchanged; types only).
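
For orientation only (not a file to create in this task): the `RpcErrorCode` values above all fall inside the range that JSON-RPC 2.0 reserves for implementation-defined server errors, -32000 to -32099. The sketch below repeats the enum so it compiles on its own and adds a reverse lookup that a client might use when decoding an error response. `from_i32` and `is_server_error_range` are hypothetical helpers, not part of this plan.

```rust
// Copy of the enum from `error.rs` so this sketch is self-contained.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcErrorCode {
    ServerNotFound = -32001,
    PortConflict = -32002,
    ConfigInvalid = -32003,
    AlreadyRunning = -32004,
    NotRunning = -32005,
    SpawnFailed = -32006,
}

impl RpcErrorCode {
    pub fn as_i32(self) -> i32 {
        self as i32
    }

    /// Hypothetical helper: map a wire code back to a variant, if it is one of ours.
    pub fn from_i32(code: i32) -> Option<Self> {
        const ALL: [RpcErrorCode; 6] = [
            RpcErrorCode::ServerNotFound,
            RpcErrorCode::PortConflict,
            RpcErrorCode::ConfigInvalid,
            RpcErrorCode::AlreadyRunning,
            RpcErrorCode::NotRunning,
            RpcErrorCode::SpawnFailed,
        ];
        ALL.iter().copied().find(|c| c.as_i32() == code)
    }
}

/// Hypothetical helper: true for codes in the JSON-RPC 2.0 server-error range.
pub fn is_server_error_range(code: i32) -> bool {
    (-32099..=-32000).contains(&code)
}
```

A client could call `RpcErrorCode::from_i32(code)` and fall back to printing the raw code and message when it returns `None`, for example for the standard codes such as -32601.
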

- [ ] **Step 5: Commit**

```bash
git add crates/xy-protocol/
git commit -m "feat(protocol): ServerConfig + ConfigError + RpcErrorCode"
```

### Task 5: KDL parser for `ServerConfig`

**Files:**
- Create: `crates/xy-protocol/src/kdl_parse.rs`
- Modify: `crates/xy-protocol/src/lib.rs`

- [ ] **Step 1: Write the parser + tests in one file**

```rust
use crate::{ConfigError, RestartConfig, RestartPolicy, ServerConfig, StopConfig};
use kdl::{KdlDocument, KdlNode};
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::time::Duration;

pub fn parse_server_config(name: &str, text: &str, source_path: &Path) -> Result<ServerConfig, ConfigError> {
    validate_name(name).map_err(|_| ConfigError::InvalidName { name: name.to_string() })?;
    let doc: KdlDocument = text.parse().map_err(|e: kdl::KdlError| ConfigError::Parse {
        path: source_path.to_path_buf(),
        message: e.to_string(),
    })?;

    let command = require_string_arg(&doc, "command", source_path)?;
    let args = optional_string_args(&doc, "args");
    let port = require_u16_arg(&doc, "port", source_path)?;
    let env = optional_string_map(&doc, "env");
    let working_dir = optional_string_arg(&doc, "working-dir").map(PathBuf::from);
    let restart = parse_restart(&doc, source_path)?;
    let stop = parse_stop(&doc, source_path)?;

    Ok(ServerConfig {
        name: name.to_string(),
        command: PathBuf::from(command),
        args,
        port,
        env,
        working_dir,
        restart,
        stop,
    })
}

fn validate_name(name: &str) -> Result<(), ()> {
    if name.is_empty() {
        return Err(());
    }
    if name.chars().all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '_') {
        Ok(())
    } else {
        Err(())
    }
}

fn require_string_arg(doc: &KdlDocument, name: &'static str, path: &Path) -> Result<String, ConfigError> {
    let node = find_node(doc, name).ok_or(ConfigError::MissingField { path: path.to_path_buf(), field: name })?;
    node.entries().first()
        .and_then(|e| e.value().as_string().map(str::to_string))
        .ok_or(ConfigError::InvalidValue {
            path: path.to_path_buf(),
            field: name,
            message: "expected string argument".into(),
        })
}

fn require_u16_arg(doc: &KdlDocument, name: &'static str, path: &Path) -> Result<u16, ConfigError> {
    let node = find_node(doc, name).ok_or(ConfigError::MissingField { path: path.to_path_buf(), field: name })?;
    let v = node.entries().first()
        .and_then(|e| e.value().as_integer())
        .ok_or(ConfigError::InvalidValue {
            path: path.to_path_buf(),
            field: name,
            message: "expected integer".into(),
        })?;
    u16::try_from(v).map_err(|_| ConfigError::InvalidValue {
        path: path.to_path_buf(),
        field: name,
        message: format!("port {v} out of u16 range"),
    })
}

fn optional_string_arg(doc: &KdlDocument, name: &str) -> Option<String> {
    find_node(doc, name)
        .and_then(|n| n.entries().first())
        .and_then(|e| e.value().as_string().map(str::to_string))
}

fn optional_string_args(doc: &KdlDocument, name: &str) -> Vec<String> {
    find_node(doc, name)
        .map(|n| n.entries().iter()
            .filter_map(|e| e.value().as_string().map(str::to_string))
            .collect())
        .unwrap_or_default()
}

fn optional_string_map(doc: &KdlDocument, name: &str) -> BTreeMap<String, String> {
    let Some(node) = find_node(doc, name) else { return BTreeMap::new(); };
    let mut out = BTreeMap::new();
    if let Some(children) = node.children() {
        for child in children.nodes() {
            let key = child.name().value().to_string();
            if let Some(val) = child.entries().first().and_then(|e| e.value().as_string()) {
                out.insert(key, val.to_string());
            }
        }
    }
    out
}

fn parse_restart(doc: &KdlDocument, path: &Path) -> Result<RestartConfig, ConfigError> {
    let Some(node) = find_node(doc, "restart") else { return Ok(RestartConfig::default()); };
    let Some(children) = node.children() else { return Ok(RestartConfig::default()); };
    let mut out = RestartConfig::default();
    for child in children.nodes() {
        match child.name().value() {
            "policy" => {
                let s = string_arg(child, "policy", path)?;
                out.policy = match s.as_str() {
                    "always" => RestartPolicy::Always,
                    "on-failure" => RestartPolicy::OnFailure,
                    "never" => RestartPolicy::Never,
                    other => return Err(ConfigError::InvalidValue {
                        path: path.to_path_buf(),
                        field: "restart.policy",
                        message: format!("unknown policy `{other}`"),
                    }),
                };
            }
            "backoff-initial" => out.backoff_initial = parse_duration_arg(child, "restart.backoff-initial", path)?,
            "backoff-max" => out.backoff_max = parse_duration_arg(child, "restart.backoff-max", path)?,
            "max-retries-per-minute" => {
                let v = child.entries().first()
                    .and_then(|e| e.value().as_integer())
                    .ok_or(ConfigError::InvalidValue {
                        path: path.to_path_buf(),
                        field: "restart.max-retries-per-minute",
                        message: "expected integer".into(),
                    })?;
                out.max_retries_per_minute = u32::try_from(v).map_err(|_| ConfigError::InvalidValue {
                    path: path.to_path_buf(),
                    field: "restart.max-retries-per-minute",
                    message: format!("out of u32 range: {v}"),
                })?;
            }
            other => return Err(ConfigError::InvalidValue {
                path: path.to_path_buf(),
                field: "restart",
                message: format!("unknown key `{other}`"),
            }),
        }
    }
    Ok(out)
}

fn parse_stop(doc: &KdlDocument, path: &Path) -> Result<StopConfig, ConfigError> {
    let Some(node) = find_node(doc, "stop") else { return Ok(StopConfig::default()); };
    let Some(children) = node.children() else { return Ok(StopConfig::default()); };
    let mut out = StopConfig::default();
    for child in children.nodes() {
        match child.name().value() {
            "grace" => out.grace = parse_duration_arg(child, "stop.grace", path)?,
            other => return Err(ConfigError::InvalidValue {
                path: path.to_path_buf(),
                field: "stop",
                message: format!("unknown key `{other}`"),
            }),
        }
    }
    Ok(out)
}

fn string_arg(node: &KdlNode, field: &'static str, path: &Path) -> Result<String, ConfigError> {
    node.entries().first()
        .and_then(|e| e.value().as_string().map(str::to_string))
        .ok_or(ConfigError::InvalidValue {
            path: path.to_path_buf(),
            field,
            message: "expected string".into(),
        })
}

fn parse_duration_arg(node: &KdlNode, field: &'static str, path: &Path) -> Result<Duration, ConfigError> {
    let s = string_arg(node, field, path)?;
    humantime::parse_duration(&s).map_err(|e| ConfigError::InvalidValue {
        path: path.to_path_buf(),
        field,
        message: format!("invalid duration `{s}`: {e}"),
    })
}

fn find_node<'a>(doc: &'a KdlDocument, name: &str) -> Option<&'a KdlNode> {
    doc.nodes().iter().find(|n| n.name().value() == name)
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    fn p() -> &'static Path {
        Path::new("/tmp/test.kdl")
    }

    #[test]
    fn parses_minimal_config() {
        let text = "command \"/usr/local/bin/foo\"\nport 8421\n";
        let cfg = parse_server_config("foo", text, p()).unwrap();
        assert_eq!(cfg.name, "foo");
        assert_eq!(cfg.command, PathBuf::from("/usr/local/bin/foo"));
        assert_eq!(cfg.port, 8421);
        assert!(cfg.args.is_empty());
        assert!(cfg.env.is_empty());
    }

    #[test]
    fn parses_full_config() {
        let text = r#"
command "/usr/local/bin/foo"
args "--http" "--port" "8421"
port 8421
env {
    RUST_LOG "info"
    FOO_BAR "baz"
}
working-dir "/tmp/work"
restart {
    policy "always"
    backoff-initial "2s"
    backoff-max "1m"
    max-retries-per-minute 10
}
stop {
    grace "30s"
}
"#;
        let cfg = parse_server_config("foo", text, p()).unwrap();
        assert_eq!(cfg.args, vec!["--http", "--port", "8421"]);
        assert_eq!(cfg.env.get("RUST_LOG").map(String::as_str), Some("info"));
        assert_eq!(cfg.working_dir, Some(PathBuf::from("/tmp/work")));
        assert_eq!(cfg.restart.policy, RestartPolicy::Always);
        assert_eq!(cfg.restart.backoff_initial, Duration::from_secs(2));
        assert_eq!(cfg.restart.backoff_max, Duration::from_secs(60));
        assert_eq!(cfg.restart.max_retries_per_minute, 10);
        assert_eq!(cfg.stop.grace, Duration::from_secs(30));
    }

    #[test]
    fn missing_command_fails() {
        let err = parse_server_config("foo", "port 8421", p()).unwrap_err();
        assert!(matches!(err, ConfigError::MissingField { field: "command", .. }));
    }

    #[test]
    fn missing_port_fails() {
        let err = parse_server_config("foo", "command \"/bin/x\"", p()).unwrap_err();
        assert!(matches!(err, ConfigError::MissingField { field: "port", .. }));
    }

    #[test]
    fn unknown_restart_policy_fails() {
        let text = "command \"/bin/x\"\nport 1\nrestart { policy \"maybe\" }";
        let err = parse_server_config("foo", text, p()).unwrap_err();
        assert!(matches!(err, ConfigError::InvalidValue { field: "restart.policy", .. }));
    }

    #[test]
    fn invalid_name_rejected() {
        let text = "command \"/bin/x\"\nport 1";
        let err = parse_server_config("Foo Bar", text, p()).unwrap_err();
        assert!(matches!(err, ConfigError::InvalidName { .. }));
    }
}
```

- [ ] **Step 2: Expose from `lib.rs`**

Append to `lib.rs`:

```rust
pub mod kdl_parse;
pub use kdl_parse::parse_server_config;
```

- [ ] **Step 3: Run tests**

Run: `cargo test -p xy-protocol`
Expected: 10 passed.

- [ ] **Step 4: Fix any KDL API mismatches**

If the `kdl` crate's API differs slightly in your installed version (e.g. `name().value()` vs `name().to_string()`), adapt call sites. Re-run until green.

- [ ] **Step 5: Commit**

```bash
git add crates/xy-protocol/
git commit -m "feat(protocol): KDL parser for ServerConfig"
```

### Task 6: Load all configs from a directory + duplicate-port check

**Files:**
- Modify: `crates/xy-protocol/src/kdl_parse.rs`
- Modify: `crates/xy-protocol/src/lib.rs`

- [ ] **Step 1: Append `load_all_configs` and tests**

Append to `kdl_parse.rs`:

```rust
pub fn load_all_configs(dir: &Path) -> Result<Vec<ServerConfig>, ConfigError> {
    if !dir.exists() {
        return Ok(Vec::new());
    }
    let entries = std::fs::read_dir(dir).map_err(|e| ConfigError::Io {
        path: dir.to_path_buf(),
        source: e,
    })?;
    let mut configs = Vec::new();
    for entry in entries {
        let entry = entry.map_err(|e| ConfigError::Io { path: dir.to_path_buf(), source: e })?;
        let path = entry.path();
        if path.extension().and_then(|s| s.to_str()) != Some("kdl") {
            continue;
        }
        let name = path.file_stem().and_then(|s| s.to_str())
            .ok_or(ConfigError::InvalidName { name: path.display().to_string() })?
            .to_string();
        let text = std::fs::read_to_string(&path).map_err(|e| ConfigError::Io { path: path.clone(), source: e })?;
        configs.push(parse_server_config(&name, &text, &path)?);
    }
    check_duplicate_ports(&configs)?;
    Ok(configs)
}

fn check_duplicate_ports(configs: &[ServerConfig]) -> Result<(), ConfigError> {
    let mut seen: std::collections::HashMap<u16, String> = std::collections::HashMap::new();
    for c in configs {
        if let Some(other) = seen.insert(c.port, c.name.clone()) {
            return Err(ConfigError::DuplicatePort {
                name_a: other,
                name_b: c.name.clone(),
                port: c.port,
            });
        }
    }
    Ok(())
}
```

Append tests inside the existing `mod tests`:

```rust
    use tempfile::tempdir;
    use std::fs;

    #[test]
    fn load_all_finds_and_parses_files() {
        let dir = tempdir().unwrap();
        fs::write(dir.path().join("a.kdl"), "command \"/bin/a\"\nport 8001").unwrap();
        fs::write(dir.path().join("b.kdl"), "command \"/bin/b\"\nport 8002").unwrap();
        fs::write(dir.path().join("ignored.txt"), "not a config").unwrap();
        let mut configs = load_all_configs(dir.path()).unwrap();
        configs.sort_by(|x, y| x.name.cmp(&y.name));
        assert_eq!(configs.len(), 2);
        assert_eq!(configs[0].name, "a");
        assert_eq!(configs[1].port, 8002);
    }

    #[test]
    fn load_all_returns_empty_for_missing_dir() {
        let dir = tempdir().unwrap();
        let configs = load_all_configs(&dir.path().join("does-not-exist")).unwrap();
        assert!(configs.is_empty());
    }

    #[test]
    fn duplicate_ports_detected() {
        let dir = tempdir().unwrap();
        fs::write(dir.path().join("a.kdl"), "command \"/bin/a\"\nport 8001").unwrap();
        fs::write(dir.path().join("b.kdl"), "command \"/bin/b\"\nport 8001").unwrap();
        let err = load_all_configs(dir.path()).unwrap_err();
        match err {
            ConfigError::DuplicatePort { port, .. } => assert_eq!(port, 8001),
            other => panic!("unexpected error: {other:?}"),
        }
    }
```

- [ ] **Step 2: Expose**

Append to `lib.rs`: `pub use kdl_parse::load_all_configs;`

- [ ] **Step 3: Run tests**

Run: `cargo test -p xy-protocol`
Expected: 13 passed.
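
For orientation only (not a file to create in this task): `check_duplicate_ports` relies on `HashMap::insert` returning the previous value stored under a key, so a single pass is enough to find a clash. The sketch below shows the same idea with plain `(name, port)` tuples instead of `ServerConfig`; `first_port_clash` is a hypothetical stand-in used only for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `check_duplicate_ports`: returns the first pair of
/// names that share a port, in input order, together with that port.
pub fn first_port_clash<'a>(servers: &[(&'a str, u16)]) -> Option<(&'a str, &'a str, u16)> {
    let mut seen: HashMap<u16, &'a str> = HashMap::new();
    for &(name, port) in servers {
        // `insert` returns the value previously stored under `port`, if any.
        if let Some(earlier) = seen.insert(port, name) {
            return Some((earlier, name, port));
        }
    }
    None
}
```

Because `std::fs::read_dir` does not guarantee an iteration order, which server ends up as `name_a` and which as `name_b` in the real loader can vary between platforms. That is presumably why the `duplicate_ports_detected` test asserts only on the port.
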

- [ ] **Step 4: Commit**

```bash
git add crates/xy-protocol/
git commit -m "feat(protocol): load_all_configs from dir with duplicate port detection"
```

### Task 7: JSON-RPC method types in `rpc.rs`

**Files:**
- Create: `crates/xy-protocol/src/rpc.rs`
- Modify: `crates/xy-protocol/src/lib.rs`

- [ ] **Step 1: Write the types**

```rust
use crate::ServerState;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerSummary {
    pub name: String,
    pub state: ServerState,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pid: Option<u32>,
    pub port: u16,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub uptime_secs: Option<u64>,
    pub restart_count: u32,
    // Assumed type: the exit code as reported by `ChildHandle::wait`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_exit: Option<i32>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusDetail {
    #[serde(flatten)]
    pub summary: ServerSummary,
    pub recent_transitions: Vec<StateTransition>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateTransition {
    pub from: ServerState,
    pub to: ServerState,
    pub at_unix_ms: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum NameOrAll {
    All { all: bool },
    Name { name: String },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusParams {
    pub name: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartResult {
    pub started: Vec<String>,
    pub already_running: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StopResult {
    pub stopped: Vec<String>,
    pub not_running: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RestartResult {
    pub restarted: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReloadResult {
    pub added: Vec<String>,
    pub removed: Vec<String>,
    pub changed: Vec<String>,
    pub unchanged: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogsParams {
    pub name: String,
    // Assumed type: number of trailing lines to return.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tail: Option<usize>,
    #[serde(default)]
    pub follow: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogsSubscribed {
    pub subscription_id: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogsCancelParams {
    pub subscription_id: u64,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum LogStream {
    Stdout,
    Stderr,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogLine {
    pub subscription_id: u64,
    pub name: String,
    pub stream: LogStream,
    pub line: String,
    pub ts_unix_ms: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEnd {
    pub subscription_id: u64,
}

pub mod methods {
    pub const LIST: &str = "list";
    pub const STATUS: &str = "status";
    pub const START: &str = "start";
    pub const STOP: &str = "stop";
    pub const RESTART: &str = "restart";
    pub const RELOAD: &str = "reload";
    pub const LOGS: &str = "logs";
    pub const LOGS_CANCEL: &str = "logs_cancel";
}

pub mod notifications {
    pub const LOG: &str = "log";
    pub const LOG_END: &str = "log_end";
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn name_or_all_round_trips_name() {
        let v: NameOrAll = serde_json::from_str(r#"{"name":"foo"}"#).unwrap();
        match v {
            NameOrAll::Name { name } => assert_eq!(name, "foo"),
            _ => panic!("expected Name variant"),
        }
    }

    #[test]
    fn name_or_all_round_trips_all() {
        let v: NameOrAll = serde_json::from_str(r#"{"all":true}"#).unwrap();
        match v {
            NameOrAll::All { all } => assert!(all),
            _ => panic!("expected All variant"),
        }
    }
}
```

- [ ] **Step 2: Expose**

Append to `lib.rs`:

```rust
pub mod rpc;
```

- [ ] **Step 3: Run tests**

Run: `cargo test -p xy-protocol`
Expected: 15 passed.
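
For orientation only (not a file to create in this task): the string constants in `methods` can be used directly as `match` patterns, which is one way a dispatcher could route an incoming method name. The sketch below repeats the constants so it compiles on its own. The `Route` enum and `route` function are hypothetical and the grouping is an assumption, not the handler design this plan specifies.

```rust
// Copy of the constants from `rpc.rs` so this sketch is self-contained.
pub mod methods {
    pub const LIST: &str = "list";
    pub const STATUS: &str = "status";
    pub const START: &str = "start";
    pub const STOP: &str = "stop";
    pub const RESTART: &str = "restart";
    pub const RELOAD: &str = "reload";
    pub const LOGS: &str = "logs";
    pub const LOGS_CANCEL: &str = "logs_cancel";
}

/// Hypothetical grouping of methods by the kind of work they trigger.
#[derive(Debug, PartialEq, Eq)]
pub enum Route {
    Query,
    Lifecycle,
    Reload,
    LogStream,
}

/// Hypothetical router: `None` means the method name is unknown.
pub fn route(method: &str) -> Option<Route> {
    match method {
        methods::LIST | methods::STATUS => Some(Route::Query),
        methods::START | methods::STOP | methods::RESTART => Some(Route::Lifecycle),
        methods::RELOAD => Some(Route::Reload),
        methods::LOGS | methods::LOGS_CANCEL => Some(Route::LogStream),
        _ => None,
    }
}
```

In JSON-RPC 2.0 an unknown method is normally answered with the standard error code -32601 ("Method not found"), so a `None` here would map to that response.
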

- [ ] **Step 4: Commit**

```bash
git add crates/xy-protocol/
git commit -m "feat(protocol): JSON-RPC method param/result types"
```

---

## Phase 3 — `xy-supervisor`

### Task 8: `ChildHandle` trait + `MockChild` for tests

**Files:**
- Create: `crates/xy-supervisor/src/child.rs`
- Modify: `crates/xy-supervisor/src/lib.rs`

- [ ] **Step 1: Define the trait + mock**

```rust
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};

#[async_trait::async_trait]
pub trait ChildHandle: Send + 'static {
    fn pid(&self) -> u32;
    async fn wait(&mut self) -> std::io::Result<Option<i32>>;
    fn terminate(&mut self) -> std::io::Result<()>;
    fn kill(&mut self) -> std::io::Result<()>;
}

pub struct MockChild {
    pid: u32,
    exit_rx: Arc<Mutex<oneshot::Receiver<Option<i32>>>>,
    terminate_tx: Option<oneshot::Sender<()>>,
    kill_tx: Option<oneshot::Sender<()>>,
}

pub struct MockChildController {
    pub exit_tx: Option<oneshot::Sender<Option<i32>>>,
    pub terminate_rx: oneshot::Receiver<()>,
    pub kill_rx: oneshot::Receiver<()>,
}

impl MockChild {
    pub fn new(pid: u32) -> (Self, MockChildController) {
        let (exit_tx, exit_rx) = oneshot::channel();
        let (terminate_tx, terminate_rx) = oneshot::channel();
        let (kill_tx, kill_rx) = oneshot::channel();
        let child = Self {
            pid,
            exit_rx: Arc::new(Mutex::new(exit_rx)),
            terminate_tx: Some(terminate_tx),
            kill_tx: Some(kill_tx),
        };
        let ctl = MockChildController { exit_tx: Some(exit_tx), terminate_rx, kill_rx };
        (child, ctl)
    }
}

#[async_trait::async_trait]
impl ChildHandle for MockChild {
    fn pid(&self) -> u32 {
        self.pid
    }

    async fn wait(&mut self) -> std::io::Result<Option<i32>> {
        let mut rx = self.exit_rx.lock().await;
        match (&mut *rx).await {
            Ok(code) => Ok(code),
            Err(_) => Err(std::io::Error::other("exit_tx dropped")),
        }
    }

    fn terminate(&mut self) -> std::io::Result<()> {
        if let Some(tx) = self.terminate_tx.take() {
            let _ = tx.send(());
        }
        Ok(())
    }

    fn kill(&mut self) -> std::io::Result<()> {
        if let Some(tx) = self.kill_tx.take() {
            let _ = tx.send(());
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn mock_child_exit() {
        let (mut child, mut ctl) = MockChild::new(123);
assert_eq!(child.pid(), 123); ctl.exit_tx.take().unwrap().send(Some(0)).unwrap(); assert_eq!(child.wait().await.unwrap(), Some(0)); } #[tokio::test] async fn mock_child_terminate() { let (mut child, mut ctl) = MockChild::new(1); child.terminate().unwrap(); ctl.terminate_rx.try_recv().unwrap(); } } ``` - [ ] **Step 2: Update `lib.rs`** ```rust //! Process-supervision primitives for the xy daemon. pub mod child; pub use child::{ChildHandle, MockChild, MockChildController}; ``` - [ ] **Step 3: Run tests** Run: `cargo test -p xy-supervisor` Expected: 2 passed. - [ ] **Step 4: Commit** ```bash git add crates/xy-supervisor/ git commit -m "feat(supervisor): ChildHandle trait + MockChild" ``` ### Task 9: Restart-policy decision logic **Files:** - Create: `crates/xy-supervisor/src/policy.rs` - Modify: `crates/xy-supervisor/src/lib.rs` - [ ] **Step 1: Write tests + impl** ```rust use xy_protocol::RestartPolicy; #[derive(Debug, PartialEq, Eq)] pub enum RestartDecision { Restart, StayStopped, MarkFailed } pub fn decide(policy: RestartPolicy, exit_code: Option, retry_cap_reached: bool) -> RestartDecision { let clean = matches!(exit_code, Some(0)); let want = match policy { RestartPolicy::Never => false, RestartPolicy::OnFailure => !clean, RestartPolicy::Always => true, }; if !want { return RestartDecision::StayStopped; } if retry_cap_reached { RestartDecision::MarkFailed } else { RestartDecision::Restart } } #[cfg(test)] mod tests { use super::*; #[test] fn never_never_restarts() { assert_eq!(decide(RestartPolicy::Never, Some(0), false), RestartDecision::StayStopped); assert_eq!(decide(RestartPolicy::Never, Some(1), false), RestartDecision::StayStopped); assert_eq!(decide(RestartPolicy::Never, None, false), RestartDecision::StayStopped); } #[test] fn on_failure_skips_clean() { assert_eq!(decide(RestartPolicy::OnFailure, Some(0), false), RestartDecision::StayStopped); } #[test] fn on_failure_restarts_nonzero() { assert_eq!(decide(RestartPolicy::OnFailure, Some(1), false), 
RestartDecision::Restart); } #[test] fn on_failure_restarts_signal() { assert_eq!(decide(RestartPolicy::OnFailure, None, false), RestartDecision::Restart); } #[test] fn always_restarts_on_clean() { assert_eq!(decide(RestartPolicy::Always, Some(0), false), RestartDecision::Restart); } #[test] fn cap_reached_marks_failed() { assert_eq!(decide(RestartPolicy::Always, Some(0), true), RestartDecision::MarkFailed); assert_eq!(decide(RestartPolicy::OnFailure, Some(1), true), RestartDecision::MarkFailed); } #[test] fn cap_reached_never_still_stopped() { assert_eq!(decide(RestartPolicy::Never, Some(1), true), RestartDecision::StayStopped); } } ``` - [ ] **Step 2: Expose** Append to `lib.rs`: `pub mod policy; pub use policy::{decide, RestartDecision};` - [ ] **Step 3: Run tests** Run: `cargo test -p xy-supervisor` Expected: 9 passed. - [ ] **Step 4: Commit** ```bash git add crates/xy-supervisor/ git commit -m "feat(supervisor): restart-policy decision logic" ``` ### Task 10: Backoff calculator **Files:** - Create: `crates/xy-supervisor/src/backoff.rs` - Modify: `crates/xy-supervisor/src/lib.rs` - [ ] **Step 1: Write tests + impl** ```rust use std::time::Duration; #[derive(Debug, Clone)] pub struct Backoff { initial: Duration, max: Duration, current: Option<Duration>, } impl Backoff { pub fn new(initial: Duration, max: Duration) -> Self { Self { initial, max, current: None } } pub fn next(&mut self) -> Duration { let next = match self.current { None => self.initial, Some(d) => (d * 2).min(self.max), }; self.current = Some(next); next } pub fn reset(&mut self) { self.current = None; } } #[cfg(test)] mod tests { use super::*; #[test] fn starts_at_initial() { let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(30)); assert_eq!(b.next(), Duration::from_secs(1)); } #[test] fn doubles_each_call() { let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(30)); assert_eq!(b.next(), Duration::from_secs(1)); assert_eq!(b.next(), Duration::from_secs(2));
assert_eq!(b.next(), Duration::from_secs(4)); assert_eq!(b.next(), Duration::from_secs(8)); } #[test] fn caps_at_max() { let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(5)); for _ in 0..10 { b.next(); } assert_eq!(b.next(), Duration::from_secs(5)); } #[test] fn reset_starts_over() { let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(30)); b.next(); b.next(); b.next(); b.reset(); assert_eq!(b.next(), Duration::from_secs(1)); } } ``` - [ ] **Step 2: Expose** Append: `pub mod backoff; pub use backoff::Backoff;` - [ ] **Step 3: Run tests** Run: `cargo test -p xy-supervisor` Expected: 13 passed. - [ ] **Step 4: Commit** ```bash git add crates/xy-supervisor/ git commit -m "feat(supervisor): exponential backoff calculator" ``` ### Task 11: Sliding-60s retry-window tracker **Files:** - Create: `crates/xy-supervisor/src/retry_window.rs` - Modify: `crates/xy-supervisor/src/lib.rs` - [ ] **Step 1: Write tests + impl** ```rust use std::collections::VecDeque; use std::time::{Duration, Instant}; #[derive(Debug, Clone)] pub struct RetryWindow { window: Duration, cap: u32, events: VecDeque<Instant>, } impl RetryWindow { pub fn new(window: Duration, cap: u32) -> Self { Self { window, cap, events: VecDeque::new() } } pub fn record(&mut self, now: Instant) { self.events.push_back(now); self.prune(now); } pub fn cap_reached(&mut self, now: Instant) -> bool { self.prune(now); self.events.len() as u32 >= self.cap } pub fn count(&mut self, now: Instant) -> u32 { self.prune(now); self.events.len() as u32 } fn prune(&mut self, now: Instant) { while let Some(&front) = self.events.front() { if now.duration_since(front) > self.window { self.events.pop_front(); } else { break; } } } } #[cfg(test)] mod tests { use super::*; #[test] fn below_cap_not_reached() { let mut w = RetryWindow::new(Duration::from_secs(60), 3); let t = Instant::now(); w.record(t); w.record(t); assert!(!w.cap_reached(t)); } #[test] fn at_cap_reached() { let mut w =
RetryWindow::new(Duration::from_secs(60), 3); let t = Instant::now(); w.record(t); w.record(t); w.record(t); assert!(w.cap_reached(t)); } #[test] fn old_events_pruned() { let mut w = RetryWindow::new(Duration::from_secs(60), 3); let t0 = Instant::now(); w.record(t0); w.record(t0); w.record(t0); let t1 = t0 + Duration::from_secs(61); assert_eq!(w.count(t1), 0); assert!(!w.cap_reached(t1)); } } ``` - [ ] **Step 2: Expose** Append: `pub mod retry_window; pub use retry_window::RetryWindow;` - [ ] **Step 3: Run tests** Run: `cargo test -p xy-supervisor` Expected: 16 passed. - [ ] **Step 4: Commit** ```bash git add crates/xy-supervisor/ git commit -m "feat(supervisor): sliding retry-window tracker" ``` ### Task 12: Log file writer with rotation **Files:** - Create: `crates/xy-supervisor/src/logs.rs` - Modify: `crates/xy-supervisor/src/lib.rs` - [ ] **Step 1: Write `RotatingLogWriter` + tests** ```rust use std::fs::{File, OpenOptions}; use std::io::Write; use std::path::{Path, PathBuf}; pub struct RotatingLogWriter { base: PathBuf, max_bytes: u64, keep: usize, file: File, written: u64, } impl RotatingLogWriter { pub fn open(base: &Path, max_bytes: u64, keep: usize) -> std::io::Result<Self> { if let Some(parent) = base.parent() { std::fs::create_dir_all(parent)?; } let file = OpenOptions::new().create(true).append(true).open(base)?; let written = file.metadata()?.len(); Ok(Self { base: base.to_path_buf(), max_bytes, keep, file, written }) } pub fn write_line(&mut self, tag: &str, line: &str) -> std::io::Result<()> { let bytes = format!("{tag} {line}\n"); self.file.write_all(bytes.as_bytes())?; self.written += bytes.len() as u64; if self.written >= self.max_bytes { self.rotate()?; } Ok(()) } fn rotate(&mut self) -> std::io::Result<()> { // Drop the open file handle by replacing with /dev/null briefly.
self.file = OpenOptions::new().read(true).open("/dev/null")?; for i in (1..self.keep).rev() { let src = self.gen_path(i); let dst = self.gen_path(i + 1); if src.exists() { let _ = std::fs::rename(&src, &dst); } } if self.base.exists() { let _ = std::fs::rename(&self.base, self.gen_path(1)); } self.file = OpenOptions::new().create(true).append(true).open(&self.base)?; self.written = 0; Ok(()) } fn gen_path(&self, n: usize) -> PathBuf { let mut s = self.base.as_os_str().to_os_string(); s.push(format!(".{n}")); PathBuf::from(s) } } #[cfg(test)] mod tests { use super::*; use tempfile::tempdir; use std::io::Read; #[test] fn writes_lines_with_tags() { let dir = tempdir().unwrap(); let base = dir.path().join("x.log"); let mut w = RotatingLogWriter::open(&base, 1024, 3).unwrap(); w.write_line("[out]", "hello").unwrap(); w.write_line("[err]", "boom").unwrap(); let mut s = String::new(); File::open(&base).unwrap().read_to_string(&mut s).unwrap(); assert_eq!(s, "[out] hello\n[err] boom\n"); } #[test] fn rotates_at_threshold() { let dir = tempdir().unwrap(); let base = dir.path().join("x.log"); let mut w = RotatingLogWriter::open(&base, 20, 3).unwrap(); for _ in 0..5 { w.write_line("[out]", "0123456789").unwrap(); } assert!(base.exists()); let rotated = dir.path().join("x.log.1"); assert!(rotated.exists(), "expected rotated file at {}", rotated.display()); } } ``` - [ ] **Step 2: Expose** Append: `pub mod logs; pub use logs::RotatingLogWriter;` - [ ] **Step 3: Run tests** Run: `cargo test -p xy-supervisor` Expected: 18 passed. 
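The order of renames inside `rotate` matters: the oldest generation has to move first, otherwise a newer file would overwrite it. A minimal std-only sketch of that order, assuming the same `.N` suffix scheme as `gen_path`; `generation` and `rotation_plan` are hypothetical helpers for illustration and are not part of the plan's API.

```rust
use std::path::{Path, PathBuf};

/// Builds `<base>.<n>`, mirroring the `.N` suffix scheme used by `gen_path`.
fn generation(base: &Path, n: usize) -> PathBuf {
    let mut s = base.as_os_str().to_os_string();
    s.push(format!(".{n}"));
    PathBuf::from(s)
}

/// Hypothetical helper: lists the (from, to) renames one rotation would
/// attempt, oldest generation first, ending with the live file.
fn rotation_plan(base: &Path, keep: usize) -> Vec<(PathBuf, PathBuf)> {
    let mut plan = Vec::new();
    for i in (1..keep).rev() {
        plan.push((generation(base, i), generation(base, i + 1)));
    }
    plan.push((base.to_path_buf(), generation(base, 1)));
    plan
}
```

With `keep = 3` the plan is `x.log.2 -> x.log.3`, `x.log.1 -> x.log.2`, `x.log -> x.log.1`, so at most three rotated files sit next to the live one. The real `rotate` skips any source that does not exist yet.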
- [ ] **Step 4: Commit** ```bash git add crates/xy-supervisor/ git commit -m "feat(supervisor): rotating log writer" ``` ### Task 13: Ring buffer **Files:** - Modify: `crates/xy-supervisor/src/logs.rs` - Modify: `crates/xy-supervisor/src/lib.rs` - [ ] **Step 1: Append `RingBuffer` to `logs.rs`** ```rust use std::collections::VecDeque; use std::sync::{Arc, Mutex}; #[derive(Clone)] pub struct RingBuffer { inner: Arc<Mutex<RingBufferInner>>, capacity_bytes: usize, } struct RingBufferInner { lines: VecDeque<RecordedLine>, bytes: usize, } #[derive(Debug, Clone)] pub struct RecordedLine { pub stream: xy_protocol::rpc::LogStream, pub line: String, pub ts_unix_ms: u64, } impl RingBuffer { pub fn new(capacity_bytes: usize) -> Self { Self { inner: Arc::new(Mutex::new(RingBufferInner { lines: VecDeque::new(), bytes: 0 })), capacity_bytes, } } pub fn push(&self, line: RecordedLine) { let mut g = self.inner.lock().unwrap(); g.bytes += line.line.len(); g.lines.push_back(line); while g.bytes > self.capacity_bytes { if let Some(removed) = g.lines.pop_front() { g.bytes -= removed.line.len(); } else { break; } } } pub fn snapshot_tail(&self, n: Option<u32>) -> Vec<RecordedLine> { let g = self.inner.lock().unwrap(); match n { None => g.lines.iter().cloned().collect(), Some(n) => { let take = (n as usize).min(g.lines.len()); let start = g.lines.len() - take; g.lines.iter().skip(start).cloned().collect() } } } } ``` - [ ] **Step 2: Add tests inside the existing `mod tests`** ```rust use xy_protocol::rpc::LogStream; fn recorded(s: &str) -> RecordedLine { RecordedLine { stream: LogStream::Stdout, line: s.to_string(), ts_unix_ms: 0 } } #[test] fn ring_buffer_drops_oldest_when_full() { let rb = RingBuffer::new(10); rb.push(recorded("aaaaa")); rb.push(recorded("bbbbb")); rb.push(recorded("ccc")); let snap = rb.snapshot_tail(None); assert_eq!(snap.len(), 2); assert_eq!(snap[0].line, "bbbbb"); assert_eq!(snap[1].line, "ccc"); } #[test] fn ring_buffer_tail_n() { let rb = RingBuffer::new(1024); for i in 0..5 { rb.push(recorded(&format!("line{i}")));
} let snap = rb.snapshot_tail(Some(2)); assert_eq!(snap.len(), 2); assert_eq!(snap[0].line, "line3"); assert_eq!(snap[1].line, "line4"); } ``` - [ ] **Step 3: Expose** Append: `pub use logs::{RecordedLine, RingBuffer};` - [ ] **Step 4: Run tests** Run: `cargo test -p xy-supervisor` Expected: 20 passed. - [ ] **Step 5: Commit** ```bash git add crates/xy-supervisor/ git commit -m "feat(supervisor): ring buffer for recent log lines" ``` ### Task 14: `LogSink` — fan-out to file, ring buffer, broadcast **Files:** - Modify: `crates/xy-supervisor/src/logs.rs` - Modify: `crates/xy-supervisor/src/lib.rs` - [ ] **Step 1: Append `LogSink` to `logs.rs`** ```rust use tokio::sync::broadcast; use xy_protocol::rpc::{LogLine, LogStream}; const LOG_BROADCAST_CAP: usize = 256; #[derive(Clone)] pub struct LogSink { pub server_name: String, writer: Arc<Mutex<RotatingLogWriter>>, pub ring: RingBuffer, pub broadcast: broadcast::Sender<LogLine>, } impl LogSink { pub fn new(server_name: String, writer: RotatingLogWriter, ring_capacity_bytes: usize) -> Self { let (tx, _) = broadcast::channel(LOG_BROADCAST_CAP); Self { server_name, writer: Arc::new(Mutex::new(writer)), ring: RingBuffer::new(ring_capacity_bytes), broadcast: tx, } } pub fn record(&self, stream: LogStream, line: String) { let ts = now_unix_ms(); let tag = match stream { LogStream::Stdout => "[out]", LogStream::Stderr => "[err]" }; if let Err(e) = self.writer.lock().unwrap().write_line(tag, &line) { tracing::warn!(server = %self.server_name, error = %e, "log file write failed"); } self.ring.push(RecordedLine { stream, line: line.clone(), ts_unix_ms: ts }); let _ = self.broadcast.send(LogLine { subscription_id: 0, name: self.server_name.clone(), stream, line, ts_unix_ms: ts, }); } } fn now_unix_ms() -> u64 { std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) .map(|d| d.as_millis() as u64).unwrap_or(0) } ``` - [ ] **Step 2: Add a tokio test inside `mod tests`** ```rust #[tokio::test] async fn log_sink_records_and_broadcasts() { let dir =
tempdir().unwrap(); let writer = RotatingLogWriter::open(&dir.path().join("s.log"), 1024, 3).unwrap(); let sink = LogSink::new("s".to_string(), writer, 1024); let mut rx = sink.broadcast.subscribe(); sink.record(LogStream::Stdout, "hello".to_string()); let got = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv()).await.unwrap().unwrap(); assert_eq!(got.line, "hello"); assert_eq!(got.stream, LogStream::Stdout); assert_eq!(sink.ring.snapshot_tail(None).len(), 1); } ``` - [ ] **Step 3: Expose** Append: `pub use logs::LogSink;` - [ ] **Step 4: Run tests** Run: `cargo test -p xy-supervisor` Expected: 21 passed. - [ ] **Step 5: Commit** ```bash git add crates/xy-supervisor/ git commit -m "feat(supervisor): LogSink fans out to file, ring buffer, broadcast" ``` ### Task 15: `RealChild` — real-process implementation of `ChildHandle` **Files:** - Modify: `crates/xy-supervisor/src/child.rs` - Modify: `crates/xy-supervisor/src/lib.rs` - [ ] **Step 1: Append `RealChild` and `spawn_with_logs` to `child.rs`** ```rust use crate::logs::LogSink; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; use std::os::unix::process::CommandExt; use std::process::Stdio; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child as TokioChild, Command}; use xy_protocol::{rpc::LogStream, ServerConfig}; pub struct RealChild { pid: u32, pgid: Pid, child: Option<TokioChild>, } impl RealChild { pub fn pgid(&self) -> Pid { self.pgid } } #[async_trait::async_trait] impl ChildHandle for RealChild { fn pid(&self) -> u32 { self.pid } async fn wait(&mut self) -> std::io::Result<Option<i32>> { let child = self.child.as_mut().ok_or_else(|| std::io::Error::other("already waited"))?; let status = child.wait().await?; Ok(status.code()) } fn terminate(&mut self) -> std::io::Result<()> { kill(Pid::from_raw(-self.pgid.as_raw()), Signal::SIGTERM) .map_err(|e| std::io::Error::other(e.to_string())) } fn kill(&mut self) -> std::io::Result<()> { kill(Pid::from_raw(-self.pgid.as_raw()),
Signal::SIGKILL) .map_err(|e| std::io::Error::other(e.to_string())) } } pub fn spawn_with_logs(cfg: &ServerConfig, sink: LogSink) -> std::io::Result<RealChild> { let mut cmd = Command::new(&cfg.command); cmd.args(&cfg.args); for (k, v) in &cfg.env { cmd.env(k, v); } if let Some(dir) = &cfg.working_dir { cmd.current_dir(dir); } cmd.stdout(Stdio::piped()); cmd.stderr(Stdio::piped()); cmd.kill_on_drop(true); // Own process group so signals reach the whole tree. unsafe { cmd.pre_exec(|| { nix::unistd::setpgid(Pid::from_raw(0), Pid::from_raw(0)) .map_err(|e| std::io::Error::other(e.to_string())) }); } let mut child = cmd.spawn()?; let pid = child.id().ok_or_else(|| std::io::Error::other("no pid"))?; let pgid = Pid::from_raw(pid as i32); if let Some(out) = child.stdout.take() { spawn_pump(out, sink.clone(), LogStream::Stdout); } if let Some(err) = child.stderr.take() { spawn_pump(err, sink.clone(), LogStream::Stderr); } Ok(RealChild { pid, pgid, child: Some(child) }) } fn spawn_pump<R: tokio::io::AsyncRead + Unpin + Send + 'static>( reader: R, sink: LogSink, stream: LogStream, ) { tokio::spawn(async move { let mut lines = BufReader::new(reader).lines(); loop { match lines.next_line().await { Ok(Some(line)) => sink.record(stream, line), Ok(None) => break, Err(e) => { tracing::warn!(server = %sink.server_name, error = %e, ?stream, "log pump read error"); break; } } } }); } ``` - [ ] **Step 2: Expose** Append to `lib.rs`: `pub use child::{RealChild, spawn_with_logs};` - [ ] **Step 3: Build** Run: `cargo build -p xy-supervisor` Expected: compiles. Fix any nix API mismatches inline.
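If the `pre_exec` + `setpgid` combination proves awkward, the standard library's `CommandExt::process_group` (stable since Rust 1.64) is one possible alternative for putting the child in its own group. The std-only sketch below is an illustration under that assumption, not the plan's code: both function names are hypothetical, it relies on `sh` and `sleep` being on `PATH`, and whether the pinned Tokio version exposes an equivalent method should be checked before adopting it. It also shows why `wait` returns `Option<i32>`: a signal-killed child has no exit code.

```rust
use std::os::unix::process::{CommandExt, ExitStatusExt};
use std::process::{Command, Stdio};

/// Runs `sh -c <script>` in its own process group and returns the exit code.
fn exit_code_in_own_group(script: &str) -> std::io::Result<Option<i32>> {
    let status = Command::new("sh")
        .arg("-c")
        .arg(script)
        .process_group(0) // 0 = new group whose id equals the child's pid
        .stdout(Stdio::null())
        .status()?;
    Ok(status.code())
}

/// Kills a sleeping child and reports `(exit code, terminating signal)`.
fn killed_child_status() -> std::io::Result<(Option<i32>, Option<i32>)> {
    let mut child = Command::new("sleep").arg("30").process_group(0).spawn()?;
    child.kill()?; // on Unix, std delivers SIGKILL here
    let status = child.wait()?;
    Ok((status.code(), status.signal()))
}
```

A normal exit yields `Some(code)`; a signal death yields `None` for the code, which is the case `decide` treats as a failure under `RestartPolicy::OnFailure`.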
- [ ] **Step 4: Commit** ```bash git add crates/xy-supervisor/ git commit -m "feat(supervisor): RealChild + spawn_with_logs" ``` ### Task 16: Supervisor task state machine **Files:** - Create: `crates/xy-supervisor/src/supervisor.rs` - Modify: `crates/xy-supervisor/src/lib.rs` - [ ] **Step 1: Define cmd types + handle** ```rust use crate::{ backoff::Backoff, child::ChildHandle, logs::LogSink, policy::{decide, RestartDecision}, retry_window::RetryWindow, }; use std::time::{Duration, Instant}; use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::sleep; use tracing::{debug, info, warn}; use xy_protocol::{ServerConfig, ServerState}; pub enum SupervisorCmd { Start { ack: oneshot::Sender<StartAck> }, Stop { ack: oneshot::Sender<StopAck> }, Restart { ack: oneshot::Sender<()> }, Reconfigure { new: ServerConfig, ack: oneshot::Sender<()> }, Shutdown { ack: oneshot::Sender<()> }, } #[derive(Debug, PartialEq, Eq)] pub enum StartAck { Started, AlreadyRunning } #[derive(Debug, PartialEq, Eq)] pub enum StopAck { Stopped, NotRunning } #[derive(Clone)] pub struct SupervisorHandle { pub name: String, pub tx: mpsc::Sender<SupervisorCmd>, pub state: watch::Receiver<ServerState>, pub log_sink: LogSink, } #[async_trait::async_trait] pub trait Spawner: Send + 'static { type Child: ChildHandle; async fn spawn(&self, cfg: &ServerConfig, sink: LogSink) -> std::io::Result<Self::Child>; } ``` - [ ] **Step 2: Append the task + run loop** ```rust pub struct SupervisorTask<S: Spawner> { cfg: ServerConfig, log_sink: LogSink, spawner: S, state_tx: watch::Sender<ServerState>, cmd_rx: mpsc::Receiver<SupervisorCmd>, backoff: Backoff, retry_window: RetryWindow, restart_count: u32, last_exit: Option<i32>, started_at: Option<Instant>, } impl<S: Spawner> SupervisorTask<S> { pub fn new( cfg: ServerConfig, log_sink: LogSink, spawner: S, state_tx: watch::Sender<ServerState>, cmd_rx: mpsc::Receiver<SupervisorCmd>, ) -> Self { let backoff = Backoff::new(cfg.restart.backoff_initial, cfg.restart.backoff_max); let retry_window = RetryWindow::new(Duration::from_secs(60), cfg.restart.max_retries_per_minute); Self { cfg, log_sink, spawner, state_tx, cmd_rx,
backoff, retry_window, restart_count: 0, last_exit: None, started_at: None } } fn set_state(&self, s: ServerState) { let _ = self.state_tx.send(s); } pub async fn run(mut self) { let mut child: Option<S::Child> = None; loop { tokio::select! { cmd = self.cmd_rx.recv() => { let Some(cmd) = cmd else { break; }; match cmd { SupervisorCmd::Start { ack } => { if child.is_some() { let _ = ack.send(StartAck::AlreadyRunning); } else { match self.do_start().await { Ok(c) => { child = Some(c); let _ = ack.send(StartAck::Started); } Err(e) => { warn!(name = %self.cfg.name, error = %e, "spawn failed"); self.set_state(ServerState::Failed); let _ = ack.send(StartAck::Started); } } } } SupervisorCmd::Stop { ack } => { if let Some(c) = child.take() { self.do_stop(c).await; let _ = ack.send(StopAck::Stopped); } else { let _ = ack.send(StopAck::NotRunning); } } SupervisorCmd::Restart { ack } => { if let Some(c) = child.take() { self.set_state(ServerState::Restarting); self.do_stop(c).await; } match self.do_start().await { Ok(c) => child = Some(c), Err(e) => { warn!(name = %self.cfg.name, error = %e, "restart spawn failed"); self.set_state(ServerState::Failed); } } let _ = ack.send(()); } SupervisorCmd::Reconfigure { new, ack } => { self.cfg = new; self.backoff = Backoff::new(self.cfg.restart.backoff_initial, self.cfg.restart.backoff_max); self.retry_window = RetryWindow::new(Duration::from_secs(60), self.cfg.restart.max_retries_per_minute); let _ = ack.send(()); } SupervisorCmd::Shutdown { ack } => { if let Some(c) = child.take() { self.do_stop(c).await; } let _ = ack.send(()); return; } } } code = wait_child(&mut child) => { /* Clear the slot: an exited child must not be waited on again by the next loop iteration. */ child = None; self.last_exit = code; let now = Instant::now(); self.retry_window.record(now); let cap = self.retry_window.cap_reached(now); let decision = decide(self.cfg.restart.policy, code, cap); debug!(name = %self.cfg.name, ?code, ?decision, "child exited"); match decision { RestartDecision::StayStopped => { self.started_at = None; self.set_state(ServerState::Stopped); }
RestartDecision::MarkFailed => { self.started_at = None; self.set_state(ServerState::Failed); } RestartDecision::Restart => { self.set_state(ServerState::Restarting); let delay = self.backoff.next(); sleep(delay).await; match self.do_start().await { Ok(c) => child = Some(c), Err(e) => { warn!(name = %self.cfg.name, error = %e, "restart spawn failed"); self.set_state(ServerState::Failed); } } } } } } } } async fn do_start(&mut self) -> std::io::Result<S::Child> { self.set_state(ServerState::Starting); let c = self.spawner.spawn(&self.cfg, self.log_sink.clone()).await?; self.restart_count = self.restart_count.saturating_add(1); self.started_at = Some(Instant::now()); self.backoff.reset(); self.set_state(ServerState::Running); info!(name = %self.cfg.name, pid = c.pid(), "started"); Ok(c) } async fn do_stop(&mut self, mut c: S::Child) { self.set_state(ServerState::Stopping); let _ = c.terminate(); let grace = self.cfg.stop.grace; match tokio::time::timeout(grace, c.wait()).await { Ok(_) => {} Err(_) => { let _ = c.kill(); let _ = c.wait().await; } } self.started_at = None; self.set_state(ServerState::Stopped); } } async fn wait_child<C: ChildHandle>(slot: &mut Option<C>) -> Option<i32> { match slot.as_mut() { Some(c) => c.wait().await.ok().flatten(), None => std::future::pending().await, } } pub struct RealSpawner; #[async_trait::async_trait] impl Spawner for RealSpawner { type Child = crate::child::RealChild; async fn spawn(&self, cfg: &ServerConfig, sink: LogSink) -> std::io::Result<Self::Child> { crate::child::spawn_with_logs(cfg, sink) } } ``` - [ ] **Step 3: Add a smoke test in supervisor.rs** ```rust #[cfg(test)] mod tests { use super::*; use crate::child::{MockChild, MockChildController}; use crate::logs::{LogSink, RotatingLogWriter}; use std::sync::{Arc, Mutex}; use tempfile::tempdir; use xy_protocol::{RestartConfig, RestartPolicy, StopConfig}; struct QueueSpawner { queue: Arc<Mutex<Vec<MockChild>>>, } #[async_trait::async_trait] impl Spawner for QueueSpawner { type Child = MockChild; async fn spawn(&self, _cfg: &ServerConfig,
_sink: LogSink) -> std::io::Result<Self::Child> { let mut q = self.queue.lock().unwrap(); Ok(q.remove(0)) } } fn cfg(name: &str, policy: RestartPolicy, max_retries: u32) -> ServerConfig { ServerConfig { name: name.to_string(), command: "/bin/true".into(), args: vec![], port: 1, env: Default::default(), working_dir: None, restart: RestartConfig { policy, backoff_initial: Duration::from_millis(1), backoff_max: Duration::from_millis(1), max_retries_per_minute: max_retries, }, stop: StopConfig { grace: Duration::from_millis(50) }, } } fn sink(name: &str) -> LogSink { let dir = tempdir().unwrap(); let writer = RotatingLogWriter::open(&dir.path().join("s.log"), 1024, 3).unwrap(); std::mem::forget(dir); LogSink::new(name.to_string(), writer, 1024) } async fn wait_for(rx: &mut watch::Receiver<ServerState>, want: ServerState) { let deadline = tokio::time::Instant::now() + Duration::from_secs(2); loop { if *rx.borrow() == want { return; } tokio::select! { _ = rx.changed() => {} _ = tokio::time::sleep_until(deadline) => panic!("never reached {want:?}, last={:?}", *rx.borrow()), } } } #[tokio::test] async fn start_runs_to_running_and_stop_to_stopped() { let (mock, mut ctl) = MockChild::new(1); let queue = Arc::new(Mutex::new(vec![mock])); let spawner = QueueSpawner { queue }; let (state_tx, mut state_rx) = watch::channel(ServerState::Stopped); let (cmd_tx, cmd_rx) = mpsc::channel(8); let task = SupervisorTask::new(cfg("x", RestartPolicy::Never, 5), sink("x"), spawner, state_tx, cmd_rx); let h = tokio::spawn(task.run()); let (ack_tx, ack_rx) = oneshot::channel(); cmd_tx.send(SupervisorCmd::Start { ack: ack_tx }).await.unwrap(); assert_eq!(ack_rx.await.unwrap(), StartAck::Started); wait_for(&mut state_rx, ServerState::Running).await; // Trigger child exit via the controller.
ctl.exit_tx.take().unwrap().send(Some(0)).unwrap(); wait_for(&mut state_rx, ServerState::Stopped).await; let (ack_tx, ack_rx) = oneshot::channel(); cmd_tx.send(SupervisorCmd::Shutdown { ack: ack_tx }).await.unwrap(); ack_rx.await.unwrap(); h.await.unwrap(); } } ``` - [ ] **Step 4: Update `lib.rs`** ```rust pub mod supervisor; pub use supervisor::{ RealSpawner, Spawner, StartAck, StopAck, SupervisorCmd, SupervisorHandle, SupervisorTask, }; ``` - [ ] **Step 5: Run tests** Run: `cargo test -p xy-supervisor` Expected: 22 passed. - [ ] **Step 6: Commit** ```bash git add crates/xy-supervisor/ git commit -m "feat(supervisor): supervisor task with state machine" ``` --- ## Phase 4 — `xy-ipc` ### Task 17: JSON-RPC envelope types **Files:** - Create: `crates/xy-ipc/src/envelope.rs` - Modify: `crates/xy-ipc/src/lib.rs` - [ ] **Step 1: Define envelope types** ```rust use serde::{Deserialize, Serialize}; use serde_json::Value; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Request { pub jsonrpc: String, pub id: serde_json::Value, pub method: String, #[serde(default, skip_serializing_if = "Option::is_none")] pub params: Option<Value>, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Notification { pub jsonrpc: String, pub method: String, #[serde(default, skip_serializing_if = "Option::is_none")] pub params: Option<Value>, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Response { pub jsonrpc: String, pub id: serde_json::Value, #[serde(default, skip_serializing_if = "Option::is_none")] pub result: Option<Value>, #[serde(default, skip_serializing_if = "Option::is_none")] pub error: Option<RpcError>, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RpcError { pub code: i32, pub message: String, #[serde(default, skip_serializing_if = "Option::is_none")] pub data: Option<Value>, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(untagged)] pub enum Incoming { Request(Request), Response(Response), Notification(Notification), } pub const JSONRPC_VERSION: &str = "2.0";
pub fn request(id: u64, method: &str, params: Option<Value>) -> Request { Request { jsonrpc: JSONRPC_VERSION.into(), id: serde_json::json!(id), method: method.into(), params } } pub fn notification(method: &str, params: Option<Value>) -> Notification { Notification { jsonrpc: JSONRPC_VERSION.into(), method: method.into(), params } } pub fn ok_response(id: serde_json::Value, result: Value) -> Response { Response { jsonrpc: JSONRPC_VERSION.into(), id, result: Some(result), error: None } } pub fn err_response(id: serde_json::Value, code: i32, message: String) -> Response { Response { jsonrpc: JSONRPC_VERSION.into(), id, result: None, error: Some(RpcError { code, message, data: None }) } } #[cfg(test)] mod tests { use super::*; #[test] fn request_round_trip() { let r = request(7, "list", None); let s = serde_json::to_string(&r).unwrap(); let back: Request = serde_json::from_str(&s).unwrap(); assert_eq!(back.method, "list"); assert_eq!(back.id, serde_json::json!(7)); } #[test] fn incoming_is_response() { let s = r#"{"jsonrpc":"2.0","id":1,"result":{"ok":true}}"#; let i: Incoming = serde_json::from_str(s).unwrap(); assert!(matches!(i, Incoming::Response(_))); } #[test] fn incoming_is_notification() { let s = r#"{"jsonrpc":"2.0","method":"log","params":{}}"#; let i: Incoming = serde_json::from_str(s).unwrap(); assert!(matches!(i, Incoming::Notification(_))); } } ``` - [ ] **Step 2: Expose** ```rust //! JSON-RPC 2.0 over newline-delimited JSON on a Unix socket. pub mod envelope; pub use envelope::{ err_response, notification, ok_response, request, Incoming, Notification, Request, Response, RpcError, }; ``` - [ ] **Step 3: Run tests** Run: `cargo test -p xy-ipc` Expected: 3 passed.
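With `#[serde(untagged)]`, serde tries the `Incoming` variants in declaration order, and each struct's required fields decide which one matches. For these three envelope types the rule reduces to which of `id` and `method` is present. A std-only sketch of that rule, with hypothetical names (`Kind`, `classify`) that exist only for illustration:

```rust
/// Which envelope a message would deserialize into.
#[derive(Debug, PartialEq, Eq)]
enum Kind {
    Request,
    Response,
    Notification,
    Invalid,
}

/// Mirrors the required-field check of the three envelope structs:
/// `Request` needs both members, `Response` only `id`,
/// `Notification` only `method`.
fn classify(has_id: bool, has_method: bool) -> Kind {
    match (has_id, has_method) {
        (true, true) => Kind::Request,
        (true, false) => Kind::Response,
        (false, true) => Kind::Notification,
        (false, false) => Kind::Invalid,
    }
}
```

Because `Request` is declared first, a message carrying both `id` and `method` never falls through to `Response`, so reordering the variants would change the result.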
- [ ] **Step 4: Commit** ```bash git add crates/xy-ipc/ git commit -m "feat(ipc): JSON-RPC envelope types" ``` ### Task 18: Newline-delimited framing helpers **Files:** - Create: `crates/xy-ipc/src/framing.rs` - Modify: `crates/xy-ipc/src/lib.rs` - [ ] **Step 1: Implement `JsonFramed`** ```rust use serde::Serialize; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::UnixStream; pub struct JsonFramed { reader: BufReader<tokio::net::unix::OwnedReadHalf>, writer: tokio::net::unix::OwnedWriteHalf, } impl JsonFramed { pub fn new(stream: UnixStream) -> Self { let (r, w) = stream.into_split(); Self { reader: BufReader::new(r), writer: w } } pub async fn read<T: serde::de::DeserializeOwned>(&mut self) -> std::io::Result<Option<T>> { let mut buf = String::new(); let n = self.reader.read_line(&mut buf).await?; if n == 0 { return Ok(None); } let v: T = serde_json::from_str(buf.trim_end()) .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; Ok(Some(v)) } pub async fn write<T: Serialize>(&mut self, value: &T) -> std::io::Result<()> { let mut bytes = serde_json::to_vec(value) .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; bytes.push(b'\n'); self.writer.write_all(&bytes).await?; self.writer.flush().await } } #[cfg(test)] mod tests { use super::*; use serde::Deserialize; #[derive(Debug, Serialize, Deserialize, PartialEq)] struct M { x: u32, name: String } #[tokio::test] async fn round_trip_over_socket_pair() { let (a, b) = UnixStream::pair().unwrap(); let mut sa = JsonFramed::new(a); let mut sb = JsonFramed::new(b); sa.write(&M { x: 1, name: "hi".into() }).await.unwrap(); let got: Option<M> = sb.read().await.unwrap(); assert_eq!(got, Some(M { x: 1, name: "hi".into() })); } #[tokio::test] async fn eof_returns_none() { let (a, b) = UnixStream::pair().unwrap(); drop(a); let mut sb = JsonFramed::new(b); let got: Option<M> = sb.read().await.unwrap(); assert!(got.is_none()); } } ``` - [ ] **Step 2: Expose** Append: `pub mod framing; pub use framing::JsonFramed;` - [ ] **Step 3: Run tests** Run: `cargo test -p
xy-ipc` Expected: 5 passed. - [ ] **Step 4: Commit** ```bash git add crates/xy-ipc/ git commit -m "feat(ipc): newline-delimited JSON framing" ``` ### Task 19: Client helper (call + subscribe) **Files:** - Create: `crates/xy-ipc/src/client.rs` - Modify: `crates/xy-ipc/src/lib.rs` - [ ] **Step 1: Implement client** ```rust use crate::envelope::{Incoming, Notification, Request}; use crate::framing::JsonFramed; use serde::de::DeserializeOwned; use serde::Serialize; use std::path::Path; use thiserror::Error; use tokio::net::UnixStream; #[derive(Debug, Error)] pub enum ClientError { #[error("io: {0}")] Io(#[from] std::io::Error), #[error("rpc error {code}: {message}")] Rpc { code: i32, message: String }, #[error("unexpected message kind from daemon")] Unexpected, #[error("daemon unreachable: {0}")] Unreachable(std::io::Error), #[error("serialization: {0}")] Serde(#[from] serde_json::Error), } pub struct Client { framed: JsonFramed, next_id: u64, } impl Client { pub async fn connect(socket_path: &Path) -> Result<Self, ClientError> { let stream = UnixStream::connect(socket_path).await.map_err(ClientError::Unreachable)?; Ok(Self { framed: JsonFramed::new(stream), next_id: 1 }) } pub async fn call<P: Serialize, R: DeserializeOwned>(&mut self, method: &str, params: &P) -> Result<R, ClientError> { let id = self.next_id; self.next_id += 1; let params_val = serde_json::to_value(params)?; let req = crate::envelope::request(id, method, Some(params_val)); self.framed.write(&req).await?; loop { let msg: Option<Incoming> = self.framed.read().await?; let Some(msg) = msg else { return Err(ClientError::Unreachable(std::io::Error::from(std::io::ErrorKind::UnexpectedEof))); }; match msg { Incoming::Response(r) => { if r.id != serde_json::json!(id) { return Err(ClientError::Unexpected); } if let Some(err) = r.error { return Err(ClientError::Rpc { code: err.code, message: err.message }); } let result = r.result.unwrap_or(serde_json::Value::Null); return Ok(serde_json::from_value(result)?); } Incoming::Notification(_) => continue, Incoming::Request(_) => return
Err(ClientError::Unexpected), } } } pub async fn call_no_params<R: DeserializeOwned>(&mut self, method: &str) -> Result<R, ClientError> { let id = self.next_id; self.next_id += 1; let req = Request { jsonrpc: "2.0".into(), id: serde_json::json!(id), method: method.into(), params: None }; self.framed.write(&req).await?; let msg: Option<Incoming> = self.framed.read().await?; let Some(Incoming::Response(r)) = msg else { return Err(ClientError::Unexpected); }; if let Some(err) = r.error { return Err(ClientError::Rpc { code: err.code, message: err.message }); } Ok(serde_json::from_value(r.result.unwrap_or(serde_json::Value::Null))?) } pub async fn read_notification(&mut self) -> Result<Option<Notification>, ClientError> { loop { let msg: Option<Incoming> = self.framed.read().await?; match msg { None => return Ok(None), Some(Incoming::Notification(n)) => return Ok(Some(n)), Some(Incoming::Response(_)) => continue, Some(Incoming::Request(_)) => return Err(ClientError::Unexpected), } } } pub async fn send_notification(&mut self, n: &Notification) -> Result<(), ClientError> { self.framed.write(n).await?; Ok(()) } } ``` - [ ] **Step 2: Expose** Append: `pub mod client; pub use client::{Client, ClientError};` - [ ] **Step 3: Build** Run: `cargo build -p xy-ipc` Expected: compiles.
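The request/response flow that `Client::call` implements can be tried without Tokio or serde. The sketch below is a blocking, std-only analogue under loud assumptions: `blocking_call` and `demo_round_trip` are hypothetical names, the "daemon" is a thread on the other end of a socket pair, and a naive substring check stands in for real JSON parsing. It shows the two behaviours that matter: one message per line, and notifications skipped while waiting for the reply.

```rust
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;

/// Sends one newline-terminated request and returns the first reply line
/// that carries an `"id"` member, skipping notification lines.
/// A fresh `BufReader` per call is fine for a single exchange only.
fn blocking_call(stream: &UnixStream, request_json: &str) -> std::io::Result<String> {
    let mut writer = stream.try_clone()?;
    writer.write_all(request_json.as_bytes())?;
    writer.write_all(b"\n")?;
    writer.flush()?;

    let mut reader = BufReader::new(stream.try_clone()?);
    loop {
        let mut line = String::new();
        if reader.read_line(&mut line)? == 0 {
            return Err(std::io::ErrorKind::UnexpectedEof.into());
        }
        let line = line.trim_end();
        if line.contains("\"id\":") {
            return Ok(line.to_string());
        }
    }
}

/// Drives `blocking_call` against a fake daemon that answers with one
/// notification followed by the actual response.
fn demo_round_trip() -> std::io::Result<String> {
    let (client, daemon) = UnixStream::pair()?;
    let fake = std::thread::spawn(move || -> std::io::Result<()> {
        let mut reader = BufReader::new(daemon.try_clone()?);
        let mut request = String::new();
        reader.read_line(&mut request)?;
        let mut writer = daemon;
        writer.write_all(b"{\"jsonrpc\":\"2.0\",\"method\":\"log\",\"params\":{}}\n")?;
        writer.write_all(b"{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":[]}\n")?;
        Ok(())
    });
    let reply = blocking_call(&client, r#"{"jsonrpc":"2.0","id":1,"method":"list"}"#)?;
    fake.join().expect("fake daemon thread panicked")?;
    Ok(reply)
}
```

The real client additionally checks that the response `id` equals the request `id` and maps the `error` member to `ClientError::Rpc`.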
- [ ] **Step 4: Commit** ```bash git add crates/xy-ipc/ git commit -m "feat(ipc): client with call + notification reader" ``` ### Task 20: Server helper (bind + Connection) **Files:** - Create: `crates/xy-ipc/src/server.rs` - Modify: `crates/xy-ipc/src/lib.rs` - [ ] **Step 1: Implement server primitives** ```rust use crate::envelope::{Incoming, Notification, Response}; use crate::framing::JsonFramed; use std::path::Path; use std::sync::Arc; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::Mutex; pub struct Connection { inner: Arc<Mutex<JsonFramed>>, } impl Connection { pub fn new(stream: UnixStream) -> Self { Self { inner: Arc::new(Mutex::new(JsonFramed::new(stream))) } } pub async fn read_incoming(&self) -> std::io::Result<Option<Incoming>> { let mut g = self.inner.lock().await; g.read::<Incoming>().await } pub async fn write_response(&self, r: &Response) -> std::io::Result<()> { let mut g = self.inner.lock().await; g.write(r).await } pub async fn write_notification(&self, n: &Notification) -> std::io::Result<()> { let mut g = self.inner.lock().await; g.write(n).await } } pub fn bind(socket_path: &Path) -> std::io::Result<UnixListener> { if socket_path.exists() { std::fs::remove_file(socket_path)?; } if let Some(parent) = socket_path.parent() { std::fs::create_dir_all(parent)?; } let listener = UnixListener::bind(socket_path)?; use std::os::unix::fs::PermissionsExt; std::fs::set_permissions(socket_path, std::fs::Permissions::from_mode(0o600))?; Ok(listener) } #[cfg(test)] mod tests { use super::*; use tempfile::tempdir; #[tokio::test] async fn bind_creates_socket_with_0600() { let dir = tempdir().unwrap(); let path = dir.path().join("x.sock"); let _listener = bind(&path).unwrap(); use std::os::unix::fs::PermissionsExt; let mode = std::fs::metadata(&path).unwrap().permissions().mode() & 0o777; assert_eq!(mode, 0o600); } } ``` - [ ] **Step 2: Expose** Append: `pub mod server; pub use server::{bind, Connection};` - [ ] **Step 3: Run tests** Run: `cargo test -p xy-ipc` Expected: 6 passed.
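`bind` removes any existing socket file unconditionally, which is right for a stale file left by a crashed daemon but would also unlink the socket of a daemon that is still running. One possible guard, sketched here with std's blocking sockets, is to try connecting first. `socket_is_live` and `live_then_stale` are hypothetical names, and whether the plan needs this at all depends on how strictly the pidfile already enforces a single daemon.

```rust
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::{Path, PathBuf};

/// Hypothetical guard: true if something is accepting connections on `path`.
fn socket_is_live(path: &Path) -> bool {
    UnixStream::connect(path).is_ok()
}

/// Probes a socket while a listener holds it and again after the listener
/// is dropped, returning both observations.
fn live_then_stale() -> std::io::Result<(bool, bool)> {
    let path: PathBuf =
        std::env::temp_dir().join(format!("xy-sketch-{}.sock", std::process::id()));
    let _ = std::fs::remove_file(&path);

    let listener = UnixListener::bind(&path)?;
    let while_bound = socket_is_live(&path);

    drop(listener); // the socket file stays on disk, but nobody listens
    let after_drop = socket_is_live(&path);

    std::fs::remove_file(&path)?;
    Ok((while_bound, after_drop))
}
```

A caller could refuse to start when the probe succeeds and fall back to `remove_file` only when it fails.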
- [ ] **Step 4: Commit**

```bash
git add crates/xy-ipc/
git commit -m "feat(ipc): server bind + Connection wrapper"
```

---

## Phase 5 — `xy` binary

### Task 21: XDG path resolution

**Files:**
- Create: `crates/xy/src/paths.rs`
- Modify: `crates/xy/src/main.rs`

- [ ] **Step 1: Implement**

```rust
use etcetera::base_strategy::{BaseStrategy, Xdg};
use std::path::PathBuf;

#[derive(Debug, Clone)]
pub struct Paths {
    pub config_dir: PathBuf,
    pub state_dir: PathBuf,
    pub log_dir: PathBuf,
    pub socket: PathBuf,
    pub pidfile: PathBuf,
}

impl Paths {
    pub fn resolve() -> std::io::Result<Self> {
        let xdg = Xdg::new().map_err(std::io::Error::other)?;
        let config_dir = xdg.config_dir().join("xy").join("servers");
        let state_dir = xdg.state_dir().unwrap_or_else(|| xdg.data_dir()).join("xy");
        let log_dir = state_dir.join("logs");
        // Prefer XDG_RUNTIME_DIR for the socket; fall back to the state dir.
        let socket = std::env::var_os("XDG_RUNTIME_DIR")
            .map(|p| PathBuf::from(p).join("xy.sock"))
            .unwrap_or_else(|| state_dir.join("xy.sock"));
        let pidfile = state_dir.join("xy.pid");
        Ok(Self { config_dir, state_dir, log_dir, socket, pidfile })
    }

    pub fn ensure_dirs(&self) -> std::io::Result<()> {
        std::fs::create_dir_all(&self.state_dir)?;
        std::fs::create_dir_all(&self.log_dir)?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn resolves_without_panicking() {
        let p = Paths::resolve().unwrap();
        assert!(p.pidfile.starts_with(&p.state_dir));
    }
}
```

- [ ] **Step 2: Reference from `main.rs`**

```rust
mod paths;

fn main() {
    let p = paths::Paths::resolve().unwrap();
    eprintln!("xy: socket would be at {}", p.socket.display());
}
```

- [ ] **Step 3: Run tests**

Run: `cargo test -p xy`
Expected: 1 passed.
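The socket fallback in `Paths::resolve` reads the process environment, which makes it awkward to unit-test. As a minimal sketch, the same rule can be written as a pure function; `socket_path` is a hypothetical helper name that is not part of the plan, and it assumes the rule is exactly "runtime dir if set, else state dir":

```rust
use std::path::{Path, PathBuf};

/// Hypothetical pure helper (not in the plan) mirroring the socket fallback in
/// `Paths::resolve`: use the runtime dir when one is given, otherwise place the
/// socket in the state dir.
fn socket_path(runtime_dir: Option<PathBuf>, state_dir: &Path) -> PathBuf {
    runtime_dir
        .map(|p| p.join("xy.sock"))
        .unwrap_or_else(|| state_dir.join("xy.sock"))
}
```

A caller would pass `std::env::var_os("XDG_RUNTIME_DIR").map(PathBuf::from)` as the first argument; tests can then cover both branches without touching the environment.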
- [ ] **Step 4: Commit**

```bash
git add crates/xy/
git commit -m "feat(xy): XDG path resolution"
```

### Task 22: Pidfile guard

**Files:**
- Create: `crates/xy/src/pidfile.rs`
- Modify: `crates/xy/src/main.rs`

- [ ] **Step 1: Implement**

```rust
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::os::unix::fs::OpenOptionsExt;
use std::path::{Path, PathBuf};

/// Holds the pidfile for the daemon's lifetime; the file is removed on drop.
pub struct PidFile {
    path: PathBuf,
    _file: File,
}

impl PidFile {
    /// Fails with `AlreadyExists` if the pidfile is already present.
    pub fn acquire(path: &Path) -> std::io::Result<Self> {
        let mut f = OpenOptions::new()
            .write(true)
            .create_new(true)
            .mode(0o600)
            .open(path)?;
        writeln!(f, "{}", std::process::id())?;
        Ok(Self { path: path.to_path_buf(), _file: f })
    }
}

impl Drop for PidFile {
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.path);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn acquire_then_second_fails() {
        let dir = tempdir().unwrap();
        let p = dir.path().join("x.pid");
        let _g = PidFile::acquire(&p).unwrap();
        let err = PidFile::acquire(&p).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::AlreadyExists);
    }

    #[test]
    fn drop_removes_file() {
        let dir = tempdir().unwrap();
        let p = dir.path().join("x.pid");
        {
            let _g = PidFile::acquire(&p).unwrap();
            assert!(p.exists());
        }
        assert!(!p.exists());
    }
}
```

- [ ] **Step 2: Reference from `main.rs`**

```rust
mod paths;
mod pidfile;

fn main() {
    let p = paths::Paths::resolve().unwrap();
    eprintln!("xy: socket would be at {}", p.socket.display());
}
```

- [ ] **Step 3: Run tests**

Run: `cargo test -p xy`
Expected: 3 passed.
- [ ] **Step 4: Commit**

```bash
git add crates/xy/
git commit -m "feat(xy): exclusive pidfile guard"
```

### Task 23: clap CLI definitions

**Files:**
- Modify: `crates/xy/src/main.rs`
- Create: `crates/xy/src/cli/mod.rs` (stub)
- Create: `crates/xy/src/daemon/mod.rs` (stub)

- [ ] **Step 1: Replace `main.rs`**

```rust
use clap::{Parser, Subcommand};

mod cli;
mod daemon;
mod paths;
mod pidfile;

#[derive(Debug, Parser)]
#[command(name = "xy", version, about = "HTTP MCP server supervisor")]
struct Cli {
    #[command(subcommand)]
    cmd: Cmd,
}

#[derive(Debug, Subcommand)]
enum Cmd {
    /// Run the daemon in the foreground.
    Daemon,
    /// List all configured servers with state.
    List,
    /// Show detailed status for a single server.
    Status { name: String },
    /// Start a server (or all configured servers with --all).
    Start {
        #[arg(long, conflicts_with = "name")]
        all: bool,
        #[arg(required_unless_present = "all")]
        name: Option<String>,
    },
    /// Stop a server (or --all).
    Stop {
        #[arg(long, conflicts_with = "name")]
        all: bool,
        #[arg(required_unless_present = "all")]
        name: Option<String>,
    },
    /// Restart a server (or --all).
    Restart {
        #[arg(long, conflicts_with = "name")]
        all: bool,
        #[arg(required_unless_present = "all")]
        name: Option<String>,
    },
    /// Re-read config dir and reconcile running servers.
    Reload,
    /// Stream a server's log.
    Logs {
        name: String,
        #[arg(long)]
        tail: Option<usize>,
        #[arg(short = 'f', long)]
        follow: bool,
    },
}

#[tokio::main]
async fn main() -> std::process::ExitCode {
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
        )
        .with_writer(std::io::stderr)
        .init();

    let cli = Cli::parse();
    let paths = match paths::Paths::resolve() {
        Ok(p) => p,
        Err(e) => {
            eprintln!("xy: failed to resolve XDG paths: {e}");
            return std::process::ExitCode::from(3);
        }
    };

    // Each subcommand returns the process exit code on success.
    let result: anyhow::Result<i32> = match cli.cmd {
        Cmd::Daemon => daemon::run(paths).await.map(|_| 0),
        Cmd::List => cli::list(paths).await,
        Cmd::Status { name } => cli::status(paths, name).await,
        Cmd::Start { all, name } => cli::start(paths, all, name).await,
        Cmd::Stop { all, name } => cli::stop(paths, all, name).await,
        Cmd::Restart { all, name } => cli::restart(paths, all, name).await,
        Cmd::Reload => cli::reload(paths).await,
        Cmd::Logs { name, tail, follow } => cli::logs(paths, name, tail, follow).await,
    };

    match result {
        Ok(code) => std::process::ExitCode::from(code as u8),
        Err(e) => {
            eprintln!("xy: {e:#}");
            std::process::ExitCode::from(1)
        }
    }
}
```

- [ ] **Step 2: Stub `cli/mod.rs`**

```rust
use crate::paths::Paths;
use anyhow::{bail, Result};

pub async fn list(_p: Paths) -> Result<i32> { bail!("not implemented") }
pub async fn status(_p: Paths, _name: String) -> Result<i32> { bail!("not implemented") }
pub async fn start(_p: Paths, _all: bool, _name: Option<String>) -> Result<i32> { bail!("not implemented") }
pub async fn stop(_p: Paths, _all: bool, _name: Option<String>) -> Result<i32> { bail!("not implemented") }
pub async fn restart(_p: Paths, _all: bool, _name: Option<String>) -> Result<i32> { bail!("not implemented") }
pub async fn reload(_p: Paths) -> Result<i32> { bail!("not implemented") }
pub async fn logs(_p: Paths, _name: String, _tail: Option<usize>, _follow: bool) -> Result<i32> {
    bail!("not implemented")
}
```

- [ ] **Step 3: Stub `daemon/mod.rs`**

```rust
use crate::paths::Paths;
use
anyhow::{bail, Result};

pub async fn run(_paths: Paths) -> Result<()> { bail!("not implemented") }
```

- [ ] **Step 4: Build + verify --help**

Run: `cargo build -p xy`
Run: `cargo run -p xy -- --help`
Expected: prints help with all subcommands.

- [ ] **Step 5: Commit**

```bash
git add crates/xy/
git commit -m "feat(xy): clap CLI scaffold"
```

### Task 24: Daemon `Registry` with config-hash entry

**Files:**
- Create: `crates/xy/src/daemon/registry.rs`
- Modify: `crates/xy/src/daemon/mod.rs`

- [ ] **Step 1: Implement registry with `Entry { handle, config_hash }`**

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use xy_supervisor::SupervisorHandle;

#[derive(Clone)]
pub struct Entry {
    pub handle: SupervisorHandle,
    pub config_hash: u64,
}

/// Shared map from server name to its supervisor handle and config hash.
#[derive(Clone, Default)]
pub struct Registry {
    inner: Arc<RwLock<HashMap<String, Entry>>>,
}

impl Registry {
    pub fn new() -> Self { Self::default() }

    pub async fn insert(&self, name: String, entry: Entry) {
        self.inner.write().await.insert(name, entry);
    }

    pub async fn remove(&self, name: &str) -> Option<Entry> {
        self.inner.write().await.remove(name)
    }

    pub async fn get(&self, name: &str) -> Option<Entry> {
        self.inner.read().await.get(name).cloned()
    }

    /// Server names in sorted order.
    pub async fn names(&self) -> Vec<String> {
        let g = self.inner.read().await;
        let mut v: Vec<String> = g.keys().cloned().collect();
        v.sort();
        v
    }

    /// All entries, sorted by name.
    pub async fn snapshot(&self) -> Vec<(String, Entry)> {
        let g = self.inner.read().await;
        let mut v: Vec<_> = g.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
        v.sort_by(|a, b| a.0.cmp(&b.0));
        v
    }
}
```

- [ ] **Step 2: Add `pub mod registry;` to `daemon/mod.rs`**

```rust
use crate::paths::Paths;
use anyhow::{bail, Result};

pub mod registry;

pub async fn run(_paths: Paths) -> Result<()> { bail!("not implemented") }
```

- [ ] **Step 3: Build**

Run: `cargo build -p xy`
Expected: compiles.
- [ ] **Step 4: Commit**

```bash
git add crates/xy/
git commit -m "feat(xy): daemon Registry with config-hash entries"
```

### Task 25: Daemon entry point — boot + accept + shutdown

**Files:**
- Modify: `crates/xy/src/daemon/mod.rs`
- Create: `crates/xy/src/daemon/shutdown.rs`
- Create: `crates/xy/src/daemon/handlers.rs` (stub)

- [ ] **Step 1: Implement `daemon/mod.rs`**

```rust
use crate::paths::Paths;
use crate::pidfile::PidFile;
use anyhow::{Context, Result};
use std::sync::{Arc, OnceLock};
use tokio::sync::{mpsc, oneshot, watch};
use tracing::{error, info};
use xy_ipc::{bind, Connection};
use xy_protocol::{kdl_parse::load_all_configs, ServerConfig, ServerState};
use xy_supervisor::{
    logs::{LogSink, RotatingLogWriter},
    supervisor::{RealSpawner, SupervisorCmd, SupervisorHandle, SupervisorTask},
};

pub mod handlers;
pub mod registry;
pub mod shutdown;

const LOG_FILE_MAX_BYTES: u64 = 10 * 1024 * 1024;
const LOG_FILE_KEEP: usize = 5;
const RING_BUFFER_BYTES: usize = 1024 * 1024;

/// Daemon-wide paths, set once in `run` and read by the reload handler.
pub static PATHS: OnceLock<Paths> = OnceLock::new();

/// Hash of the serialized config, used by reload to detect changes.
pub fn config_hash(cfg: &ServerConfig) -> u64 {
    use std::hash::{Hash, Hasher};
    let mut h = std::collections::hash_map::DefaultHasher::new();
    serde_json::to_string(cfg).unwrap().hash(&mut h);
    h.finish()
}

pub fn spawn_supervisor(paths: &Paths, cfg: ServerConfig) -> Result<SupervisorHandle> {
    let log_path = paths.log_dir.join(format!("{}.log", cfg.name));
    let writer = RotatingLogWriter::open(&log_path, LOG_FILE_MAX_BYTES, LOG_FILE_KEEP)
        .with_context(|| format!("open log file {}", log_path.display()))?;
    let sink = LogSink::new(cfg.name.clone(), writer, RING_BUFFER_BYTES);
    let (state_tx, state_rx) = watch::channel(ServerState::Stopped);
    let (cmd_tx, cmd_rx) = mpsc::channel(16);
    let name = cfg.name.clone();
    let task = SupervisorTask::new(cfg, sink.clone(), RealSpawner, state_tx, cmd_rx);
    tokio::spawn(task.run());
    Ok(SupervisorHandle { name, tx: cmd_tx, state: state_rx, log_sink: sink })
}

pub async fn run(paths: Paths) -> Result<()> {
    paths.ensure_dirs().context("create state dirs")?;
    let _pid = PidFile::acquire(&paths.pidfile)
        .context("another xy daemon appears to be running")?;
    let listener = bind(&paths.socket).context("bind unix socket")?;
    let _ = PATHS.set(paths.clone());
    info!(socket = %paths.socket.display(), "daemon listening");

    // Spawn one supervisor per config and start each server on boot.
    let configs = load_all_configs(&paths.config_dir).context("load configs")?;
    let registry = registry::Registry::new();
    for cfg in configs {
        let hash = config_hash(&cfg);
        let handle = spawn_supervisor(&paths, cfg)?;
        let name = handle.name.clone();
        registry
            .insert(name.clone(), registry::Entry { handle: handle.clone(), config_hash: hash })
            .await;
        let (ack_tx, ack_rx) = oneshot::channel();
        if handle.tx.send(SupervisorCmd::Start { ack: ack_tx }).await.is_ok() {
            let _ = ack_rx.await;
        }
    }

    let registry_for_shutdown = registry.clone();
    let shutdown_signal = shutdown::install();

    let accept = async {
        loop {
            let (stream, _addr) = match listener.accept().await {
                Ok(p) => p,
                Err(e) => {
                    error!(error = %e, "accept failed");
                    continue;
                }
            };
            let conn = Arc::new(Connection::new(stream));
            let reg = registry.clone();
            let paths_clone = paths.clone();
            tokio::spawn(async move {
                if let Err(e) = handlers::serve(conn, reg, paths_clone).await {
                    error!(error = %e, "connection ended with error");
                }
            });
        }
    };

    tokio::select! {
        _ = accept => {}
        _ = shutdown_signal => { info!("shutdown signal received"); }
    }

    shutdown::shutdown_all(registry_for_shutdown).await;
    Ok(())
}
```

- [ ] **Step 2: Create `shutdown.rs`**

```rust
use crate::daemon::registry::Registry;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::oneshot;
use xy_supervisor::supervisor::SupervisorCmd;

/// Installs SIGTERM and SIGINT handlers; the returned future resolves on the first signal.
pub fn install() -> impl std::future::Future<Output = ()> {
    let mut term = signal(SignalKind::terminate()).expect("install SIGTERM handler");
    let mut int = signal(SignalKind::interrupt()).expect("install SIGINT handler");
    async move {
        tokio::select!
        {
            _ = term.recv() => {}
            _ = int.recv() => {}
        }
    }
}

/// Sends `Shutdown` to every supervisor and waits up to 30 seconds for the acks.
pub async fn shutdown_all(reg: Registry) {
    let snapshot = reg.snapshot().await;
    let mut acks = Vec::new();
    for (_name, entry) in &snapshot {
        let (tx, rx) = oneshot::channel();
        if entry.handle.tx.send(SupervisorCmd::Shutdown { ack: tx }).await.is_ok() {
            acks.push(rx);
        }
    }
    let deadline = tokio::time::Duration::from_secs(30);
    let _ = tokio::time::timeout(deadline, async {
        for rx in acks {
            let _ = rx.await;
        }
    })
    .await;
}
```

- [ ] **Step 3: Stub `handlers.rs`**

```rust
use crate::daemon::registry::Registry;
use crate::paths::Paths;
use std::sync::Arc;
use xy_ipc::Connection;

pub async fn serve(_conn: Arc<Connection>, _reg: Registry, _paths: Paths) -> std::io::Result<()> {
    Ok(())
}
```

- [ ] **Step 4: Build**

Run: `cargo build -p xy`
Expected: compiles.

- [ ] **Step 5: Commit**

```bash
git add crates/xy/
git commit -m "feat(xy): daemon boot + accept loop + graceful shutdown"
```

### Task 26: RPC handlers — list/status/start/stop/restart

**Files:**
- Modify: `crates/xy/src/daemon/handlers.rs`

- [ ] **Step 1: Replace stub with full dispatch**

```rust
use crate::daemon::registry::Registry;
use crate::paths::Paths;
use std::sync::Arc;
use tokio::sync::oneshot;
use xy_ipc::envelope::{err_response, ok_response, Incoming, Request, Response};
use xy_ipc::Connection;
use xy_protocol::rpc::{
    methods, NameOrAll, RestartResult, ServerSummary, StartResult, StatusDetail, StopResult,
};
use xy_protocol::RpcErrorCode;
use xy_supervisor::supervisor::{StartAck, StopAck, SupervisorCmd};

pub async fn serve(conn: Arc<Connection>, reg: Registry, _paths: Paths) -> std::io::Result<()> {
    loop {
        let Some(incoming) = conn.read_incoming().await?
        else { return Ok(()); };
        match incoming {
            Incoming::Request(req) => {
                let resp = handle_request(req, &reg).await;
                conn.write_response(&resp).await?;
            }
            _ => continue,
        }
    }
}

struct ApiError { code: i32, message: String }

impl ApiError {
    fn rpc(c: RpcErrorCode, msg: impl Into<String>) -> Self {
        Self { code: c.as_i32(), message: msg.into() }
    }
}

async fn handle_request(req: Request, reg: &Registry) -> Response {
    let id = req.id.clone();
    let method = req.method.as_str();
    let params = req.params.unwrap_or(serde_json::Value::Null);
    // -32602 = invalid params, -32601 = method not found (JSON-RPC 2.0).
    match method {
        methods::LIST => match list(reg).await {
            Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
            Err(e) => err_response(id, e.code, e.message),
        },
        methods::STATUS => {
            let p: xy_protocol::rpc::StatusParams = match serde_json::from_value(params) {
                Ok(p) => p,
                Err(e) => return err_response(id, -32602, format!("invalid params: {e}")),
            };
            match status(reg, &p.name).await {
                Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
                Err(e) => err_response(id, e.code, e.message),
            }
        }
        methods::START => dispatch_lifecycle(id, params, reg, Op::Start).await,
        methods::STOP => dispatch_lifecycle(id, params, reg, Op::Stop).await,
        methods::RESTART => dispatch_lifecycle(id, params, reg, Op::Restart).await,
        methods::RELOAD => err_response(id, -32601, "reload not yet implemented".into()),
        methods::LOGS => err_response(id, -32601, "logs not yet implemented".into()),
        methods::LOGS_CANCEL => err_response(id, -32601, "logs_cancel not yet implemented".into()),
        other => err_response(id, -32601, format!("unknown method `{other}`")),
    }
}

async fn list(reg: &Registry) -> Result<Vec<ServerSummary>, ApiError> {
    let mut out = Vec::new();
    for (name, entry) in reg.snapshot().await {
        out.push(ServerSummary {
            name,
            state: *entry.handle.state.borrow(),
            pid: None,
            port: 0,
            uptime_secs: None,
            restart_count: 0,
            last_exit: None,
        });
    }
    Ok(out)
}

async fn status(reg: &Registry, name: &str) -> Result<StatusDetail, ApiError> {
    let Some(entry) = reg.get(name).await else {
        return
        Err(ApiError::rpc(RpcErrorCode::ServerNotFound, format!("no such server `{name}`")));
    };
    Ok(StatusDetail {
        summary: ServerSummary {
            name: entry.handle.name.clone(),
            state: *entry.handle.state.borrow(),
            pid: None,
            port: 0,
            uptime_secs: None,
            restart_count: 0,
            last_exit: None,
        },
        recent_transitions: Vec::new(),
    })
}

enum Op { Start, Stop, Restart }

async fn dispatch_lifecycle(
    id: serde_json::Value,
    params: serde_json::Value,
    reg: &Registry,
    op: Op,
) -> Response {
    let p: NameOrAll = match serde_json::from_value(params) {
        Ok(p) => p,
        Err(e) => return err_response(id, -32602, format!("invalid params: {e}")),
    };
    // Resolve the request to a concrete list of server names.
    let targets: Vec<String> = match p {
        NameOrAll::All { all } if all => reg.names().await,
        NameOrAll::Name { name } => vec![name],
        NameOrAll::All { .. } => return err_response(id, -32602, "must set all=true".into()),
    };
    match op {
        Op::Start => {
            let mut started = Vec::new();
            let mut already = Vec::new();
            for name in targets {
                let Some(entry) = reg.get(&name).await else {
                    return err_response(id, RpcErrorCode::ServerNotFound.as_i32(), format!("no such server `{name}`"));
                };
                let (tx, rx) = oneshot::channel();
                let _ = entry.handle.tx.send(SupervisorCmd::Start { ack: tx }).await;
                match rx.await {
                    Ok(StartAck::Started) => started.push(name),
                    Ok(StartAck::AlreadyRunning) => already.push(name),
                    Err(_) => return err_response(id, RpcErrorCode::SpawnFailed.as_i32(), format!("supervisor for `{name}` dropped")),
                }
            }
            ok_response(id, serde_json::to_value(StartResult { started, already_running: already }).unwrap())
        }
        Op::Stop => {
            let mut stopped = Vec::new();
            let mut not_running = Vec::new();
            for name in targets {
                let Some(entry) = reg.get(&name).await else {
                    return err_response(id, RpcErrorCode::ServerNotFound.as_i32(), format!("no such server `{name}`"));
                };
                let (tx, rx) = oneshot::channel();
                let _ = entry.handle.tx.send(SupervisorCmd::Stop { ack: tx }).await;
                match rx.await {
                    Ok(StopAck::Stopped) => stopped.push(name),
                    Ok(StopAck::NotRunning) => not_running.push(name),
                    Err(_) =>
                        return err_response(id, RpcErrorCode::SpawnFailed.as_i32(), format!("supervisor for `{name}` dropped")),
                }
            }
            ok_response(id, serde_json::to_value(StopResult { stopped, not_running }).unwrap())
        }
        Op::Restart => {
            let mut restarted = Vec::new();
            for name in targets {
                let Some(entry) = reg.get(&name).await else {
                    return err_response(id, RpcErrorCode::ServerNotFound.as_i32(), format!("no such server `{name}`"));
                };
                let (tx, rx) = oneshot::channel();
                let _ = entry.handle.tx.send(SupervisorCmd::Restart { ack: tx }).await;
                let _ = rx.await;
                restarted.push(name);
            }
            ok_response(id, serde_json::to_value(RestartResult { restarted }).unwrap())
        }
    }
}
```

- [ ] **Step 2: Build**

Run: `cargo build -p xy`
Expected: compiles.

- [ ] **Step 3: Commit**

```bash
git add crates/xy/
git commit -m "feat(xy): RPC handlers for list/status/start/stop/restart"
```

### Task 27: RPC handler — reload

**Files:**
- Modify: `crates/xy/src/daemon/handlers.rs`

- [ ] **Step 1: Replace RELOAD arm and add `reload`**

In the `match method` block, change the `methods::RELOAD` arm to:

```rust
        methods::RELOAD => match reload(reg).await {
            Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
            Err(e) => err_response(id, e.code, e.message),
        },
```

Append at end of file:

```rust
use xy_protocol::rpc::ReloadResult;

async fn reload(reg: &Registry) -> Result<ReloadResult, ApiError> {
    let paths = crate::daemon::PATHS.get()
        .ok_or_else(|| ApiError::rpc(RpcErrorCode::ConfigInvalid, "daemon paths not initialized"))?;
    let new_configs = xy_protocol::kdl_parse::load_all_configs(&paths.config_dir)
        .map_err(|e| ApiError::rpc(RpcErrorCode::ConfigInvalid, e.to_string()))?;

    use std::collections::HashMap;
    let new_by_name: HashMap<String, xy_protocol::ServerConfig> =
        new_configs.into_iter().map(|c| (c.name.clone(), c)).collect();
    let existing_names: Vec<String> = reg.names().await;

    let mut added = Vec::new();
    let mut removed = Vec::new();
    let mut changed = Vec::new();
    let mut unchanged = Vec::new();

    // Servers whose config file disappeared: shut down and drop from the registry.
    for name in &existing_names {
        if !new_by_name.contains_key(name) {
            if let Some(entry) = reg.remove(name).await {
                let
                    (tx, rx) = oneshot::channel();
                let _ = entry.handle.tx.send(SupervisorCmd::Shutdown { ack: tx }).await;
                let _ = rx.await;
                removed.push(name.clone());
            }
        }
    }

    for (name, cfg) in new_by_name {
        let new_hash = crate::daemon::config_hash(&cfg);
        match reg.get(&name).await {
            // New config file: spawn a supervisor and start it.
            None => {
                let handle = crate::daemon::spawn_supervisor(paths, cfg)
                    .map_err(|e| ApiError::rpc(RpcErrorCode::SpawnFailed, e.to_string()))?;
                reg.insert(name.clone(), crate::daemon::registry::Entry {
                    handle: handle.clone(),
                    config_hash: new_hash,
                }).await;
                let (tx, rx) = oneshot::channel();
                let _ = handle.tx.send(SupervisorCmd::Start { ack: tx }).await;
                let _ = rx.await;
                added.push(name);
            }
            // Config changed: shut down the old supervisor, then spawn and start a new one.
            Some(entry) if entry.config_hash != new_hash => {
                let (tx, rx) = oneshot::channel();
                let _ = entry.handle.tx.send(SupervisorCmd::Shutdown { ack: tx }).await;
                let _ = rx.await;
                reg.remove(&name).await;
                let handle = crate::daemon::spawn_supervisor(paths, cfg)
                    .map_err(|e| ApiError::rpc(RpcErrorCode::SpawnFailed, e.to_string()))?;
                reg.insert(name.clone(), crate::daemon::registry::Entry {
                    handle: handle.clone(),
                    config_hash: new_hash,
                }).await;
                let (tx, rx) = oneshot::channel();
                let _ = handle.tx.send(SupervisorCmd::Start { ack: tx }).await;
                let _ = rx.await;
                changed.push(name);
            }
            Some(_) => unchanged.push(name),
        }
    }

    Ok(ReloadResult { added, removed, changed, unchanged })
}
```

- [ ] **Step 2: Build**

Run: `cargo build -p xy`
Expected: compiles.
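The reload handler interleaves the classification (added, removed, changed, unchanged) with supervisor side effects. As a minimal sketch, the classification alone can be expressed over two name-to-hash maps; `Diff` and `diff_configs` are hypothetical names used only for illustration and are not part of the plan:

```rust
use std::collections::HashMap;

/// Hypothetical result type (not in the plan) holding the four reload buckets.
#[derive(Debug, Default, PartialEq)]
struct Diff {
    added: Vec<String>,
    removed: Vec<String>,
    changed: Vec<String>,
    unchanged: Vec<String>,
}

/// Classifies servers by comparing the old and new name -> config-hash maps.
fn diff_configs(old: &HashMap<String, u64>, new: &HashMap<String, u64>) -> Diff {
    let mut d = Diff::default();
    // Present before, absent now.
    for name in old.keys() {
        if !new.contains_key(name) {
            d.removed.push(name.clone());
        }
    }
    for (name, hash) in new {
        match old.get(name) {
            None => d.added.push(name.clone()),
            Some(h) if h != hash => d.changed.push(name.clone()),
            Some(_) => d.unchanged.push(name.clone()),
        }
    }
    // HashMap iteration order is unspecified, so sort for stable output.
    d.added.sort();
    d.removed.sort();
    d.changed.sort();
    d.unchanged.sort();
    d
}
```

Because the hashes are only ever compared inside one daemon process, it does not matter that the algorithm behind `DefaultHasher` is unspecified and may change between Rust releases; the values should not be persisted or compared across processes.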
- [ ] **Step 3: Commit**

```bash
git add crates/xy/
git commit -m "feat(xy): reload handler with diff"
```

### Task 28: RPC handlers — logs + logs_cancel

**Files:**
- Modify: `crates/xy/src/daemon/handlers.rs`

- [ ] **Step 1: Add `ConnState` and rewire `serve`**

At the top of the imports add:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use xy_protocol::rpc::{notifications, LogEnd, LogLine, LogsCancelParams, LogsParams, LogsSubscribed};
```

Add `ConnState` and update `serve`:

```rust
/// Per-connection state: active log subscriptions and the next subscription id.
pub struct ConnState {
    pub subs: Mutex<HashMap<u64, JoinHandle<()>>>,
    pub next: AtomicU64,
}

impl ConnState {
    pub fn new() -> Self {
        Self { subs: Mutex::new(HashMap::new()), next: AtomicU64::new(1) }
    }
}

pub async fn serve(conn: Arc<Connection>, reg: Registry, _paths: Paths) -> std::io::Result<()> {
    let state = Arc::new(ConnState::new());
    loop {
        let Some(incoming) = conn.read_incoming().await? else {
            // Peer closed the connection: abort every streaming task it owned.
            let mut subs = state.subs.lock().await;
            for (_, h) in subs.drain() {
                h.abort();
            }
            return Ok(());
        };
        if let Incoming::Request(req) = incoming {
            let resp = handle_request(req, &reg, &conn, &state).await;
            conn.write_response(&resp).await?;
        }
    }
}
```

Update `handle_request` signature to:

```rust
async fn handle_request(
    req: Request,
    reg: &Registry,
    conn: &Arc<Connection>,
    state: &Arc<ConnState>,
) -> Response {
    /* same body, plus the two new arms below */
}
```

Replace the LOGS / LOGS_CANCEL arms with:

```rust
        methods::LOGS => {
            let p: LogsParams = match serde_json::from_value(params) {
                Ok(p) => p,
                Err(e) => return err_response(id, -32602, format!("invalid params: {e}")),
            };
            match start_log_stream(reg, conn.clone(), state.clone(), p).await {
                Ok(sub_id) => ok_response(id, serde_json::to_value(LogsSubscribed { subscription_id: sub_id }).unwrap()),
                Err(e) => err_response(id, e.code, e.message),
            }
        }
        methods::LOGS_CANCEL => {
            let p: LogsCancelParams = match serde_json::from_value(params) {
                Ok(p) => p,
                Err(e) => return err_response(id, -32602, format!("invalid params: \
{e}")),
            };
            let mut subs = state.subs.lock().await;
            if let Some(h) = subs.remove(&p.subscription_id) {
                h.abort();
            }
            ok_response(id, serde_json::json!({}))
        }
```

- [ ] **Step 2: Implement `start_log_stream`**

Append:

```rust
async fn start_log_stream(
    reg: &Registry,
    conn: Arc<Connection>,
    state: Arc<ConnState>,
    p: LogsParams,
) -> Result<u64, ApiError> {
    let Some(entry) = reg.get(&p.name).await else {
        return Err(ApiError::rpc(RpcErrorCode::ServerNotFound, format!("no such server `{}`", p.name)));
    };
    let sub_id = state.next.fetch_add(1, Ordering::Relaxed);
    let sink = entry.handle.log_sink.clone();
    let conn2 = conn.clone();
    let state2 = state.clone();
    let follow = p.follow;
    let tail = p.tail;
    let name = p.name.clone();

    let task = tokio::spawn(async move {
        // Replay buffered lines from the ring buffer first.
        for line in sink.ring.snapshot_tail(tail) {
            let n = xy_ipc::envelope::notification(notifications::LOG, Some(serde_json::to_value(LogLine {
                subscription_id: sub_id,
                name: name.clone(),
                stream: line.stream,
                line: line.line,
                ts_unix_ms: line.ts_unix_ms,
            }).unwrap()));
            if conn2.write_notification(&n).await.is_err() {
                return;
            }
        }
        // Without --follow, signal the end of the stream and drop the subscription.
        if !follow {
            let end = xy_ipc::envelope::notification(
                notifications::LOG_END,
                Some(serde_json::to_value(LogEnd { subscription_id: sub_id }).unwrap()),
            );
            let _ = conn2.write_notification(&end).await;
            state2.subs.lock().await.remove(&sub_id);
            return;
        }
        // With --follow, forward live lines until the client or the sink goes away.
        let mut rx = sink.broadcast.subscribe();
        loop {
            match rx.recv().await {
                Ok(mut line) => {
                    line.subscription_id = sub_id;
                    let n = xy_ipc::envelope::notification(notifications::LOG, Some(serde_json::to_value(&line).unwrap()));
                    if conn2.write_notification(&n).await.is_err() {
                        break;
                    }
                }
                Err(_) => break,
            }
        }
    });

    state.subs.lock().await.insert(sub_id, task);
    Ok(sub_id)
}
```

- [ ] **Step 3: Build**

Run: `cargo build -p xy`
Expected: compiles. If the `handle_request` call in `serve` has not been updated to pass `&conn` and `&state`, update it.
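`start_log_stream` relies on `sink.ring.snapshot_tail(tail)` to replay buffered lines before following live output. The sketch below illustrates one plausible reading of that call, under stated assumptions: that `tail` is an `Option<usize>`, that `None` means "everything buffered", and that `Some(n)` means "the last `n` lines". The real ring buffer in `xy-supervisor` is byte-bounded and may behave differently; `LineRing` is a hypothetical, line-bounded stand-in.

```rust
use std::collections::VecDeque;

/// Hypothetical line-bounded ring buffer (the plan's buffer is byte-bounded).
struct LineRing {
    lines: VecDeque<String>,
    cap: usize,
}

impl LineRing {
    fn new(cap: usize) -> Self {
        Self { lines: VecDeque::with_capacity(cap), cap }
    }

    /// Appends a line, evicting the oldest one once the buffer is full.
    fn push(&mut self, line: &str) {
        if self.cap == 0 {
            return;
        }
        if self.lines.len() == self.cap {
            self.lines.pop_front();
        }
        self.lines.push_back(line.to_string());
    }

    /// Assumed semantics: `None` returns everything buffered, `Some(n)` the last `n` lines.
    fn snapshot_tail(&self, tail: Option<usize>) -> Vec<String> {
        let skip = match tail {
            Some(n) => self.lines.len().saturating_sub(n),
            None => 0,
        };
        self.lines.iter().skip(skip).cloned().collect()
    }
}
```

Taking the snapshot as an owned `Vec` means the streaming task does not hold any lock on the buffer while it writes notifications to a possibly slow client.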
- [ ] **Step 4: Commit**

```bash
git add crates/xy/
git commit -m "feat(xy): logs streaming via subscription notifications"
```

### Task 29: CLI client implementations

**Files:**
- Modify: `crates/xy/src/cli/mod.rs`
- Create: `crates/xy/src/cli/format.rs`

- [ ] **Step 1: Implement `format.rs`**

```rust
use xy_protocol::rpc::ServerSummary;

pub fn list_table(rows: &[ServerSummary]) -> String {
    let mut out = String::new();
    // Header uses the same column widths as the rows so the table lines up.
    out.push_str(&format!(
        "{:<20}{:<12}{:<8}{:<8}{:<10}{}\n",
        "NAME", "STATE", "PID", "PORT", "UPTIME", "RESTARTS"
    ));
    for r in rows {
        let pid = r.pid.map(|p| p.to_string()).unwrap_or_else(|| "-".into());
        let up = r.uptime_secs.map(|s| format!("{}s", s)).unwrap_or_else(|| "-".into());
        out.push_str(&format!(
            "{:<20}{:<12}{:<8}{:<8}{:<10}{}\n",
            r.name,
            format!("{:?}", r.state).to_lowercase(),
            pid,
            r.port,
            up,
            r.restart_count
        ));
    }
    out
}
```

- [ ] **Step 2: Replace `cli/mod.rs`**

```rust
use crate::paths::Paths;
use anyhow::Result;
use serde_json::json;
use xy_ipc::{Client, ClientError};
use xy_protocol::rpc::{
    methods, notifications, LogLine, LogsParams, LogsSubscribed, ReloadResult, RestartResult,
    ServerSummary, StartResult, StatusDetail, StopResult,
};

mod format;

/// Connects to the daemon; exits the process with code 2 if it is unreachable.
async fn connect(paths: &Paths) -> Result<Client> {
    match Client::connect(&paths.socket).await {
        Ok(c) => Ok(c),
        Err(e) => {
            eprintln!("xy: cannot reach daemon at {}: {e}", paths.socket.display());
            std::process::exit(2);
        }
    }
}

/// Maps a client error to the CLI exit code.
fn rpc_to_exit(e: &ClientError) -> i32 {
    match e {
        ClientError::Unreachable(_) => 2,
        ClientError::Rpc { ..
        } => 1,
        _ => 1,
    }
}

pub async fn list(paths: Paths) -> Result<i32> {
    let mut c = connect(&paths).await?;
    let rows: Vec<ServerSummary> = match c.call_no_params(methods::LIST).await {
        Ok(v) => v,
        Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
    };
    print!("{}", format::list_table(&rows));
    Ok(0)
}

pub async fn status(paths: Paths, name: String) -> Result<i32> {
    let mut c = connect(&paths).await?;
    let d: StatusDetail = match c.call(methods::STATUS, &json!({"name": name})).await {
        Ok(v) => v,
        Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
    };
    println!("{:#?}", d);
    Ok(0)
}

// clap guarantees `name` is set whenever `all` is false.
fn name_or_all(all: bool, name: Option<String>) -> serde_json::Value {
    if all { json!({"all": true}) } else { json!({"name": name.unwrap()}) }
}

pub async fn start(paths: Paths, all: bool, name: Option<String>) -> Result<i32> {
    let mut c = connect(&paths).await?;
    let r: StartResult = match c.call(methods::START, &name_or_all(all, name)).await {
        Ok(v) => v,
        Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
    };
    if !r.started.is_empty() { println!("started: {}", r.started.join(", ")); }
    if !r.already_running.is_empty() { println!("already running: {}", r.already_running.join(", ")); }
    Ok(0)
}

pub async fn stop(paths: Paths, all: bool, name: Option<String>) -> Result<i32> {
    let mut c = connect(&paths).await?;
    let r: StopResult = match c.call(methods::STOP, &name_or_all(all, name)).await {
        Ok(v) => v,
        Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
    };
    if !r.stopped.is_empty() { println!("stopped: {}", r.stopped.join(", ")); }
    if !r.not_running.is_empty() { println!("not running: {}", r.not_running.join(", ")); }
    Ok(0)
}

pub async fn restart(paths: Paths, all: bool, name: Option<String>) -> Result<i32> {
    let mut c = connect(&paths).await?;
    let r: RestartResult = match c.call(methods::RESTART, &name_or_all(all, name)).await {
        Ok(v) => v,
        Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
    };
    println!("restarted: {}", r.restarted.join(", "));
    Ok(0)
}

pub async fn reload(paths: Paths) -> Result<i32> {
    let mut c =
        connect(&paths).await?;
    let r: ReloadResult = match c.call_no_params(methods::RELOAD).await {
        Ok(v) => v,
        Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
    };
    println!("added: {}", r.added.join(", "));
    println!("removed: {}", r.removed.join(", "));
    println!("changed: {}", r.changed.join(", "));
    println!("unchanged: {}", r.unchanged.join(", "));
    Ok(0)
}

pub async fn logs(paths: Paths, name: String, tail: Option<usize>, follow: bool) -> Result<i32> {
    let mut c = connect(&paths).await?;
    let p = LogsParams { name: name.clone(), tail, follow };
    let _sub: LogsSubscribed = match c.call(methods::LOGS, &p).await {
        Ok(v) => v,
        Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
    };
    // Print log notifications until the daemon sends LOG_END or closes the connection.
    loop {
        match c.read_notification().await {
            Ok(None) => return Ok(0),
            Ok(Some(n)) => match n.method.as_str() {
                notifications::LOG => {
                    if let Some(params) = n.params {
                        if let Ok(line) = serde_json::from_value::<LogLine>(params) {
                            let tag = match line.stream {
                                xy_protocol::rpc::LogStream::Stdout => "out",
                                xy_protocol::rpc::LogStream::Stderr => "err",
                            };
                            println!("[{tag}] {}", line.line);
                        }
                    }
                }
                notifications::LOG_END => return Ok(0),
                _ => {}
            },
            Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
        }
    }
}
```

- [ ] **Step 3: Build**

Run: `cargo build --workspace`
Expected: full workspace compiles.
- [ ] **Step 4: Commit**

```bash
git add crates/xy/
git commit -m "feat(xy): CLI client commands"
```

---

## Phase 6 — Integration tests

### Task 30: Test helper binaries

**Files:**
- Create: `crates/xy/src/bin/xy_test_sleep_server.rs`
- Create: `crates/xy/src/bin/xy_test_exit_failure.rs`
- Modify: `crates/xy/Cargo.toml`

- [ ] **Step 1: `xy_test_sleep_server.rs`**

```rust
// Long-running helper: prints "ready", then a "tick" every 60 seconds.
fn main() {
    use std::io::Write;
    let pid = std::process::id();
    eprintln!("sleep_server start pid={pid}");
    println!("ready");
    std::io::stdout().flush().ok();
    loop {
        std::thread::sleep(std::time::Duration::from_secs(60));
        println!("tick");
        std::io::stdout().flush().ok();
    }
}
```

- [ ] **Step 2: `xy_test_exit_failure.rs`**

```rust
// Crashing helper: exits immediately with a non-zero status.
fn main() {
    eprintln!("exit_failure dying immediately");
    std::process::exit(7);
}
```

- [ ] **Step 3: Declare in `crates/xy/Cargo.toml`**

```toml
[[bin]]
name = "xy-test-sleep-server"
path = "src/bin/xy_test_sleep_server.rs"

[[bin]]
name = "xy-test-exit-failure"
path = "src/bin/xy_test_exit_failure.rs"
```

- [ ] **Step 4: Build**

Run: `cargo build -p xy --bins`
Expected: three binaries produced (`xy`, `xy-test-sleep-server`, `xy-test-exit-failure`).
- [ ] **Step 5: Commit**

```bash
git add crates/xy/
git commit -m "test(xy): helper binaries for integration tests"
```

### Task 31: Integration test scaffolding

**Files:**
- Create: `crates/xy/tests/common/mod.rs`

- [ ] **Step 1: Write the harness**

```rust
use std::path::PathBuf;
use std::process::Stdio;
use std::time::Duration;
use tokio::process::{Child, Command};
use tempfile::TempDir;

/// Isolated XDG environment in a tempdir, plus the daemon child process.
pub struct Harness {
    pub tmp: TempDir,
    pub config_dir: PathBuf,
    pub state_dir: PathBuf,
    pub socket: PathBuf,
    pub daemon: Option<Child>,
}

impl Harness {
    pub fn new() -> Self {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::create_dir_all(tmp.path().join("config")).unwrap();
        std::fs::create_dir_all(tmp.path().join("state")).unwrap();
        std::fs::create_dir_all(tmp.path().join("run")).unwrap();
        let config_dir = tmp.path().join("config/xy/servers");
        let state_dir = tmp.path().join("state/xy");
        std::fs::create_dir_all(&config_dir).unwrap();
        std::fs::create_dir_all(&state_dir).unwrap();
        let socket = tmp.path().join("run/xy.sock");
        Self { tmp, config_dir, state_dir, socket, daemon: None }
    }

    /// Writes `<name>.kdl` with short backoff and grace values so tests run quickly.
    pub fn write_server(&self, name: &str, command: &str, port: u16, restart_policy: &str) {
        let body = format!(
            "command \"{command}\"\nport {port}\nrestart {{ policy \"{restart_policy}\" backoff-initial \"10ms\" backoff-max \"50ms\" max-retries-per-minute 3 }}\nstop {{ grace \"500ms\" }}\n"
        );
        std::fs::write(self.config_dir.join(format!("{name}.kdl")), body).unwrap();
    }

    /// Spawns `xy daemon` and waits up to 5 seconds for its socket to appear.
    pub async fn start_daemon(&mut self, xy_bin: &PathBuf) {
        let child = Command::new(xy_bin)
            .arg("daemon")
            .env("XDG_CONFIG_HOME", self.tmp.path().join("config"))
            .env("XDG_STATE_HOME", self.tmp.path().join("state"))
            .env("XDG_RUNTIME_DIR", self.tmp.path().join("run"))
            .stdout(Stdio::null())
            .stderr(Stdio::inherit())
            .kill_on_drop(true)
            .spawn()
            .expect("spawn daemon");
        self.daemon = Some(child);
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while !self.socket.exists() {
            if std::time::Instant::now() > deadline {
panic!("daemon socket never appeared"); } tokio::time::sleep(Duration::from_millis(25)).await; } } pub async fn run_cli(&self, xy_bin: &PathBuf, args: &[&str]) -> (i32, String, String) { let out = Command::new(xy_bin) .args(args) .env("XDG_CONFIG_HOME", self.tmp.path().join("config")) .env("XDG_STATE_HOME", self.tmp.path().join("state")) .env("XDG_RUNTIME_DIR", self.tmp.path().join("run")) .output().await.expect("run cli"); let code = out.status.code().unwrap_or(-1); let stdout = String::from_utf8_lossy(&out.stdout).to_string(); let stderr = String::from_utf8_lossy(&out.stderr).to_string(); (code, stdout, stderr) } } pub fn xy_bin() -> PathBuf { artifact("xy") } pub fn sleep_server_bin() -> PathBuf { artifact("xy-test-sleep-server") } pub fn exit_failure_bin() -> PathBuf { artifact("xy-test-exit-failure") } fn artifact(name: &str) -> PathBuf { let mut p = std::env::current_exe().unwrap(); p.pop(); if p.ends_with("deps") { p.pop(); } p.push(name); if !p.exists() { panic!("artifact `{}` not found at {}", name, p.display()); } p } ``` - [ ] **Step 2: Build** Run: `cargo build --workspace --tests` Expected: compiles. 
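
The `artifact` helper above depends on Cargo's layout: integration-test executables are placed in `target/<profile>/deps/`, while the workspace's binaries sit one level up. As a minimal sketch of that path arithmetic, the same logic can be written as a pure function that takes the executable path as a parameter. The name `sibling_artifact` is hypothetical and not part of the plan, and the existence check is omitted here.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical, illustration-only variant of `artifact`: given the path of a
/// running test executable, compute where a sibling binary named `name`
/// would be expected. It does not check that the file exists.
fn sibling_artifact(test_exe: &Path, name: &str) -> PathBuf {
    // Drop the executable's file name, leaving its directory.
    let mut dir = test_exe.to_path_buf();
    dir.pop();
    // If that directory is `deps`, step up to the profile directory.
    if dir.ends_with("deps") {
        dir.pop();
    }
    dir.join(name)
}
```

For example, a test executable at `/w/target/debug/deps/lifecycle-1a2b` resolves `xy` to `/w/target/debug/xy`. Cargo also exposes `CARGO_BIN_EXE_<name>` to integration tests at compile time (readable with `env!`), which may be an alternative to walking directories; whether it suits this harness is not something the plan states.
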
- [ ] **Step 3: Commit**

```bash
git add crates/xy/
git commit -m "test(xy): integration test harness"
```

### Task 32: Lifecycle integration test

**Files:**
- Create: `crates/xy/tests/lifecycle.rs`

- [ ] **Step 1: Write the test**

```rust
mod common;
use common::*;

#[tokio::test]
async fn auto_starts_on_boot_then_stop_and_start() {
    let xy = xy_bin();
    let sleeper = sleep_server_bin();
    let mut h = Harness::new();
    h.write_server("alpha", sleeper.to_str().unwrap(), 19_001, "always");
    h.start_daemon(&xy).await;

    let mut last_stdout = String::new();
    for _ in 0..40 {
        let (_c, out, _e) = h.run_cli(&xy, &["list"]).await;
        last_stdout = out;
        if last_stdout.contains("alpha") && last_stdout.contains("running") {
            break;
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    assert!(last_stdout.contains("alpha"), "stdout: {last_stdout}");
    assert!(last_stdout.contains("running"), "stdout: {last_stdout}");

    let (code, out, _e) = h.run_cli(&xy, &["stop", "alpha"]).await;
    assert_eq!(code, 0);
    assert!(out.contains("stopped: alpha"), "stdout: {out}");

    let (code, out, _e) = h.run_cli(&xy, &["start", "alpha"]).await;
    assert_eq!(code, 0);
    assert!(out.contains("started: alpha"), "stdout: {out}");
}
```

- [ ] **Step 2: Run**

Run: `cargo test -p xy --test lifecycle -- --nocapture`
Expected: passes.

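
The poll-then-sleep loop in the lifecycle test recurs in Tasks 33 and 34. One way to think about the shared pattern is a small "poll until deadline" helper. The sketch below is synchronous and uses only the standard library so that it stands alone; the name `poll_until` and its signature are assumptions, and an async test would need an equivalent built on `tokio::time::sleep` rather than this exact function.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper: call `probe` repeatedly until it yields `Some(value)`
/// or `timeout` has elapsed, sleeping `interval` between attempts.
fn poll_until<T>(
    timeout: Duration,
    interval: Duration,
    mut probe: impl FnMut() -> Option<T>,
) -> Option<T> {
    let deadline = Instant::now() + timeout;
    loop {
        // Probe first, so a condition that already holds returns immediately.
        if let Some(value) = probe() {
            return Some(value);
        }
        // Give up once the deadline has passed.
        if Instant::now() >= deadline {
            return None;
        }
        std::thread::sleep(interval);
    }
}
```

Returning `Option<T>` lets the caller keep the last successful observation (for example the matching stdout) for use in assertion messages, instead of tracking a separate `saw_failed` flag.
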
- [ ] **Step 3: Commit**

```bash
git add crates/xy/
git commit -m "test(xy): auto-start + stop/start lifecycle"
```

### Task 33: Restart cap integration test

**Files:**
- Create: `crates/xy/tests/restart_policy.rs`

- [ ] **Step 1: Write the test**

```rust
mod common;
use common::*;

#[tokio::test]
async fn restart_cap_marks_failed() {
    let xy = xy_bin();
    let bad = exit_failure_bin();
    let mut h = Harness::new();
    h.write_server("flaky", bad.to_str().unwrap(), 19_010, "always");
    h.start_daemon(&xy).await;

    // The helper exits immediately, so the retry cap should trip within a few seconds.
    let mut saw_failed = false;
    for _ in 0..60 {
        let (_c, out, _e) = h.run_cli(&xy, &["list"]).await;
        if out.contains("flaky") && out.contains("failed") {
            saw_failed = true;
            break;
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    assert!(saw_failed, "flaky never reached failed state");
}
```

- [ ] **Step 2: Run**

Run: `cargo test -p xy --test restart_policy -- --nocapture`
Expected: passes.

- [ ] **Step 3: Commit**

```bash
git add crates/xy/
git commit -m "test(xy): restart cap escalates to failed"
```

### Task 34: Reload integration test

**Files:**
- Create: `crates/xy/tests/reload.rs`

- [ ] **Step 1: Write the test**

```rust
mod common;
use common::*;

#[tokio::test]
async fn reload_adds_removes_and_changes() {
    let xy = xy_bin();
    let sleeper = sleep_server_bin();
    let mut h = Harness::new();
    h.write_server("a", sleeper.to_str().unwrap(), 19_020, "always");
    h.start_daemon(&xy).await;

    // Wait for "a" to come up. `contains("a")` is a loose substring match.
    for _ in 0..40 {
        let (_c, out, _e) = h.run_cli(&xy, &["list"]).await;
        if out.contains("a") && out.contains("running") {
            break;
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }

    // Change "a" (new port) and add "b", then reload.
    h.write_server("a", sleeper.to_str().unwrap(), 19_021, "always");
    h.write_server("b", sleeper.to_str().unwrap(), 19_022, "always");
    let (code, out, _e) = h.run_cli(&xy, &["reload"]).await;
    assert_eq!(code, 0);
    assert!(out.contains("added:") && out.contains("b"), "stdout: {out}");
    // Caveat: "changed:" itself contains 'a', so the second check is only a smoke test.
    assert!(out.contains("changed:") && out.contains("a"), "stdout: {out}");

    // Remove "a" and reload again.
    std::fs::remove_file(h.config_dir.join("a.kdl")).unwrap();
    let (code, out, _e) = h.run_cli(&xy, &["reload"]).await;
    assert_eq!(code, 0);
    assert!(out.contains("removed:") && out.contains("a"), "stdout: {out}");
}
```

- [ ] **Step 2: Run**

Run: `cargo test -p xy --test reload -- --nocapture`
Expected: passes.

- [ ] **Step 3: Commit**

```bash
git add crates/xy/
git commit -m "test(xy): reload diff"
```

### Task 35: Logs integration test

**Files:**
- Create: `crates/xy/tests/logs.rs`

- [ ] **Step 1: Write the test**

```rust
mod common;
use common::*;
use std::process::Stdio;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;

#[tokio::test]
async fn logs_tail_prints_existing_lines() {
    let xy = xy_bin();
    let sleeper = sleep_server_bin();
    let mut h = Harness::new();
    h.write_server("svc", sleeper.to_str().unwrap(), 19_030, "always");
    h.start_daemon(&xy).await;
    // Give the helper time to print "ready" before tailing.
    tokio::time::sleep(Duration::from_millis(500)).await;

    let (code, out, _e) = h.run_cli(&xy, &["logs", "svc", "--tail", "10"]).await;
    assert_eq!(code, 0);
    assert!(out.contains("ready"), "stdout: {out}");
}

#[tokio::test]
async fn logs_follow_streams_new_lines() {
    let xy = xy_bin();
    let sleeper = sleep_server_bin();
    let mut h = Harness::new();
    h.write_server("svc", sleeper.to_str().unwrap(), 19_031, "always");
    h.start_daemon(&xy).await;

    // `--follow` never exits on its own, so spawn it and read stdout incrementally.
    let mut child = Command::new(&xy)
        .args(["logs", "svc", "--follow"])
        .env("XDG_CONFIG_HOME", h.tmp.path().join("config"))
        .env("XDG_STATE_HOME", h.tmp.path().join("state"))
        .env("XDG_RUNTIME_DIR", h.tmp.path().join("run"))
        .stdout(Stdio::piped())
        .stderr(Stdio::null())
        .kill_on_drop(true)
        .spawn()
        .unwrap();
    let stdout = child.stdout.take().unwrap();
    let mut lines = BufReader::new(stdout).lines();
    let first = tokio::time::timeout(Duration::from_secs(2), lines.next_line())
        .await
        .expect("timeout waiting for first log line")
        .unwrap();
    assert!(first.is_some());
    let _ = child.kill().await;
}
```

- [ ] **Step 2: Run**

Run: `cargo test -p xy --test logs -- --nocapture`
Expected: passes.

- [ ] **Step 3: Commit**

```bash
git add crates/xy/
git commit -m "test(xy): logs --tail and --follow"
```

---

## Phase 7 — Polish

### Task 36: CI-clean

**Files:** none (verification only).

- [ ] **Step 1: Format**

Run: `cargo +nightly fmt --all`

- [ ] **Step 2: Lint**

Run: `cargo clippy --workspace --all-targets -- -D warnings`
Fix any warnings inline.

- [ ] **Step 3: Full test pass**

Run: `cargo test --workspace`
Expected: every test green.

- [ ] **Step 4: Commit cleanups**

```bash
git add -A
git commit -m "chore: cargo fmt + clippy clean"
```

### Task 37: README + example KDL

**Files:**
- Create: `README.md`
- Create: `examples/insikt.kdl`

- [ ] **Step 1: Write `README.md`**

```markdown
# xy — HTTP MCP server supervisor

Daemon + CLI that launches and supervises HTTP-based MCP servers.

## Build

    cargo build --release

## Run

    target/release/xy daemon     # foreground

Drop a server definition into `$XDG_CONFIG_HOME/xy/servers/<name>.kdl`
(see `examples/insikt.kdl`) and `xy reload`.

Commands:

    xy list
    xy status
    xy start
    xy stop
    xy restart
    xy reload
    xy logs [--tail N] [--follow]

Exit codes: 0 success, 1 operational error, 2 daemon unreachable, 3 config invalid.
```

- [ ] **Step 2: Write `examples/insikt.kdl`**

```kdl
command "/Users/you/.cargo/bin/insikt-mcp"
args "--http" "--port" "8421"
port 8421
env {
    RUST_LOG "info"
}
restart {
    policy "on-failure"
}
stop {
    grace "10s"
}
```

- [ ] **Step 3: Commit**

```bash
git add README.md examples/
git commit -m "docs: README and example KDL config"
```

---

## Self-Review (run before handoff)

**Spec coverage:**
- Single `xy` binary, Cargo workspace with 4 crates — Tasks 1, 8–16, 17–20, 21+.
- XDG paths (macOS too via etcetera) — Task 21.
- Per-server KDL configs, duplicate-port detection at load — Tasks 5, 6.
- JSON-RPC over newline-delimited Unix socket, 0600 perms — Tasks 17–20.
- All RPC methods including subscription-based `logs` + `logs_cancel` — Tasks 26, 27, 28.
- Restart policy + backoff + retry window → failed — Tasks 9–11, 16, 33.
- Stop with SIGTERM, grace timer, SIGKILL — Task 16, 25 (daemon shutdown).
- Log rotation + ring buffer + broadcast — Tasks 12–14.
- Process-group ownership of child — Task 15.
- Tests at unit + integration level — Tasks 8–16 (units), 32–35 (integration).
- CLI exit code mapping — Task 29 (`rpc_to_exit`).

**Placeholders:** none — every step contains its code or commands.

**Type consistency:**
- `SupervisorHandle.tx` is `mpsc::Sender` everywhere.
- `LogLine.subscription_id` is `u64` everywhere.
- `Registry` uses `Entry { handle, config_hash }` consistently in Tasks 24 onward.

**Order:** later tasks depend only on earlier ones. xy-protocol (Phase 2) feeds everything; xy-supervisor (Phase 3) uses xy-protocol; xy-ipc (Phase 4) is independent of supervisor; xy binary (Phase 5) ties them all together; integration tests (Phase 6) drive end-to-end; polish (Phase 7) closes out.
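
As a rough illustration of the CLI exit-code mapping listed above and in the README (0 success, 1 operational error, 2 daemon unreachable, 3 config invalid), the four codes could be modeled as a small enum. This is only a sketch: `CliOutcome` and its variant names are hypothetical, and it is not the `rpc_to_exit` function from Task 29, whose signature is defined there.

```rust
/// Hypothetical outcome type; variant names are illustrative only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CliOutcome {
    Success,
    OperationalError,
    DaemonUnreachable,
    ConfigInvalid,
}

impl CliOutcome {
    /// Map each outcome to the process exit code documented in the README.
    fn exit_code(self) -> i32 {
        match self {
            CliOutcome::Success => 0,
            CliOutcome::OperationalError => 1,
            CliOutcome::DaemonUnreachable => 2,
            CliOutcome::ConfigInvalid => 3,
        }
    }
}
```

Keeping the mapping in one exhaustive `match` means a newly added variant cannot compile without an exit code being chosen for it.
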