xy/docs/superpowers/plans/2026-05-25-xy-mcp-supervisor.md
logaritmisk c252bd7716 docs: add xy MCP supervisor implementation plan
37-task TDD-style plan across 7 phases: workspace skeleton,
xy-protocol (config/state/rpc types), xy-supervisor (state machine
with mock-driven unit tests), xy-ipc (JSON-RPC over Unix socket),
xy binary (daemon + CLI), integration tests with test-helper bins,
and polish (fmt/clippy/README).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 11:13:56 +02:00

xy MCP Supervisor — Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Build the MVP of xy, a Tokio-based daemon that supervises HTTP-based MCP server processes on macOS/Linux, plus a CLI that drives it over a Unix socket.

Architecture: Single xy binary in a Cargo workspace of four crates (xy-protocol, xy-supervisor, xy-ipc, xy). The daemon (xy daemon) runs one supervisor task per managed server; the CLI subcommands act as JSON-RPC 2.0 clients, exchanging newline-delimited JSON messages with the daemon over a Unix socket. Configs are per-server KDL files under the XDG config directory.
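
A minimal std-only sketch of the newline-delimited framing follows. It assumes each message has already been serialized to compact JSON. The real framing.rs will presumably be async on Tokio and use serde_json, and write_frame and read_frames are hypothetical names. The one invariant that matters is that a frame never contains a raw newline. Valid JSON never has a raw newline inside a string, so compact (non-pretty-printed) output such as serde_json::to_string produces satisfies it.

```rust
use std::io::{self, BufRead, Write};

/// Write one message as a single line terminated by '\n'.
/// Rejects input containing a raw newline, since that would split the frame.
pub fn write_frame<W: Write>(out: &mut W, json: &str) -> io::Result<()> {
    if json.contains('\n') {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "frame contains a raw newline",
        ));
    }
    out.write_all(json.as_bytes())?;
    out.write_all(b"\n")?;
    out.flush()
}

/// Read frames until EOF. `lines()` strips the trailing "\n" or "\r\n";
/// blank lines are skipped rather than treated as (invalid) messages.
pub fn read_frames<R: BufRead>(input: R) -> io::Result<Vec<String>> {
    let mut frames = Vec::new();
    for line in input.lines() {
        let line = line?;
        if !line.trim().is_empty() {
            frames.push(line);
        }
    }
    Ok(frames)
}
```

Because each message is self-delimiting, neither side needs a length prefix. A reader simply handles one line at a time.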

Tech Stack: Rust (edition 2024), Tokio (rt-multi-thread, net, process, signal, sync, fs, io-util, macros, time), clap (derive), serde + serde_json, kdl, humantime + humantime-serde (durations), etcetera (XDG), nix (signals/process groups), async-trait, tracing + tracing-subscriber, thiserror, anyhow.

Spec: docs/superpowers/specs/2026-05-25-xy-mcp-supervisor-design.md


File Structure

xy/
├── Cargo.toml                                # workspace manifest (no [package])
└── crates/
    ├── xy-protocol/
    │   ├── Cargo.toml
    │   └── src/
    │       ├── lib.rs                        # re-exports
    │       ├── config.rs                     # ServerConfig, RestartPolicy, RestartConfig, StopConfig
    │       ├── kdl_parse.rs                  # parse_server_config, load_all_configs
    │       ├── state.rs                      # ServerState enum
    │       ├── rpc.rs                        # JSON-RPC method params/results
    │       └── error.rs                      # ConfigError, RpcErrorCode
    ├── xy-supervisor/
    │   ├── Cargo.toml
    │   └── src/
    │       ├── lib.rs                        # public surface
    │       ├── child.rs                      # ChildHandle trait + RealChild
    │       ├── policy.rs                     # restart decision logic
    │       ├── backoff.rs                    # backoff calculator
    │       ├── retry_window.rs               # sliding 60s window
    │       ├── logs.rs                       # log writer, rotation, ring buffer, broadcast
    │       └── supervisor.rs                 # supervisor task + state machine
    ├── xy-ipc/
    │   ├── Cargo.toml
    │   └── src/
    │       ├── lib.rs
    │       ├── framing.rs                    # newline-delimited JSON codec
    │       ├── envelope.rs                   # JSON-RPC 2.0 request/response/notification
    │       ├── server.rs                     # listen, accept, dispatch
    │       └── client.rs                     # connect, call, subscribe
    └── xy/
        ├── Cargo.toml
        ├── src/
        │   ├── main.rs                       # clap entry, subcommand dispatch
        │   ├── paths.rs                      # XDG path resolution (via etcetera)
        │   ├── pidfile.rs                    # acquire/release exclusive pidfile
        │   ├── daemon/
        │   │   ├── mod.rs                    # daemon entry point
        │   │   ├── handlers.rs               # JSON-RPC method handlers → supervisor msgs
        │   │   ├── registry.rs               # name → supervisor handle map
        │   │   └── shutdown.rs               # SIGTERM/SIGINT graceful shutdown
        │   ├── cli/
        │   │   ├── mod.rs                    # client-side CLI command impls
        │   │   └── format.rs                 # `list` / `status` output formatting
        │   └── bin/
        │       ├── xy_test_sleep_server.rs   # integration test helper
        │       └── xy_test_exit_failure.rs   # integration test helper
        └── tests/
            ├── common/mod.rs                 # tempdir XDG, spawn daemon, drive CLI
            ├── lifecycle.rs                  # start/stop/restart
            ├── reload.rs                     # added/removed/changed
            ├── restart_policy.rs             # crash + cap → failed
            └── logs.rs                       # --tail and --follow
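
The tree puts a ring buffer in logs.rs, presumably so that `logs --tail N` can answer from memory. The following std-only sketch assumes a fixed line capacity. LogRing, push, and tail are hypothetical names, and a real version would also carry the stream and timestamp fields that LogLine exposes.

```rust
use std::collections::VecDeque;

/// Hypothetical fixed-capacity buffer of recent log lines: once full,
/// pushing a new line evicts the oldest one.
pub struct LogRing {
    cap: usize,
    lines: VecDeque<String>,
}

impl LogRing {
    pub fn new(cap: usize) -> Self {
        Self { cap, lines: VecDeque::with_capacity(cap) }
    }

    pub fn push(&mut self, line: impl Into<String>) {
        if self.cap == 0 {
            return; // a zero-capacity ring keeps nothing
        }
        if self.lines.len() == self.cap {
            self.lines.pop_front(); // drop the oldest line
        }
        self.lines.push_back(line.into());
    }

    /// The last `n` lines, oldest first (what `logs --tail n` would print).
    pub fn tail(&self, n: usize) -> Vec<&str> {
        let skip = self.lines.len().saturating_sub(n);
        self.lines.iter().skip(skip).map(String::as_str).collect()
    }
}
```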

Each crate's lib.rs re-exports its public surface so downstream crates use short paths (xy_protocol::ServerConfig, not xy_protocol::config::ServerConfig).
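
Here is a miniature of the pattern, using hypothetical two-field stand-ins for the real types. Because the modules stay pub, both the short path and the long path resolve to the same type.

```rust
// Hypothetical miniature of the xy-protocol layout.
pub mod config {
    #[derive(Debug, Clone, PartialEq, Eq)]
    pub struct ServerConfig {
        pub name: String,
        pub port: u16,
    }
}

pub mod state {
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum ServerState {
        Stopped,
        Running,
    }
}

// Re-exports at the crate root: downstream code can write the short path.
pub use config::ServerConfig;
pub use state::ServerState;
```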


Note for the executor: The full step-by-step task list (37 tasks across 7 phases) follows. Tasks are split into bite-sized steps of roughly 2 to 5 minutes each. Every task includes complete code, exact commands, and a commit message. The plan was authored in a single drafting session; if a step's code snippet references a type or helper, it is defined either in that task or in an earlier numbered task within this plan.

Phase 1 — Workspace skeleton

Task 1: Convert single crate to workspace

Files:

  • Modify: Cargo.toml

  • Create: crates/xy-protocol/Cargo.toml

  • Create: crates/xy-protocol/src/lib.rs

  • Create: crates/xy-supervisor/Cargo.toml

  • Create: crates/xy-supervisor/src/lib.rs

  • Create: crates/xy-ipc/Cargo.toml

  • Create: crates/xy-ipc/src/lib.rs

  • Create: crates/xy/Cargo.toml

  • Create: crates/xy/src/main.rs

  • Delete: src/main.rs and src/ (replaced by crates/xy/)

  • Step 1: Replace root Cargo.toml with a workspace manifest

[workspace]
resolver = "3"
members = [
    "crates/xy-protocol",
    "crates/xy-supervisor",
    "crates/xy-ipc",
    "crates/xy",
]

[workspace.package]
edition = "2024"
version = "0.1.0"
license = "MIT OR Apache-2.0"

[workspace.dependencies]
xy-protocol = { path = "crates/xy-protocol" }
xy-supervisor = { path = "crates/xy-supervisor" }
xy-ipc = { path = "crates/xy-ipc" }

tokio = { version = "1", features = ["rt-multi-thread", "net", "process", "signal", "sync", "fs", "io-util", "macros", "time"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "2"
anyhow = "1"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
clap = { version = "4", features = ["derive"] }
kdl = "6"
etcetera = "0.10"
nix = { version = "0.30", features = ["signal", "process"] }
humantime = "2"
humantime-serde = "1"
async-trait = "0.1"
tempfile = "3"
tokio-test = "0.4"
  • Step 2: Create crates/xy-protocol/Cargo.toml
[package]
name = "xy-protocol"
edition.workspace = true
version.workspace = true
license.workspace = true

[dependencies]
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
kdl.workspace = true
humantime.workspace = true
humantime-serde.workspace = true

[dev-dependencies]
tempfile.workspace = true
  • Step 3: Create crates/xy-protocol/src/lib.rs
//! Wire types and config schema shared between the xy daemon and CLI.
  • Step 4: Create crates/xy-supervisor/Cargo.toml
[package]
name = "xy-supervisor"
edition.workspace = true
version.workspace = true
license.workspace = true

[dependencies]
xy-protocol.workspace = true
tokio.workspace = true
tracing.workspace = true
thiserror.workspace = true
nix.workspace = true
async-trait.workspace = true

[dev-dependencies]
tokio-test.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread"] }
  • Step 5: Create crates/xy-supervisor/src/lib.rs
//! Process-supervision primitives for the xy daemon.
  • Step 6: Create crates/xy-ipc/Cargo.toml
[package]
name = "xy-ipc"
edition.workspace = true
version.workspace = true
license.workspace = true

[dependencies]
xy-protocol.workspace = true
tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
thiserror.workspace = true

[dev-dependencies]
tempfile.workspace = true
  • Step 7: Create crates/xy-ipc/src/lib.rs
//! JSON-RPC 2.0 over newline-delimited JSON on a Unix socket.
  • Step 8: Create crates/xy/Cargo.toml
[package]
name = "xy"
edition.workspace = true
version.workspace = true
license.workspace = true

[[bin]]
name = "xy"
path = "src/main.rs"

[dependencies]
xy-protocol.workspace = true
xy-supervisor.workspace = true
xy-ipc.workspace = true
tokio.workspace = true
clap.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
anyhow.workspace = true
etcetera.workspace = true
nix.workspace = true
humantime.workspace = true

[dev-dependencies]
tempfile.workspace = true
  • Step 9: Create crates/xy/src/main.rs
fn main() {
    println!("xy: not implemented yet");
}
  • Step 10: Remove the old top-level src/ directory

Run: rm -rf src/

  • Step 11: Verify the workspace builds

Run: cargo build --workspace
Expected: every crate compiles and one binary, xy, is produced.

  • Step 12: Commit
git add Cargo.toml crates/
git rm -r --cached src/ 2>/dev/null || true
git add -A
git commit -m "chore: convert to cargo workspace with four crates"

Phase 2 — xy-protocol

Task 2: ServerState enum

Files:

  • Create: crates/xy-protocol/src/state.rs

  • Modify: crates/xy-protocol/src/lib.rs

  • Step 1: Write state.rs with the enum and its inline tests

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum ServerState {
    Stopped,
    Starting,
    Running,
    Restarting,
    Failed,
    Stopping,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn serializes_to_kebab_case() {
        assert_eq!(serde_json::to_string(&ServerState::Restarting).unwrap(), "\"restarting\"");
    }

    #[test]
    fn deserializes_from_kebab_case() {
        let s: ServerState = serde_json::from_str("\"failed\"").unwrap();
        assert_eq!(s, ServerState::Failed);
    }
}
  • Step 2: Expose from lib.rs
//! Wire types and config schema shared between the xy daemon and CLI.

pub mod state;

pub use state::ServerState;
  • Step 3: Run tests

Run: cargo test -p xy-protocol
Expected: 2 passed.

  • Step 4: Commit
git add crates/xy-protocol/
git commit -m "feat(protocol): ServerState enum"
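
Every ServerState variant is a single word, so rename_all = "kebab-case" simply lowercases it. A multi-word variant, such as a hypothetical ShuttingDown, would become shutting-down. The std-only mirror below spells out the strings the CLI will see, using hypothetical as_wire_str and from_wire_str helpers. Like serde's derived Deserialize, it matches case-sensitively, so "Failed" is rejected. The real code should keep relying on the derive.

```rust
// Std-only mirror of xy_protocol::ServerState (no serde), used to show the
// wire strings that #[serde(rename_all = "kebab-case")] produces.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerState {
    Stopped,
    Starting,
    Running,
    Restarting,
    Failed,
    Stopping,
}

impl ServerState {
    pub const ALL: [ServerState; 6] = [
        ServerState::Stopped,
        ServerState::Starting,
        ServerState::Running,
        ServerState::Restarting,
        ServerState::Failed,
        ServerState::Stopping,
    ];

    /// Hypothetical helper: the JSON string serde emits (without quotes).
    pub fn as_wire_str(self) -> &'static str {
        match self {
            ServerState::Stopped => "stopped",
            ServerState::Starting => "starting",
            ServerState::Running => "running",
            ServerState::Restarting => "restarting",
            ServerState::Failed => "failed",
            ServerState::Stopping => "stopping",
        }
    }

    /// Hypothetical inverse: exact, case-sensitive match like serde's derive.
    pub fn from_wire_str(s: &str) -> Option<Self> {
        Self::ALL.iter().copied().find(|st| st.as_wire_str() == s)
    }
}
```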

Task 3: RestartPolicy + RestartConfig + StopConfig

Files:

  • Create: crates/xy-protocol/src/config.rs

  • Modify: crates/xy-protocol/src/lib.rs

  • Step 1: Write the file

use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum RestartPolicy {
    Always,
    // Derived Default via #[default] (stable since Rust 1.62); this is the
    // form clippy's derivable_impls lint suggests over a manual impl.
    #[default]
    OnFailure,
    Never,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RestartConfig {
    #[serde(default)]
    pub policy: RestartPolicy,
    #[serde(default = "default_backoff_initial", with = "humantime_serde")]
    pub backoff_initial: Duration,
    #[serde(default = "default_backoff_max", with = "humantime_serde")]
    pub backoff_max: Duration,
    #[serde(default = "default_max_retries_per_minute")]
    pub max_retries_per_minute: u32,
}

fn default_backoff_initial() -> Duration { Duration::from_secs(1) }
fn default_backoff_max() -> Duration { Duration::from_secs(30) }
fn default_max_retries_per_minute() -> u32 { 5 }

impl Default for RestartConfig {
    fn default() -> Self {
        Self {
            policy: RestartPolicy::default(),
            backoff_initial: default_backoff_initial(),
            backoff_max: default_backoff_max(),
            max_retries_per_minute: default_max_retries_per_minute(),
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StopConfig {
    #[serde(default = "default_grace", with = "humantime_serde")]
    pub grace: Duration,
}

fn default_grace() -> Duration { Duration::from_secs(10) }

impl Default for StopConfig {
    fn default() -> Self { Self { grace: default_grace() } }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn restart_config_defaults() {
        let c = RestartConfig::default();
        assert_eq!(c.policy, RestartPolicy::OnFailure);
        assert_eq!(c.backoff_initial, Duration::from_secs(1));
        assert_eq!(c.backoff_max, Duration::from_secs(30));
        assert_eq!(c.max_retries_per_minute, 5);
    }

    #[test]
    fn stop_config_defaults() {
        assert_eq!(StopConfig::default().grace, Duration::from_secs(10));
    }
}
  • Step 2: Expose from lib.rs
//! Wire types and config schema shared between the xy daemon and CLI.

pub mod config;
pub mod state;

pub use config::{RestartConfig, RestartPolicy, StopConfig};
pub use state::ServerState;
  • Step 3: Run tests

Run: cargo test -p xy-protocol
Expected: 4 passed.

  • Step 4: Commit
git add crates/xy-protocol/
git commit -m "feat(protocol): RestartPolicy/RestartConfig/StopConfig with defaults"
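
RestartConfig only stores the backoff bounds. The backoff.rs calculator that consumes them comes later, in xy-supervisor. The minimal sketch below assumes plain doubling from backoff_initial (attempt 0 waits backoff_initial), capped at backoff_max. backoff_delay is a hypothetical name, and the real policy may differ, for example by adding jitter.

```rust
use std::time::Duration;

/// Hypothetical backoff: initial * 2^attempt, capped at max.
/// Any overflow (huge attempt counts or durations) saturates to max.
pub fn backoff_delay(initial: Duration, max: Duration, attempt: u32) -> Duration {
    // 2^attempt as a u32; None once the shift reaches 32 bits.
    let factor = 1u32.checked_shl(attempt);
    match factor.and_then(|f| initial.checked_mul(f)) {
        Some(d) => d.min(max),
        None => max,
    }
}
```

With the defaults of 1s and 30s, this yields 1s, 2s, 4s, 8s, 16s, and then 30s for every later attempt.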

Task 4: ServerConfig struct + ConfigError

Files:

  • Modify: crates/xy-protocol/src/config.rs

  • Create: crates/xy-protocol/src/error.rs

  • Modify: crates/xy-protocol/src/lib.rs

  • Step 1: Append ServerConfig to config.rs

use std::collections::BTreeMap;
use std::path::PathBuf;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ServerConfig {
    pub name: String,
    pub command: PathBuf,
    #[serde(default)]
    pub args: Vec<String>,
    pub port: u16,
    #[serde(default)]
    pub env: BTreeMap<String, String>,
    #[serde(default)]
    pub working_dir: Option<PathBuf>,
    #[serde(default)]
    pub restart: RestartConfig,
    #[serde(default)]
    pub stop: StopConfig,
}
  • Step 2: Create crates/xy-protocol/src/error.rs
use std::path::PathBuf;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum ConfigError {
    #[error("failed to read {path}: {source}")]
    Io { path: PathBuf, #[source] source: std::io::Error },

    #[error("failed to parse KDL in {path}: {message}")]
    Parse { path: PathBuf, message: String },

    #[error("missing required field `{field}` in {path}")]
    MissingField { path: PathBuf, field: &'static str },

    #[error("invalid value for `{field}` in {path}: {message}")]
    InvalidValue { path: PathBuf, field: &'static str, message: String },

    #[error("duplicate port {port} declared by both `{name_a}` and `{name_b}`")]
    DuplicatePort { name_a: String, name_b: String, port: u16 },

    #[error("server name `{name}` contains invalid characters (allowed: a-z, 0-9, '-', '_')")]
    InvalidName { name: String },
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcErrorCode {
    ServerNotFound = -32001,
    PortConflict = -32002,
    ConfigInvalid = -32003,
    AlreadyRunning = -32004,
    NotRunning = -32005,
    SpawnFailed = -32006,
}

impl RpcErrorCode {
    pub fn as_i32(self) -> i32 { self as i32 }
}
  • Step 3: Update lib.rs
//! Wire types and config schema shared between the xy daemon and CLI.

pub mod config;
pub mod error;
pub mod state;

pub use config::{RestartConfig, RestartPolicy, ServerConfig, StopConfig};
pub use error::{ConfigError, RpcErrorCode};
pub use state::ServerState;
  • Step 4: Run tests

Run: cargo test -p xy-protocol
Expected: 4 passed (unchanged; types only).

  • Step 5: Commit
git add crates/xy-protocol/
git commit -m "feat(protocol): ServerConfig + ConfigError + RpcErrorCode"
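
All six custom codes fall in the -32000 to -32099 band that the JSON-RPC 2.0 specification reserves for implementation-defined server errors. They therefore cannot collide with standard codes such as -32601 (method not found). The CLI will also need the reverse mapping when it decodes an error response. Here is a std-only sketch with hypothetical from_i32 and is_server_error_range helpers:

```rust
/// Std-only mirror of xy_protocol::RpcErrorCode with the plan's discriminants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcErrorCode {
    ServerNotFound = -32001,
    PortConflict = -32002,
    ConfigInvalid = -32003,
    AlreadyRunning = -32004,
    NotRunning = -32005,
    SpawnFailed = -32006,
}

impl RpcErrorCode {
    pub const ALL: [RpcErrorCode; 6] = [
        RpcErrorCode::ServerNotFound,
        RpcErrorCode::PortConflict,
        RpcErrorCode::ConfigInvalid,
        RpcErrorCode::AlreadyRunning,
        RpcErrorCode::NotRunning,
        RpcErrorCode::SpawnFailed,
    ];

    pub fn as_i32(self) -> i32 {
        self as i32
    }

    /// Hypothetical reverse mapping for the client side.
    pub fn from_i32(code: i32) -> Option<Self> {
        Self::ALL.iter().copied().find(|c| c.as_i32() == code)
    }
}

/// JSON-RPC 2.0 reserves -32000..=-32099 for implementation-defined server errors.
pub fn is_server_error_range(code: i32) -> bool {
    (-32099..=-32000).contains(&code)
}
```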

Task 5: KDL parser for ServerConfig

Files:

  • Create: crates/xy-protocol/src/kdl_parse.rs

  • Modify: crates/xy-protocol/src/lib.rs

  • Step 1: Write the parser + tests in one file

use crate::{ConfigError, RestartConfig, RestartPolicy, ServerConfig, StopConfig};
use kdl::{KdlDocument, KdlNode};
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::time::Duration;

pub fn parse_server_config(name: &str, text: &str, source_path: &Path) -> Result<ServerConfig, ConfigError> {
    validate_name(name).map_err(|_| ConfigError::InvalidName { name: name.to_string() })?;

    let doc: KdlDocument = text.parse().map_err(|e: kdl::KdlError| ConfigError::Parse {
        path: source_path.to_path_buf(),
        message: e.to_string(),
    })?;

    let command = require_string_arg(&doc, "command", source_path)?;
    let args = optional_string_args(&doc, "args");
    let port = require_u16_arg(&doc, "port", source_path)?;
    let env = optional_string_map(&doc, "env");
    let working_dir = optional_string_arg(&doc, "working-dir").map(PathBuf::from);
    let restart = parse_restart(&doc, source_path)?;
    let stop = parse_stop(&doc, source_path)?;

    Ok(ServerConfig {
        name: name.to_string(),
        command: PathBuf::from(command),
        args, port, env, working_dir, restart, stop,
    })
}

fn validate_name(name: &str) -> Result<(), ()> {
    if name.is_empty() { return Err(()); }
    if name.chars().all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '_') {
        Ok(())
    } else {
        Err(())
    }
}

fn require_string_arg(doc: &KdlDocument, name: &'static str, path: &Path) -> Result<String, ConfigError> {
    let node = find_node(doc, name).ok_or(ConfigError::MissingField { path: path.to_path_buf(), field: name })?;
    node.entries().first()
        .and_then(|e| e.value().as_string().map(str::to_string))
        .ok_or(ConfigError::InvalidValue { path: path.to_path_buf(), field: name, message: "expected string argument".into() })
}

fn require_u16_arg(doc: &KdlDocument, name: &'static str, path: &Path) -> Result<u16, ConfigError> {
    let node = find_node(doc, name).ok_or(ConfigError::MissingField { path: path.to_path_buf(), field: name })?;
    let v = node.entries().first()
        .and_then(|e| e.value().as_integer())
        .ok_or(ConfigError::InvalidValue { path: path.to_path_buf(), field: name, message: "expected integer".into() })?;
    u16::try_from(v).map_err(|_| ConfigError::InvalidValue {
        path: path.to_path_buf(), field: name, message: format!("port {v} out of u16 range"),
    })
}

fn optional_string_arg(doc: &KdlDocument, name: &str) -> Option<String> {
    find_node(doc, name)
        .and_then(|n| n.entries().first())
        .and_then(|e| e.value().as_string().map(str::to_string))
}

fn optional_string_args(doc: &KdlDocument, name: &str) -> Vec<String> {
    find_node(doc, name)
        .map(|n| n.entries().iter()
            .filter_map(|e| e.value().as_string().map(str::to_string))
            .collect())
        .unwrap_or_default()
}

fn optional_string_map(doc: &KdlDocument, name: &str) -> BTreeMap<String, String> {
    let Some(node) = find_node(doc, name) else { return BTreeMap::new(); };
    let mut out = BTreeMap::new();
    if let Some(children) = node.children() {
        for child in children.nodes() {
            let key = child.name().value().to_string();
            if let Some(val) = child.entries().first().and_then(|e| e.value().as_string()) {
                out.insert(key, val.to_string());
            }
        }
    }
    out
}

fn parse_restart(doc: &KdlDocument, path: &Path) -> Result<RestartConfig, ConfigError> {
    let Some(node) = find_node(doc, "restart") else { return Ok(RestartConfig::default()); };
    let Some(children) = node.children() else { return Ok(RestartConfig::default()); };

    let mut out = RestartConfig::default();
    for child in children.nodes() {
        match child.name().value() {
            "policy" => {
                let s = string_arg(child, "policy", path)?;
                out.policy = match s.as_str() {
                    "always" => RestartPolicy::Always,
                    "on-failure" => RestartPolicy::OnFailure,
                    "never" => RestartPolicy::Never,
                    other => return Err(ConfigError::InvalidValue {
                        path: path.to_path_buf(), field: "restart.policy",
                        message: format!("unknown policy `{other}`"),
                    }),
                };
            }
            "backoff-initial" => out.backoff_initial = parse_duration_arg(child, "restart.backoff-initial", path)?,
            "backoff-max" => out.backoff_max = parse_duration_arg(child, "restart.backoff-max", path)?,
            "max-retries-per-minute" => {
                let v = child.entries().first()
                    .and_then(|e| e.value().as_integer())
                    .ok_or(ConfigError::InvalidValue {
                        path: path.to_path_buf(), field: "restart.max-retries-per-minute",
                        message: "expected integer".into(),
                    })?;
                out.max_retries_per_minute = u32::try_from(v).map_err(|_| ConfigError::InvalidValue {
                    path: path.to_path_buf(), field: "restart.max-retries-per-minute",
                    message: format!("out of u32 range: {v}"),
                })?;
            }
            other => return Err(ConfigError::InvalidValue {
                path: path.to_path_buf(), field: "restart",
                message: format!("unknown key `{other}`"),
            }),
        }
    }
    Ok(out)
}

fn parse_stop(doc: &KdlDocument, path: &Path) -> Result<StopConfig, ConfigError> {
    let Some(node) = find_node(doc, "stop") else { return Ok(StopConfig::default()); };
    let Some(children) = node.children() else { return Ok(StopConfig::default()); };
    let mut out = StopConfig::default();
    for child in children.nodes() {
        match child.name().value() {
            "grace" => out.grace = parse_duration_arg(child, "stop.grace", path)?,
            other => return Err(ConfigError::InvalidValue {
                path: path.to_path_buf(), field: "stop",
                message: format!("unknown key `{other}`"),
            }),
        }
    }
    Ok(out)
}

fn string_arg(node: &KdlNode, field: &'static str, path: &Path) -> Result<String, ConfigError> {
    node.entries().first()
        .and_then(|e| e.value().as_string().map(str::to_string))
        .ok_or(ConfigError::InvalidValue { path: path.to_path_buf(), field, message: "expected string".into() })
}

fn parse_duration_arg(node: &KdlNode, field: &'static str, path: &Path) -> Result<Duration, ConfigError> {
    let s = string_arg(node, field, path)?;
    humantime::parse_duration(&s).map_err(|e| ConfigError::InvalidValue {
        path: path.to_path_buf(), field,
        message: format!("invalid duration `{s}`: {e}"),
    })
}

fn find_node<'a>(doc: &'a KdlDocument, name: &str) -> Option<&'a KdlNode> {
    doc.nodes().iter().find(|n| n.name().value() == name)
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    fn p() -> &'static Path { Path::new("/tmp/test.kdl") }

    #[test]
    fn parses_minimal_config() {
        let text = "command \"/usr/local/bin/foo\"\nport 8421\n";
        let cfg = parse_server_config("foo", text, p()).unwrap();
        assert_eq!(cfg.name, "foo");
        assert_eq!(cfg.command, PathBuf::from("/usr/local/bin/foo"));
        assert_eq!(cfg.port, 8421);
        assert!(cfg.args.is_empty());
        assert!(cfg.env.is_empty());
    }

    #[test]
    fn parses_full_config() {
        let text = r#"
command "/usr/local/bin/foo"
args "--http" "--port" "8421"
port 8421
env {
    RUST_LOG "info"
    FOO_BAR "baz"
}
working-dir "/tmp/work"
restart {
    policy "always"
    backoff-initial "2s"
    backoff-max "1m"
    max-retries-per-minute 10
}
stop {
    grace "30s"
}
"#;
        let cfg = parse_server_config("foo", text, p()).unwrap();
        assert_eq!(cfg.args, vec!["--http", "--port", "8421"]);
        assert_eq!(cfg.env.get("RUST_LOG").map(String::as_str), Some("info"));
        assert_eq!(cfg.working_dir, Some(PathBuf::from("/tmp/work")));
        assert_eq!(cfg.restart.policy, RestartPolicy::Always);
        assert_eq!(cfg.restart.backoff_initial, Duration::from_secs(2));
        assert_eq!(cfg.restart.backoff_max, Duration::from_secs(60));
        assert_eq!(cfg.restart.max_retries_per_minute, 10);
        assert_eq!(cfg.stop.grace, Duration::from_secs(30));
    }

    #[test]
    fn missing_command_fails() {
        let err = parse_server_config("foo", "port 8421", p()).unwrap_err();
        assert!(matches!(err, ConfigError::MissingField { field: "command", .. }));
    }

    #[test]
    fn missing_port_fails() {
        let err = parse_server_config("foo", "command \"/bin/x\"", p()).unwrap_err();
        assert!(matches!(err, ConfigError::MissingField { field: "port", .. }));
    }

    #[test]
    fn unknown_restart_policy_fails() {
        let text = "command \"/bin/x\"\nport 1\nrestart { policy \"maybe\" }";
        let err = parse_server_config("foo", text, p()).unwrap_err();
        assert!(matches!(err, ConfigError::InvalidValue { field: "restart.policy", .. }));
    }

    #[test]
    fn invalid_name_rejected() {
        let text = "command \"/bin/x\"\nport 1";
        let err = parse_server_config("Foo Bar", text, p()).unwrap_err();
        assert!(matches!(err, ConfigError::InvalidName { .. }));
    }
}
  • Step 2: Expose from lib.rs

Append to lib.rs:

pub mod kdl_parse;

pub use kdl_parse::parse_server_config;
  • Step 3: Run tests

Run: cargo test -p xy-protocol
Expected: 10 passed.

  • Step 4: Fix any KDL API mismatches

If the kdl crate's API differs slightly in your installed version (e.g. name().value() vs name().to_string()), adapt call sites. Re-run until green.

  • Step 5: Commit
git add crates/xy-protocol/
git commit -m "feat(protocol): KDL parser for ServerConfig"
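
The parser turns max-retries-per-minute into a u32. Enforcing that limit is the job of the supervisor's retry_window.rs (the "sliding 60s window" in the file structure). The std-only sketch below assumes the window counts restart attempts and that hitting the cap is what moves a server to failed. RetryWindow and try_record are hypothetical names.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical sliding window: remembers restart times inside `window`
/// and refuses once `max` restarts already fall inside it.
pub struct RetryWindow {
    window: Duration,
    max: u32,
    attempts: VecDeque<Instant>,
}

impl RetryWindow {
    pub fn new(max: u32, window: Duration) -> Self {
        Self { window, max, attempts: VecDeque::new() }
    }

    /// Record a restart at `now` if the budget allows. Returns false when
    /// the cap is hit (the supervisor would then give up on the server).
    pub fn try_record(&mut self, now: Instant) -> bool {
        // Evict attempts that have slid out of the window.
        while let Some(&oldest) = self.attempts.front() {
            if now.duration_since(oldest) >= self.window {
                self.attempts.pop_front();
            } else {
                break;
            }
        }
        if self.attempts.len() as u32 >= self.max {
            return false;
        }
        self.attempts.push_back(now);
        true
    }
}
```

Taking `now` as a parameter instead of calling Instant::now() internally keeps the window testable without sleeping.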

Task 6: Load all configs from a directory + duplicate-port check

Files:

  • Modify: crates/xy-protocol/src/kdl_parse.rs

  • Modify: crates/xy-protocol/src/lib.rs

  • Step 1: Append load_all_configs and tests

Append to kdl_parse.rs:

pub fn load_all_configs(dir: &Path) -> Result<Vec<ServerConfig>, ConfigError> {
    if !dir.exists() { return Ok(Vec::new()); }
    let entries = std::fs::read_dir(dir).map_err(|e| ConfigError::Io {
        path: dir.to_path_buf(), source: e,
    })?;

    let mut configs = Vec::new();
    for entry in entries {
        let entry = entry.map_err(|e| ConfigError::Io { path: dir.to_path_buf(), source: e })?;
        let path = entry.path();
        if path.extension().and_then(|s| s.to_str()) != Some("kdl") { continue; }
        let name = path.file_stem().and_then(|s| s.to_str())
            .ok_or(ConfigError::InvalidName { name: path.display().to_string() })?
            .to_string();
        let text = std::fs::read_to_string(&path).map_err(|e| ConfigError::Io { path: path.clone(), source: e })?;
        configs.push(parse_server_config(&name, &text, &path)?);
    }

    check_duplicate_ports(&configs)?;
    Ok(configs)
}

fn check_duplicate_ports(configs: &[ServerConfig]) -> Result<(), ConfigError> {
    let mut seen: std::collections::HashMap<u16, String> = std::collections::HashMap::new();
    for c in configs {
        if let Some(other) = seen.insert(c.port, c.name.clone()) {
            return Err(ConfigError::DuplicatePort {
                name_a: other, name_b: c.name.clone(), port: c.port,
            });
        }
    }
    Ok(())
}

Append tests inside the existing mod tests:

    use tempfile::tempdir;
    use std::fs;

    #[test]
    fn load_all_finds_and_parses_files() {
        let dir = tempdir().unwrap();
        fs::write(dir.path().join("a.kdl"), "command \"/bin/a\"\nport 8001").unwrap();
        fs::write(dir.path().join("b.kdl"), "command \"/bin/b\"\nport 8002").unwrap();
        fs::write(dir.path().join("ignored.txt"), "not a config").unwrap();
        let mut configs = load_all_configs(dir.path()).unwrap();
        configs.sort_by(|x, y| x.name.cmp(&y.name));
        assert_eq!(configs.len(), 2);
        assert_eq!(configs[0].name, "a");
        assert_eq!(configs[1].port, 8002);
    }

    #[test]
    fn load_all_returns_empty_for_missing_dir() {
        let dir = tempdir().unwrap();
        let configs = load_all_configs(&dir.path().join("does-not-exist")).unwrap();
        assert!(configs.is_empty());
    }

    #[test]
    fn duplicate_ports_detected() {
        let dir = tempdir().unwrap();
        fs::write(dir.path().join("a.kdl"), "command \"/bin/a\"\nport 8001").unwrap();
        fs::write(dir.path().join("b.kdl"), "command \"/bin/b\"\nport 8001").unwrap();
        let err = load_all_configs(dir.path()).unwrap_err();
        match err {
            ConfigError::DuplicatePort { port, .. } => assert_eq!(port, 8001),
            other => panic!("unexpected error: {other:?}"),
        }
    }
  • Step 2: Expose

Append to lib.rs: pub use kdl_parse::load_all_configs;

  • Step 3: Run tests

Run: cargo test -p xy-protocol
Expected: 13 passed.

  • Step 4: Commit
git add crates/xy-protocol/
git commit -m "feat(protocol): load_all_configs from dir with duplicate port detection"
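
std::fs::read_dir returns entries in a platform- and filesystem-dependent order. As a result, which two names load_all_configs reports in DuplicatePort can vary between machines. A deterministic variant sorts by name before checking. This std-only sketch uses a hypothetical Cfg stand-in for ServerConfig and its own DuplicatePort struct.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for ServerConfig (only the fields the check needs).
#[derive(Debug, Clone)]
pub struct Cfg {
    pub name: String,
    pub port: u16,
}

#[derive(Debug, PartialEq, Eq)]
pub struct DuplicatePort {
    pub name_a: String,
    pub name_b: String,
    pub port: u16,
}

/// Sort by name first so the reported pair does not depend on read_dir order.
pub fn check_duplicate_ports(configs: &mut [Cfg]) -> Result<(), DuplicatePort> {
    configs.sort_by(|a, b| a.name.cmp(&b.name));
    let mut seen: HashMap<u16, &str> = HashMap::new();
    for c in configs.iter() {
        if let Some(other) = seen.insert(c.port, &c.name) {
            return Err(DuplicatePort {
                name_a: other.to_string(),
                name_b: c.name.clone(),
                port: c.port,
            });
        }
    }
    Ok(())
}
```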

Task 7: JSON-RPC method types in rpc.rs

Files:

  • Create: crates/xy-protocol/src/rpc.rs

  • Modify: crates/xy-protocol/src/lib.rs

  • Step 1: Write the types

use crate::ServerState;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerSummary {
    pub name: String,
    pub state: ServerState,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pid: Option<u32>,
    pub port: u16,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub uptime_secs: Option<u64>,
    pub restart_count: u32,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_exit: Option<i32>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusDetail {
    #[serde(flatten)]
    pub summary: ServerSummary,
    pub recent_transitions: Vec<StateTransition>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateTransition {
    pub from: ServerState,
    pub to: ServerState,
    pub at_unix_ms: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum NameOrAll {
    All { all: bool },
    Name { name: String },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusParams { pub name: String }

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartResult { pub started: Vec<String>, pub already_running: Vec<String> }

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StopResult { pub stopped: Vec<String>, pub not_running: Vec<String> }

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RestartResult { pub restarted: Vec<String> }

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReloadResult {
    pub added: Vec<String>,
    pub removed: Vec<String>,
    pub changed: Vec<String>,
    pub unchanged: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogsParams {
    pub name: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tail: Option<u32>,
    #[serde(default)]
    pub follow: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogsSubscribed { pub subscription_id: u64 }

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogsCancelParams { pub subscription_id: u64 }

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum LogStream { Stdout, Stderr }

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogLine {
    pub subscription_id: u64,
    pub name: String,
    pub stream: LogStream,
    pub line: String,
    pub ts_unix_ms: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEnd { pub subscription_id: u64 }

pub mod methods {
    pub const LIST: &str = "list";
    pub const STATUS: &str = "status";
    pub const START: &str = "start";
    pub const STOP: &str = "stop";
    pub const RESTART: &str = "restart";
    pub const RELOAD: &str = "reload";
    pub const LOGS: &str = "logs";
    pub const LOGS_CANCEL: &str = "logs_cancel";
}

pub mod notifications {
    pub const LOG: &str = "log";
    pub const LOG_END: &str = "log_end";
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn name_or_all_round_trips_name() {
        let v: NameOrAll = serde_json::from_str(r#"{"name":"foo"}"#).unwrap();
        match v {
            NameOrAll::Name { name } => assert_eq!(name, "foo"),
            _ => panic!("expected Name variant"),
        }
    }

    #[test]
    fn name_or_all_round_trips_all() {
        let v: NameOrAll = serde_json::from_str(r#"{"all":true}"#).unwrap();
        match v {
            NameOrAll::All { all } => assert!(all),
            _ => panic!("expected All variant"),
        }
    }
}
  • Step 2: Expose

Append to lib.rs:

pub mod rpc;
  • Step 3: Run tests

Run: cargo test -p xy-protocol
Expected: 15 passed.

  • Step 4: Commit
git add crates/xy-protocol/
git commit -m "feat(protocol): JSON-RPC method param/result types"
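On the daemon side, an incoming request's `method` field is matched against the `methods` constants. Below is a minimal std-only sketch of that routing. It assumes a hypothetical `Method` enum and `route` function that are not part of the plan, and it copies the constants so the block compiles on its own. `-32601` is the JSON-RPC 2.0 "Method not found" code.

```rust
/// Hypothetical routing target for incoming requests (not part of the plan).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Method { List, Status, Start, Stop, Restart, Reload, Logs, LogsCancel }

/// Copy of `xy_protocol::rpc::methods` so this sketch compiles alone.
pub mod methods {
    pub const LIST: &str = "list";
    pub const STATUS: &str = "status";
    pub const START: &str = "start";
    pub const STOP: &str = "stop";
    pub const RESTART: &str = "restart";
    pub const RELOAD: &str = "reload";
    pub const LOGS: &str = "logs";
    pub const LOGS_CANCEL: &str = "logs_cancel";
}

/// JSON-RPC 2.0 "Method not found" error code.
pub const METHOD_NOT_FOUND: i64 = -32601;

/// Maps a request's `method` string to a handler, or to the standard error code.
pub fn route(method: &str) -> Result<Method, i64> {
    match method {
        methods::LIST => Ok(Method::List),
        methods::STATUS => Ok(Method::Status),
        methods::START => Ok(Method::Start),
        methods::STOP => Ok(Method::Stop),
        methods::RESTART => Ok(Method::Restart),
        methods::RELOAD => Ok(Method::Reload),
        methods::LOGS => Ok(Method::Logs),
        methods::LOGS_CANCEL => Ok(Method::LogsCancel),
        _ => Err(METHOD_NOT_FOUND),
    }
}
```

`log` and `log_end` deliberately fall through to the error. They are notifications the daemon sends, not methods it accepts.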

Phase 3 — xy-supervisor

Task 8: ChildHandle trait + MockChild for tests

Files:

  • Create: crates/xy-supervisor/src/child.rs

  • Modify: crates/xy-supervisor/src/lib.rs

  • Step 1: Define the trait + mock

use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};

#[async_trait::async_trait]
pub trait ChildHandle: Send + 'static {
    fn pid(&self) -> u32;
    async fn wait(&mut self) -> std::io::Result<Option<i32>>;
    fn terminate(&mut self) -> std::io::Result<()>;
    fn kill(&mut self) -> std::io::Result<()>;
}

pub struct MockChild {
    pid: u32,
    exit_rx: Arc<Mutex<oneshot::Receiver<Option<i32>>>>,
    terminate_tx: Option<oneshot::Sender<()>>,
    kill_tx: Option<oneshot::Sender<()>>,
}

pub struct MockChildController {
    pub exit_tx: Option<oneshot::Sender<Option<i32>>>,
    pub terminate_rx: oneshot::Receiver<()>,
    pub kill_rx: oneshot::Receiver<()>,
}

impl MockChild {
    pub fn new(pid: u32) -> (Self, MockChildController) {
        let (exit_tx, exit_rx) = oneshot::channel();
        let (terminate_tx, terminate_rx) = oneshot::channel();
        let (kill_tx, kill_rx) = oneshot::channel();
        let child = Self {
            pid,
            exit_rx: Arc::new(Mutex::new(exit_rx)),
            terminate_tx: Some(terminate_tx),
            kill_tx: Some(kill_tx),
        };
        let ctl = MockChildController { exit_tx: Some(exit_tx), terminate_rx, kill_rx };
        (child, ctl)
    }
}

#[async_trait::async_trait]
impl ChildHandle for MockChild {
    fn pid(&self) -> u32 { self.pid }

    async fn wait(&mut self) -> std::io::Result<Option<i32>> {
        let mut rx = self.exit_rx.lock().await;
        match (&mut *rx).await {
            Ok(code) => Ok(code),
            Err(_) => Err(std::io::Error::other("exit_tx dropped")),
        }
    }

    fn terminate(&mut self) -> std::io::Result<()> {
        if let Some(tx) = self.terminate_tx.take() { let _ = tx.send(()); }
        Ok(())
    }

    fn kill(&mut self) -> std::io::Result<()> {
        if let Some(tx) = self.kill_tx.take() { let _ = tx.send(()); }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn mock_child_exit() {
        let (mut child, mut ctl) = MockChild::new(123);
        assert_eq!(child.pid(), 123);
        ctl.exit_tx.take().unwrap().send(Some(0)).unwrap();
        assert_eq!(child.wait().await.unwrap(), Some(0));
    }

    #[tokio::test]
    async fn mock_child_terminate() {
        let (mut child, mut ctl) = MockChild::new(1);
        child.terminate().unwrap();
        ctl.terminate_rx.try_recv().unwrap();
    }
}
  • Step 2: Update lib.rs
//! Process-supervision primitives for the xy daemon.

pub mod child;

pub use child::{ChildHandle, MockChild, MockChildController};
  • Step 3: Run tests

Run: cargo test -p xy-supervisor
Expected: 2 passed.

  • Step 4: Commit
git add crates/xy-supervisor/
git commit -m "feat(supervisor): ChildHandle trait + MockChild"
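`MockChild` inverts control. The test holds the controller and plays the operating system: it decides when the "process" exits and observes which signals it received. The sketch below shows the same pattern synchronously, with `std::sync::mpsc` standing in for Tokio's oneshot channels. `SyncChild`, `FakeChild`, `FakeController`, and `fake_child` are illustrative names only.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Synchronous stand-in for `ChildHandle` (illustrative only).
pub trait SyncChild {
    fn terminate(&mut self);
    fn wait(&mut self) -> Option<i32>;
}

/// The side handed to the code under test.
pub struct FakeChild {
    exit_rx: Receiver<Option<i32>>,
    terminate_tx: Sender<()>,
}

/// The side kept by the test: it sees signals and decides when the child exits.
pub struct FakeController {
    pub exit_tx: Sender<Option<i32>>,
    pub terminate_rx: Receiver<()>,
}

pub fn fake_child() -> (FakeChild, FakeController) {
    let (exit_tx, exit_rx) = channel();
    let (terminate_tx, terminate_rx) = channel();
    (FakeChild { exit_rx, terminate_tx }, FakeController { exit_tx, terminate_rx })
}

impl SyncChild for FakeChild {
    fn terminate(&mut self) {
        // Ignore send errors: the controller may already be gone.
        let _ = self.terminate_tx.send(());
    }

    fn wait(&mut self) -> Option<i32> {
        // Blocks until the controller reports an exit. A dropped controller
        // is treated like a signal death (no exit code).
        self.exit_rx.recv().unwrap_or(None)
    }
}
```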

Task 9: Restart-policy decision logic

Files:

  • Create: crates/xy-supervisor/src/policy.rs

  • Modify: crates/xy-supervisor/src/lib.rs

  • Step 1: Write tests + impl

use xy_protocol::RestartPolicy;

#[derive(Debug, PartialEq, Eq)]
pub enum RestartDecision { Restart, StayStopped, MarkFailed }

pub fn decide(policy: RestartPolicy, exit_code: Option<i32>, retry_cap_reached: bool) -> RestartDecision {
    let clean = matches!(exit_code, Some(0));
    let want = match policy {
        RestartPolicy::Never => false,
        RestartPolicy::OnFailure => !clean,
        RestartPolicy::Always => true,
    };
    if !want { return RestartDecision::StayStopped; }
    if retry_cap_reached { RestartDecision::MarkFailed } else { RestartDecision::Restart }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test] fn never_never_restarts() {
        assert_eq!(decide(RestartPolicy::Never, Some(0), false), RestartDecision::StayStopped);
        assert_eq!(decide(RestartPolicy::Never, Some(1), false), RestartDecision::StayStopped);
        assert_eq!(decide(RestartPolicy::Never, None, false), RestartDecision::StayStopped);
    }

    #[test] fn on_failure_skips_clean() {
        assert_eq!(decide(RestartPolicy::OnFailure, Some(0), false), RestartDecision::StayStopped);
    }

    #[test] fn on_failure_restarts_nonzero() {
        assert_eq!(decide(RestartPolicy::OnFailure, Some(1), false), RestartDecision::Restart);
    }

    #[test] fn on_failure_restarts_signal() {
        assert_eq!(decide(RestartPolicy::OnFailure, None, false), RestartDecision::Restart);
    }

    #[test] fn always_restarts_on_clean() {
        assert_eq!(decide(RestartPolicy::Always, Some(0), false), RestartDecision::Restart);
    }

    #[test] fn cap_reached_marks_failed() {
        assert_eq!(decide(RestartPolicy::Always, Some(0), true), RestartDecision::MarkFailed);
        assert_eq!(decide(RestartPolicy::OnFailure, Some(1), true), RestartDecision::MarkFailed);
    }

    #[test] fn cap_reached_never_still_stopped() {
        assert_eq!(decide(RestartPolicy::Never, Some(1), true), RestartDecision::StayStopped);
    }
}
  • Step 2: Expose

Append to lib.rs: pub mod policy; pub use policy::{decide, RestartDecision};

  • Step 3: Run tests

Run: cargo test -p xy-supervisor
Expected: 9 passed.

  • Step 4: Commit
git add crates/xy-supervisor/
git commit -m "feat(supervisor): restart-policy decision logic"
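`decide` counts `exit_code == None` as a failure. On Unix, `ExitStatus::code()` returns `None` when a signal, rather than a normal exit, ended the process, and `ExitStatusExt::signal()` reports which signal it was. Below is a small std-only check of that mapping. It assumes `sh` is on `PATH`; `run_sh` and `observed_exit` are hypothetical helpers.

```rust
use std::io;
use std::os::unix::process::ExitStatusExt;
use std::process::{Command, ExitStatus};

/// Runs `script` with `sh -c` and returns its exit status.
pub fn run_sh(script: &str) -> io::Result<ExitStatus> {
    Command::new("sh").arg("-c").arg(script).status()
}

/// Returns `(code, signal)`. `code` is what the supervisor passes to `decide`:
/// `Some(n)` for a normal exit, `None` when a signal killed the process.
pub fn observed_exit(status: ExitStatus) -> (Option<i32>, Option<i32>) {
    (status.code(), status.signal())
}
```

Under `RestartPolicy::OnFailure`, a server killed by `SIGTERM` from outside xy is therefore restarted, while one that exits with status 0 is not.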

Task 10: Backoff calculator

Files:

  • Create: crates/xy-supervisor/src/backoff.rs

  • Modify: crates/xy-supervisor/src/lib.rs

  • Step 1: Write tests + impl

use std::time::Duration;

#[derive(Debug, Clone)]
pub struct Backoff {
    initial: Duration,
    max: Duration,
    current: Option<Duration>,
}

impl Backoff {
    pub fn new(initial: Duration, max: Duration) -> Self {
        Self { initial, max, current: None }
    }
    pub fn next(&mut self) -> Duration {
        let next = match self.current {
            None => self.initial,
            // Saturate instead of panicking if doubling would overflow Duration.
            Some(d) => d.saturating_mul(2).min(self.max),
        };
        self.current = Some(next);
        next
    }
    pub fn reset(&mut self) { self.current = None; }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test] fn starts_at_initial() {
        let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(30));
        assert_eq!(b.next(), Duration::from_secs(1));
    }
    #[test] fn doubles_each_call() {
        let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(30));
        assert_eq!(b.next(), Duration::from_secs(1));
        assert_eq!(b.next(), Duration::from_secs(2));
        assert_eq!(b.next(), Duration::from_secs(4));
        assert_eq!(b.next(), Duration::from_secs(8));
    }
    #[test] fn caps_at_max() {
        let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(5));
        for _ in 0..10 { b.next(); }
        assert_eq!(b.next(), Duration::from_secs(5));
    }
    #[test] fn reset_starts_over() {
        let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(30));
        b.next(); b.next(); b.next();
        b.reset();
        assert_eq!(b.next(), Duration::from_secs(1));
    }
}
  • Step 2: Expose

Append to lib.rs: pub mod backoff; pub use backoff::Backoff;

  • Step 3: Run tests

Run: cargo test -p xy-supervisor
Expected: 13 passed.

  • Step 4: Commit
git add crates/xy-supervisor/
git commit -m "feat(supervisor): exponential backoff calculator"
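With the values used in the tests above (1 s initial, 30 s cap), successive calls to `next()` yield 1, 2, 4, 8, and 16 seconds and then stay at 30 s, so seven successive delays add up to 91 s. Below is a std-only sketch that precomputes that schedule with the same double-then-clamp rule. `schedule` is a hypothetical helper. It uses `Duration::saturating_mul` so a huge delay clamps instead of panicking on overflow.

```rust
use std::time::Duration;

/// Returns the first `n` delays produced by starting at `initial`, then
/// doubling and clamping at `max`. This is the sequence `Backoff::next`
/// yields after construction or a reset.
pub fn schedule(initial: Duration, max: Duration, n: usize) -> Vec<Duration> {
    let mut out = Vec::with_capacity(n);
    let mut current: Option<Duration> = None;
    for _ in 0..n {
        let next = match current {
            None => initial,
            Some(d) => d.saturating_mul(2).min(max),
        };
        current = Some(next);
        out.push(next);
    }
    out
}
```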

Task 11: Sliding-60s retry-window tracker

Files:

  • Create: crates/xy-supervisor/src/retry_window.rs

  • Modify: crates/xy-supervisor/src/lib.rs

  • Step 1: Write tests + impl

use std::collections::VecDeque;
use std::time::{Duration, Instant};

#[derive(Debug, Clone)]
pub struct RetryWindow {
    window: Duration,
    cap: u32,
    events: VecDeque<Instant>,
}

impl RetryWindow {
    pub fn new(window: Duration, cap: u32) -> Self {
        Self { window, cap, events: VecDeque::new() }
    }
    pub fn record(&mut self, now: Instant) {
        self.events.push_back(now);
        self.prune(now);
    }
    pub fn cap_reached(&mut self, now: Instant) -> bool {
        self.prune(now);
        self.events.len() as u32 >= self.cap
    }
    pub fn count(&mut self, now: Instant) -> u32 {
        self.prune(now);
        self.events.len() as u32
    }
    fn prune(&mut self, now: Instant) {
        while let Some(&front) = self.events.front() {
            if now.duration_since(front) > self.window { self.events.pop_front(); }
            else { break; }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test] fn below_cap_not_reached() {
        let mut w = RetryWindow::new(Duration::from_secs(60), 3);
        let t = Instant::now(); w.record(t); w.record(t);
        assert!(!w.cap_reached(t));
    }
    #[test] fn at_cap_reached() {
        let mut w = RetryWindow::new(Duration::from_secs(60), 3);
        let t = Instant::now(); w.record(t); w.record(t); w.record(t);
        assert!(w.cap_reached(t));
    }
    #[test] fn old_events_pruned() {
        let mut w = RetryWindow::new(Duration::from_secs(60), 3);
        let t0 = Instant::now(); w.record(t0); w.record(t0); w.record(t0);
        let t1 = t0 + Duration::from_secs(61);
        assert_eq!(w.count(t1), 0);
        assert!(!w.cap_reached(t1));
    }
}
  • Step 2: Expose

Append to lib.rs: pub mod retry_window; pub use retry_window::RetryWindow;

  • Step 3: Run tests

Run: cargo test -p xy-supervisor
Expected: 16 passed.

  • Step 4: Commit
git add crates/xy-supervisor/
git commit -m "feat(supervisor): sliding retry-window tracker"
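Pruning uses a strict `>` comparison, so an event exactly 60 s old still counts; it drops out only once it is older than that. The sketch below replays that bookkeeping with plain second offsets instead of `Instant`, so the timeline is easy to read. Like the supervisor loop, it records first and checks second. `Window` and `replay` are illustrative names.

```rust
use std::collections::VecDeque;

/// Sliding-window exit counter over integer seconds (stand-in for `RetryWindow`).
pub struct Window {
    span: u64,
    cap: usize,
    events: VecDeque<u64>,
}

impl Window {
    pub fn new(span: u64, cap: usize) -> Self {
        Self { span, cap, events: VecDeque::new() }
    }

    /// Records an exit at `t` (non-decreasing), prunes, and reports whether the cap is reached.
    pub fn record_and_check(&mut self, t: u64) -> bool {
        self.events.push_back(t);
        // Same strict `>` as `prune`: an event exactly `span` seconds old is kept.
        while let Some(&front) = self.events.front() {
            if t.saturating_sub(front) > self.span {
                self.events.pop_front();
            } else {
                break;
            }
        }
        self.events.len() >= self.cap
    }
}

/// Feeds exit times through a fresh window and returns the cap flag after each one.
pub fn replay(span: u64, cap: usize, exits: &[u64]) -> Vec<bool> {
    let mut w = Window::new(span, cap);
    exits.iter().map(|&t| w.record_and_check(t)).collect()
}
```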

Task 12: Log file writer with rotation

Files:

  • Create: crates/xy-supervisor/src/logs.rs

  • Modify: crates/xy-supervisor/src/lib.rs

  • Step 1: Write RotatingLogWriter + tests

use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};

pub struct RotatingLogWriter {
    base: PathBuf,
    max_bytes: u64,
    keep: usize,
    file: File,
    written: u64,
}

impl RotatingLogWriter {
    pub fn open(base: &Path, max_bytes: u64, keep: usize) -> std::io::Result<Self> {
        if let Some(parent) = base.parent() { std::fs::create_dir_all(parent)?; }
        let file = OpenOptions::new().create(true).append(true).open(base)?;
        let written = file.metadata()?.len();
        Ok(Self { base: base.to_path_buf(), max_bytes, keep, file, written })
    }

    pub fn write_line(&mut self, tag: &str, line: &str) -> std::io::Result<()> {
        let bytes = format!("{tag} {line}\n");
        self.file.write_all(bytes.as_bytes())?;
        self.written += bytes.len() as u64;
        if self.written >= self.max_bytes { self.rotate()?; }
        Ok(())
    }

    fn rotate(&mut self) -> std::io::Result<()> {
        // Drop the open file handle by replacing with /dev/null briefly.
        self.file = OpenOptions::new().read(true).open("/dev/null")?;
        for i in (1..self.keep).rev() {
            let src = self.gen_path(i);
            let dst = self.gen_path(i + 1);
            if src.exists() { let _ = std::fs::rename(&src, &dst); }
        }
        if self.base.exists() { let _ = std::fs::rename(&self.base, self.gen_path(1)); }
        self.file = OpenOptions::new().create(true).append(true).open(&self.base)?;
        self.written = 0;
        Ok(())
    }

    fn gen_path(&self, n: usize) -> PathBuf {
        let mut s = self.base.as_os_str().to_os_string();
        s.push(format!(".{n}"));
        PathBuf::from(s)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;
    use std::io::Read;

    #[test] fn writes_lines_with_tags() {
        let dir = tempdir().unwrap();
        let base = dir.path().join("x.log");
        let mut w = RotatingLogWriter::open(&base, 1024, 3).unwrap();
        w.write_line("[out]", "hello").unwrap();
        w.write_line("[err]", "boom").unwrap();
        let mut s = String::new();
        File::open(&base).unwrap().read_to_string(&mut s).unwrap();
        assert_eq!(s, "[out] hello\n[err] boom\n");
    }

    #[test] fn rotates_at_threshold() {
        let dir = tempdir().unwrap();
        let base = dir.path().join("x.log");
        let mut w = RotatingLogWriter::open(&base, 20, 3).unwrap();
        for _ in 0..5 { w.write_line("[out]", "0123456789").unwrap(); }
        assert!(base.exists());
        let rotated = dir.path().join("x.log.1");
        assert!(rotated.exists(), "expected rotated file at {}", rotated.display());
    }
}
  • Step 2: Expose

Append to lib.rs: pub mod logs; pub use logs::RotatingLogWriter;

  • Step 3: Run tests

Run: cargo test -p xy-supervisor
Expected: 18 passed.

  • Step 4: Commit
git add crates/xy-supervisor/
git commit -m "feat(supervisor): rotating log writer"
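With `keep = 3`, one rotation shifts each generation up by one before the live file becomes `.1`: `x.log.2` moves to `x.log.3`, `x.log.1` to `x.log.2`, and then `x.log` to `x.log.1`. On Unix, `rename` replaces an existing destination, so the previous `.3` is discarded. Below is a pure sketch that lists those renames in the order `rotate` attempts them; the real writer also skips any source that does not exist yet. `rename_plan` is a hypothetical helper.

```rust
/// Returns the `(from, to)` renames one rotation attempts, in order.
/// Mirrors `RotatingLogWriter::rotate`: shift `.i` to `.i+1` from the
/// highest kept generation down, then move the live file to `.1`.
pub fn rename_plan(base: &str, keep: usize) -> Vec<(String, String)> {
    let mut plan = Vec::new();
    for i in (1..keep).rev() {
        plan.push((format!("{base}.{i}"), format!("{base}.{}", i + 1)));
    }
    plan.push((base.to_string(), format!("{base}.1")));
    plan
}
```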

Task 13: Ring buffer

Files:

  • Modify: crates/xy-supervisor/src/logs.rs

  • Modify: crates/xy-supervisor/src/lib.rs

  • Step 1: Append RingBuffer to logs.rs

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

#[derive(Clone)]
pub struct RingBuffer {
    inner: Arc<Mutex<RingBufferInner>>,
    capacity_bytes: usize,
}

struct RingBufferInner {
    lines: VecDeque<RecordedLine>,
    bytes: usize,
}

#[derive(Debug, Clone)]
pub struct RecordedLine {
    pub stream: xy_protocol::rpc::LogStream,
    pub line: String,
    pub ts_unix_ms: u64,
}

impl RingBuffer {
    pub fn new(capacity_bytes: usize) -> Self {
        Self {
            inner: Arc::new(Mutex::new(RingBufferInner { lines: VecDeque::new(), bytes: 0 })),
            capacity_bytes,
        }
    }
    pub fn push(&self, line: RecordedLine) {
        let mut g = self.inner.lock().unwrap();
        g.bytes += line.line.len();
        g.lines.push_back(line);
        while g.bytes > self.capacity_bytes {
            if let Some(removed) = g.lines.pop_front() { g.bytes -= removed.line.len(); }
            else { break; }
        }
    }
    pub fn snapshot_tail(&self, n: Option<u32>) -> Vec<RecordedLine> {
        let g = self.inner.lock().unwrap();
        match n {
            None => g.lines.iter().cloned().collect(),
            Some(n) => {
                let take = (n as usize).min(g.lines.len());
                let start = g.lines.len() - take;
                g.lines.iter().skip(start).cloned().collect()
            }
        }
    }
}
  • Step 2: Add tests inside the existing mod tests
    use xy_protocol::rpc::LogStream;

    fn recorded(s: &str) -> RecordedLine {
        RecordedLine { stream: LogStream::Stdout, line: s.to_string(), ts_unix_ms: 0 }
    }

    #[test] fn ring_buffer_drops_oldest_when_full() {
        let rb = RingBuffer::new(10);
        rb.push(recorded("aaaaa"));
        rb.push(recorded("bbbbb"));
        rb.push(recorded("ccc"));
        let snap = rb.snapshot_tail(None);
        assert_eq!(snap.len(), 2);
        assert_eq!(snap[0].line, "bbbbb");
        assert_eq!(snap[1].line, "ccc");
    }

    #[test] fn ring_buffer_tail_n() {
        let rb = RingBuffer::new(1024);
        for i in 0..5 { rb.push(recorded(&format!("line{i}"))); }
        let snap = rb.snapshot_tail(Some(2));
        assert_eq!(snap.len(), 2);
        assert_eq!(snap[0].line, "line3");
        assert_eq!(snap[1].line, "line4");
    }
  • Step 3: Expose

Append to lib.rs: pub use logs::{RecordedLine, RingBuffer};

  • Step 4: Run tests

Run: cargo test -p xy-supervisor
Expected: 20 passed.

  • Step 5: Commit
git add crates/xy-supervisor/
git commit -m "feat(supervisor): ring buffer for recent log lines"
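`snapshot_tail` copies only the last `n` lines. The same slice can be expressed with `VecDeque::range`, which starts iterating at the computed index instead of skipping up to it. Below is a std-only sketch over plain `String`s; `tail` is an illustrative name, not part of the plan.

```rust
use std::collections::VecDeque;

/// Returns clones of the last `n` lines, or all lines when `n` is `None`.
pub fn tail(lines: &VecDeque<String>, n: Option<u32>) -> Vec<String> {
    // Clamp the request to what is actually buffered.
    let take = n.map_or(lines.len(), |n| (n as usize).min(lines.len()));
    let start = lines.len() - take;
    lines.range(start..).cloned().collect()
}
```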

Task 14: LogSink — fan-out to file, ring buffer, broadcast

Files:

  • Modify: crates/xy-supervisor/src/logs.rs

  • Modify: crates/xy-supervisor/src/lib.rs

  • Step 1: Append LogSink to logs.rs

use tokio::sync::broadcast;
use xy_protocol::rpc::{LogLine, LogStream};

const LOG_BROADCAST_CAP: usize = 256;

#[derive(Clone)]
pub struct LogSink {
    pub server_name: String,
    writer: Arc<Mutex<RotatingLogWriter>>,
    pub ring: RingBuffer,
    pub broadcast: broadcast::Sender<LogLine>,
}

impl LogSink {
    pub fn new(server_name: String, writer: RotatingLogWriter, ring_capacity_bytes: usize) -> Self {
        let (tx, _) = broadcast::channel(LOG_BROADCAST_CAP);
        Self {
            server_name,
            writer: Arc::new(Mutex::new(writer)),
            ring: RingBuffer::new(ring_capacity_bytes),
            broadcast: tx,
        }
    }
    pub fn record(&self, stream: LogStream, line: String) {
        let ts = now_unix_ms();
        let tag = match stream { LogStream::Stdout => "[out]", LogStream::Stderr => "[err]" };
        if let Err(e) = self.writer.lock().unwrap().write_line(tag, &line) {
            tracing::warn!(server = %self.server_name, error = %e, "log file write failed");
        }
        self.ring.push(RecordedLine { stream, line: line.clone(), ts_unix_ms: ts });
        let _ = self.broadcast.send(LogLine {
            subscription_id: 0,
            name: self.server_name.clone(),
            stream, line, ts_unix_ms: ts,
        });
    }
}

fn now_unix_ms() -> u64 {
    std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_millis() as u64).unwrap_or(0)
}
  • Step 2: Add a tokio test inside mod tests
    #[tokio::test]
    async fn log_sink_records_and_broadcasts() {
        let dir = tempdir().unwrap();
        let writer = RotatingLogWriter::open(&dir.path().join("s.log"), 1024, 3).unwrap();
        let sink = LogSink::new("s".to_string(), writer, 1024);
        let mut rx = sink.broadcast.subscribe();
        sink.record(LogStream::Stdout, "hello".to_string());
        let got = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv()).await.unwrap().unwrap();
        assert_eq!(got.line, "hello");
        assert_eq!(got.stream, LogStream::Stdout);
        assert_eq!(sink.ring.snapshot_tail(None).len(), 1);
    }
  • Step 3: Expose

Append to lib.rs: pub use logs::LogSink;

  • Step 4: Run tests

Run: cargo test -p xy-supervisor
Expected: 21 passed.

  • Step 5: Commit
git add crates/xy-supervisor/
git commit -m "feat(supervisor): LogSink fans out to file, ring buffer, broadcast"
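`LogSink::record` writes each line to the file, pushes it into the ring buffer, and offers it to every live subscriber. Below is a std-only sketch of the subscriber half, with one `std::sync::mpsc` channel per subscriber standing in for `tokio::sync::broadcast`; `FanOut` is an illustrative name. Unlike `broadcast`, these channels are unbounded, so a slow reader makes memory grow. The fixed `LOG_BROADCAST_CAP` avoids that growth, at the cost that a lagging `broadcast` receiver loses the oldest lines and sees `RecvError::Lagged`.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Delivers each line to every live subscriber and forgets the ones that hung up.
#[derive(Default)]
pub struct FanOut {
    subscribers: Vec<Sender<String>>,
}

impl FanOut {
    pub fn subscribe(&mut self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Sends `line` to all subscribers. A failed send means that receiver
    /// was dropped, so its sender is removed.
    pub fn publish(&mut self, line: &str) {
        self.subscribers.retain(|tx| tx.send(line.to_string()).is_ok());
    }

    pub fn subscriber_count(&self) -> usize {
        self.subscribers.len()
    }
}
```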

Task 15: RealChild — real-process implementation of ChildHandle

Files:

  • Modify: crates/xy-supervisor/src/child.rs

  • Modify: crates/xy-supervisor/src/lib.rs

  • Step 1: Append RealChild and spawn_with_logs to child.rs

use crate::logs::LogSink;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child as TokioChild, Command};
use xy_protocol::{rpc::LogStream, ServerConfig};

pub struct RealChild {
    pid: u32,
    pgid: Pid,
    child: Option<TokioChild>,
}

impl RealChild {
    pub fn pgid(&self) -> Pid { self.pgid }
}

#[async_trait::async_trait]
impl ChildHandle for RealChild {
    fn pid(&self) -> u32 { self.pid }

    async fn wait(&mut self) -> std::io::Result<Option<i32>> {
        let child = self.child.as_mut().ok_or_else(|| std::io::Error::other("already waited"))?;
        let status = child.wait().await?;
        Ok(status.code())
    }

    fn terminate(&mut self) -> std::io::Result<()> {
        kill(Pid::from_raw(-self.pgid.as_raw()), Signal::SIGTERM)
            .map_err(|e| std::io::Error::other(e.to_string()))
    }

    fn kill(&mut self) -> std::io::Result<()> {
        kill(Pid::from_raw(-self.pgid.as_raw()), Signal::SIGKILL)
            .map_err(|e| std::io::Error::other(e.to_string()))
    }
}

pub fn spawn_with_logs(cfg: &ServerConfig, sink: LogSink) -> std::io::Result<RealChild> {
    let mut cmd = Command::new(&cfg.command);
    cmd.args(&cfg.args);
    for (k, v) in &cfg.env { cmd.env(k, v); }
    if let Some(dir) = &cfg.working_dir { cmd.current_dir(dir); }
    cmd.stdout(Stdio::piped());
    cmd.stderr(Stdio::piped());
    cmd.kill_on_drop(true);

    // Own process group so signals reach the whole tree.
    unsafe {
        cmd.pre_exec(|| {
            nix::unistd::setpgid(Pid::from_raw(0), Pid::from_raw(0))
                .map_err(|e| std::io::Error::other(e.to_string()))
        });
    }

    let mut child = cmd.spawn()?;
    let pid = child.id().ok_or_else(|| std::io::Error::other("no pid"))?;
    let pgid = Pid::from_raw(pid as i32);

    if let Some(out) = child.stdout.take() { spawn_pump(out, sink.clone(), LogStream::Stdout); }
    if let Some(err) = child.stderr.take() { spawn_pump(err, sink.clone(), LogStream::Stderr); }

    Ok(RealChild { pid, pgid, child: Some(child) })
}

fn spawn_pump<R: tokio::io::AsyncRead + Unpin + Send + 'static>(
    reader: R, sink: LogSink, stream: LogStream,
) {
    tokio::spawn(async move {
        let mut lines = BufReader::new(reader).lines();
        loop {
            match lines.next_line().await {
                Ok(Some(line)) => sink.record(stream, line),
                Ok(None) => break,
                Err(e) => {
                    tracing::warn!(server = %sink.server_name, error = %e, ?stream, "log pump read error");
                    break;
                }
            }
        }
    });
}
  • Step 2: Expose

Append to lib.rs: pub use child::{RealChild, spawn_with_logs};

  • Step 3: Build

Run: cargo build -p xy-supervisor
Expected: compiles. Fix any nix API mismatches inline.

  • Step 4: Commit
git add crates/xy-supervisor/
git commit -m "feat(supervisor): RealChild + spawn_with_logs"
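`spawn_pump` turns each pipe into a stream of lines tagged by origin. Below is the same shape in blocking std code, with one reader thread per pipe standing in for the two `tokio::spawn` tasks. `pump` and `run_and_capture` are illustrative helpers, and the sketch assumes `sh` is available.

```rust
use std::io::{self, BufRead, BufReader, Read};
use std::process::{Command, Stdio};
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Reads `reader` line by line, forwarding each line prefixed with `tag`.
fn pump<R: Read + Send + 'static>(reader: R, tag: &'static str, tx: Sender<String>) -> JoinHandle<()> {
    thread::spawn(move || {
        for line in BufReader::new(reader).lines() {
            match line {
                Ok(line) => {
                    // A closed receiver just means nobody is listening any more.
                    let _ = tx.send(format!("{tag} {line}"));
                }
                // Stop on read errors, as the async pump does.
                Err(_) => break,
            }
        }
    })
}

/// Runs `script` under `sh -c` and returns every tagged line once both pipes hit EOF.
pub fn run_and_capture(script: &str) -> io::Result<Vec<String>> {
    let mut child = Command::new("sh")
        .arg("-c")
        .arg(script)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()?;
    let (tx, rx) = channel();
    let out = pump(child.stdout.take().expect("stdout is piped"), "[out]", tx.clone());
    let err = pump(child.stderr.take().expect("stderr is piped"), "[err]", tx);
    child.wait()?;
    out.join().expect("stdout pump panicked");
    err.join().expect("stderr pump panicked");
    // Both senders were dropped when the threads finished, so this iterator ends.
    Ok(rx.into_iter().collect())
}
```

The two pipes are read concurrently, so the relative order of stdout and stderr lines is not guaranteed. Only the order within each stream is preserved.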

Task 16: Supervisor task state machine

Files:

  • Create: crates/xy-supervisor/src/supervisor.rs

  • Modify: crates/xy-supervisor/src/lib.rs

  • Step 1: Define cmd types + handle

use crate::{
    backoff::Backoff, child::ChildHandle, logs::LogSink,
    policy::{decide, RestartDecision}, retry_window::RetryWindow,
};
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::sleep;
use tracing::{debug, info, warn};
use xy_protocol::{ServerConfig, ServerState};

pub enum SupervisorCmd {
    Start { ack: oneshot::Sender<StartAck> },
    Stop { ack: oneshot::Sender<StopAck> },
    Restart { ack: oneshot::Sender<()> },
    Reconfigure { new: ServerConfig, ack: oneshot::Sender<()> },
    Shutdown { ack: oneshot::Sender<()> },
}

#[derive(Debug, PartialEq, Eq)]
pub enum StartAck { Started, AlreadyRunning }

#[derive(Debug, PartialEq, Eq)]
pub enum StopAck { Stopped, NotRunning }

#[derive(Clone)]
pub struct SupervisorHandle {
    pub name: String,
    pub tx: mpsc::Sender<SupervisorCmd>,
    pub state: watch::Receiver<ServerState>,
    pub log_sink: LogSink,
}

#[async_trait::async_trait]
pub trait Spawner: Send + 'static {
    type Child: ChildHandle;
    async fn spawn(&self, cfg: &ServerConfig, sink: LogSink) -> std::io::Result<Self::Child>;
}
  • Step 2: Append the task + run loop
pub struct SupervisorTask<S: Spawner> {
    cfg: ServerConfig,
    log_sink: LogSink,
    spawner: S,
    state_tx: watch::Sender<ServerState>,
    cmd_rx: mpsc::Receiver<SupervisorCmd>,
    backoff: Backoff,
    retry_window: RetryWindow,
    restart_count: u32,
    last_exit: Option<i32>,
    started_at: Option<Instant>,
}

impl<S: Spawner> SupervisorTask<S> {
    pub fn new(
        cfg: ServerConfig, log_sink: LogSink, spawner: S,
        state_tx: watch::Sender<ServerState>, cmd_rx: mpsc::Receiver<SupervisorCmd>,
    ) -> Self {
        let backoff = Backoff::new(cfg.restart.backoff_initial, cfg.restart.backoff_max);
        let retry_window = RetryWindow::new(Duration::from_secs(60), cfg.restart.max_retries_per_minute);
        Self { cfg, log_sink, spawner, state_tx, cmd_rx, backoff, retry_window,
            restart_count: 0, last_exit: None, started_at: None }
    }

    fn set_state(&self, s: ServerState) { let _ = self.state_tx.send(s); }

    pub async fn run(mut self) {
        let mut child: Option<S::Child> = None;
        loop {
            tokio::select! {
                cmd = self.cmd_rx.recv() => {
                    let Some(cmd) = cmd else { break; };
                    match cmd {
                        SupervisorCmd::Start { ack } => {
                            if child.is_some() { let _ = ack.send(StartAck::AlreadyRunning); }
                            else {
                                // Operator-initiated start: backoff begins again at the initial delay.
                                self.backoff.reset();
                                match self.do_start().await {
                                    Ok(c) => { child = Some(c); let _ = ack.send(StartAck::Started); }
                                    Err(e) => {
                                        warn!(name = %self.cfg.name, error = %e, "spawn failed");
                                        self.set_state(ServerState::Failed);
                                        // The ack has no failure variant; the error surfaces as the Failed state.
                                        let _ = ack.send(StartAck::Started);
                                    }
                                }
                            }
                        }
                        SupervisorCmd::Stop { ack } => {
                            if let Some(c) = child.take() { self.do_stop(c).await; let _ = ack.send(StopAck::Stopped); }
                            else { let _ = ack.send(StopAck::NotRunning); }
                        }
                        SupervisorCmd::Restart { ack } => {
                            if let Some(c) = child.take() {
                                self.set_state(ServerState::Restarting);
                                self.do_stop(c).await;
                            }
                            // Operator-initiated restart also resets the backoff.
                            self.backoff.reset();
                            match self.do_start().await {
                                Ok(c) => child = Some(c),
                                Err(e) => {
                                    warn!(name = %self.cfg.name, error = %e, "restart spawn failed");
                                    self.set_state(ServerState::Failed);
                                }
                            }
                            let _ = ack.send(());
                        }
                        SupervisorCmd::Reconfigure { new, ack } => {
                            self.cfg = new;
                            self.backoff = Backoff::new(self.cfg.restart.backoff_initial, self.cfg.restart.backoff_max);
                            self.retry_window = RetryWindow::new(Duration::from_secs(60), self.cfg.restart.max_retries_per_minute);
                            let _ = ack.send(());
                        }
                        SupervisorCmd::Shutdown { ack } => {
                            if let Some(c) = child.take() { self.do_stop(c).await; }
                            let _ = ack.send(());
                            return;
                        }
                    }
                }
                code = wait_child(&mut child) => {
                    // The child has exited. Drop its handle so the next iteration does not
                    // wait on it again and Start/Stop see nothing running.
                    child = None;
                    self.last_exit = code;
                    let now = Instant::now();
                    self.retry_window.record(now);
                    let cap = self.retry_window.cap_reached(now);
                    let decision = decide(self.cfg.restart.policy, code, cap);
                    debug!(name = %self.cfg.name, ?code, ?decision, "child exited");
                    match decision {
                        RestartDecision::StayStopped => {
                            self.started_at = None;
                            self.set_state(ServerState::Stopped);
                        }
                        RestartDecision::MarkFailed => {
                            self.started_at = None;
                            self.set_state(ServerState::Failed);
                        }
                        RestartDecision::Restart => {
                            self.set_state(ServerState::Restarting);
                            // Automatic restarts do not reset the backoff, so a crash loop
                            // waits progressively longer, up to backoff_max.
                            let delay = self.backoff.next();
                            sleep(delay).await;
                            match self.do_start().await {
                                Ok(c) => child = Some(c),
                                Err(e) => {
                                    warn!(name = %self.cfg.name, error = %e, "restart spawn failed");
                                    self.set_state(ServerState::Failed);
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    async fn do_start(&mut self) -> std::io::Result<S::Child> {
        self.set_state(ServerState::Starting);
        let c = self.spawner.spawn(&self.cfg, self.log_sink.clone()).await?;
        self.restart_count = self.restart_count.saturating_add(1);
        self.started_at = Some(Instant::now());
        // No backoff reset here: resetting on every spawn would pin the delay at
        // backoff_initial. Operator-initiated Start/Restart reset it instead.
        self.set_state(ServerState::Running);
        info!(name = %self.cfg.name, pid = c.pid(), "started");
        Ok(c)
    }

    async fn do_stop(&mut self, mut c: S::Child) {
        self.set_state(ServerState::Stopping);
        let _ = c.terminate();
        let grace = self.cfg.stop.grace;
        match tokio::time::timeout(grace, c.wait()).await {
            Ok(_) => {}
            Err(_) => {
                let _ = c.kill();
                let _ = c.wait().await;
            }
        }
        self.started_at = None;
        self.set_state(ServerState::Stopped);
    }
}

async fn wait_child<C: ChildHandle>(slot: &mut Option<C>) -> Option<i32> {
    match slot.as_mut() {
        Some(c) => c.wait().await.ok().flatten(),
        None => std::future::pending().await,
    }
}

pub struct RealSpawner;

#[async_trait::async_trait]
impl Spawner for RealSpawner {
    type Child = crate::child::RealChild;
    async fn spawn(&self, cfg: &ServerConfig, sink: LogSink) -> std::io::Result<Self::Child> {
        crate::child::spawn_with_logs(cfg, sink)
    }
}
  • Step 3: Add a smoke test in supervisor.rs
#[cfg(test)]
mod tests {
    use super::*;
    use crate::child::MockChild;
    use crate::logs::{LogSink, RotatingLogWriter};
    use std::sync::{Arc, Mutex};
    use tempfile::tempdir;
    use xy_protocol::{RestartConfig, RestartPolicy, StopConfig};

    struct QueueSpawner {
        queue: Arc<Mutex<Vec<MockChild>>>,
    }

    #[async_trait::async_trait]
    impl Spawner for QueueSpawner {
        type Child = MockChild;
        async fn spawn(&self, _cfg: &ServerConfig, _sink: LogSink) -> std::io::Result<MockChild> {
            let mut q = self.queue.lock().unwrap();
            Ok(q.remove(0))
        }
    }

    fn cfg(name: &str, policy: RestartPolicy, max_retries: u32) -> ServerConfig {
        ServerConfig {
            name: name.to_string(),
            command: "/bin/true".into(),
            args: vec![], port: 1,
            env: Default::default(), working_dir: None,
            restart: RestartConfig {
                policy,
                backoff_initial: Duration::from_millis(1),
                backoff_max: Duration::from_millis(1),
                max_retries_per_minute: max_retries,
            },
            stop: StopConfig { grace: Duration::from_millis(50) },
        }
    }

    fn sink(name: &str) -> LogSink {
        let dir = tempdir().unwrap();
        let writer = RotatingLogWriter::open(&dir.path().join("s.log"), 1024, 3).unwrap();
        std::mem::forget(dir);
        LogSink::new(name.to_string(), writer, 1024)
    }

    async fn wait_for(rx: &mut watch::Receiver<ServerState>, want: ServerState) {
        let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
        loop {
            if *rx.borrow() == want { return; }
            tokio::select! {
                _ = rx.changed() => {}
                _ = tokio::time::sleep_until(deadline) => panic!("never reached {want:?}, last={:?}", *rx.borrow()),
            }
        }
    }

    #[tokio::test]
    async fn start_runs_to_running_and_stop_to_stopped() {
        let (mock, mut ctl) = MockChild::new(1);
        let queue = Arc::new(Mutex::new(vec![mock]));
        let spawner = QueueSpawner { queue };

        let (state_tx, mut state_rx) = watch::channel(ServerState::Stopped);
        let (cmd_tx, cmd_rx) = mpsc::channel(8);
        let task = SupervisorTask::new(cfg("x", RestartPolicy::Never, 5), sink("x"), spawner, state_tx, cmd_rx);
        let h = tokio::spawn(task.run());

        let (ack_tx, ack_rx) = oneshot::channel();
        cmd_tx.send(SupervisorCmd::Start { ack: ack_tx }).await.unwrap();
        assert_eq!(ack_rx.await.unwrap(), StartAck::Started);
        wait_for(&mut state_rx, ServerState::Running).await;

        // Trigger child exit via the controller.
        ctl.exit_tx.take().unwrap().send(Some(0)).unwrap();
        wait_for(&mut state_rx, ServerState::Stopped).await;

        let (ack_tx, ack_rx) = oneshot::channel();
        cmd_tx.send(SupervisorCmd::Shutdown { ack: ack_tx }).await.unwrap();
        ack_rx.await.unwrap();
        h.await.unwrap();
    }
}
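`QueueSpawner` lets the state machine run without real processes. Each `spawn` hands out the next pre-built mock, so a test scripts a sequence of children (for example, one that crashes and then one that stays up) just by filling the queue. Below is a std-only, synchronous sketch of the same idea. The `Spawn` trait, `FakeChild`, and `QueueSpawn` names are hypothetical stand-ins for `Spawner`, `MockChild`, and `QueueSpawner`. It uses a `VecDeque`, so taking the front is O(1). When a test spawns more children than it queued, it returns an `io::Error` rather than panicking inside the supervisor task.

```rust
use std::collections::VecDeque;
use std::io;
use std::sync::Mutex;

/// Hypothetical synchronous analogue of the async `Spawner` trait.
pub trait Spawn {
    type Child;
    fn spawn(&self) -> io::Result<Self::Child>;
}

/// Stand-in for `MockChild`: only records the pid it pretends to have.
#[derive(Debug, PartialEq, Eq)]
pub struct FakeChild {
    pub pid: u32,
}

/// Hands out pre-built children in FIFO order.
pub struct QueueSpawn {
    queue: Mutex<VecDeque<FakeChild>>,
}

impl QueueSpawn {
    pub fn new(children: Vec<FakeChild>) -> Self {
        Self { queue: Mutex::new(VecDeque::from(children)) }
    }
}

impl Spawn for QueueSpawn {
    type Child = FakeChild;

    fn spawn(&self) -> io::Result<FakeChild> {
        // An empty queue means the code under test restarted more often than scripted.
        self.queue.lock().unwrap().pop_front().ok_or_else(|| {
            io::Error::new(io::ErrorKind::Other, "spawn called more times than children were queued")
        })
    }
}
```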
  • Step 4: Update lib.rs
pub mod supervisor;

pub use supervisor::{
    RealSpawner, Spawner, StartAck, StopAck, SupervisorCmd, SupervisorHandle, SupervisorTask,
};
  • Step 5: Run tests

Run: cargo test -p xy-supervisor
Expected: 22 passed.

  • Step 6: Commit
git add crates/xy-supervisor/
git commit -m "feat(supervisor): supervisor task with state machine"

Phase 4 — xy-ipc

Task 17: JSON-RPC envelope types

Files:

  • Create: crates/xy-ipc/src/envelope.rs

  • Modify: crates/xy-ipc/src/lib.rs

  • Step 1: Define envelope types

use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Request {
    pub jsonrpc: String,
    pub id: serde_json::Value,
    pub method: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub params: Option<Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Notification {
    pub jsonrpc: String,
    pub method: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub params: Option<Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Response {
    pub jsonrpc: String,
    pub id: serde_json::Value,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub result: Option<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<RpcError>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcError {
    pub code: i32,
    pub message: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub data: Option<Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Incoming {
    Request(Request),
    Response(Response),
    Notification(Notification),
}

pub const JSONRPC_VERSION: &str = "2.0";

pub fn request(id: u64, method: &str, params: Option<Value>) -> Request {
    Request { jsonrpc: JSONRPC_VERSION.into(), id: serde_json::json!(id), method: method.into(), params }
}

pub fn notification(method: &str, params: Option<Value>) -> Notification {
    Notification { jsonrpc: JSONRPC_VERSION.into(), method: method.into(), params }
}

pub fn ok_response(id: serde_json::Value, result: Value) -> Response {
    Response { jsonrpc: JSONRPC_VERSION.into(), id, result: Some(result), error: None }
}

pub fn err_response(id: serde_json::Value, code: i32, message: String) -> Response {
    Response { jsonrpc: JSONRPC_VERSION.into(), id, result: None,
        error: Some(RpcError { code, message, data: None }) }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test] fn request_round_trip() {
        let r = request(7, "list", None);
        let s = serde_json::to_string(&r).unwrap();
        let back: Request = serde_json::from_str(&s).unwrap();
        assert_eq!(back.method, "list");
        assert_eq!(back.id, serde_json::json!(7));
    }

    #[test] fn incoming_is_response() {
        let s = r#"{"jsonrpc":"2.0","id":1,"result":{"ok":true}}"#;
        let i: Incoming = serde_json::from_str(s).unwrap();
        assert!(matches!(i, Incoming::Response(_)));
    }

    #[test] fn incoming_is_notification() {
        let s = r#"{"jsonrpc":"2.0","method":"log","params":{}}"#;
        let i: Incoming = serde_json::from_str(s).unwrap();
        assert!(matches!(i, Incoming::Notification(_)));
    }
}
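`#[serde(untagged)]` tries the variants in declaration order and keeps the first one that deserializes. `Request` needs both `id` and `method`, `Response` needs `id`, and `Notification` needs `method`, so the order above reduces to the key-presence rule sketched below. The `Kind` enum and `classify` function are hypothetical and not part of `xy-ipc`. The order matters. Serde ignores unknown fields by default, so moving `Response` or `Notification` ahead of `Request` would make every request decode as that variant, with the extra member silently dropped.

```rust
/// Which JSON-RPC message kind an object is, judged only by which members it has.
#[derive(Debug, PartialEq, Eq)]
pub enum Kind {
    Request,
    Response,
    Notification,
    Invalid,
}

/// Hypothetical helper mirroring the variant order of `Incoming`.
pub fn classify(has_id: bool, has_method: bool) -> Kind {
    if has_id && has_method {
        Kind::Request // `Request` requires both members
    } else if has_id {
        Kind::Response // `Response` requires only `id`
    } else if has_method {
        Kind::Notification // `Notification` requires only `method`
    } else {
        Kind::Invalid // no variant matches; serde reports an error
    }
}
```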
  • Step 2: Expose the module from crates/xy-ipc/src/lib.rs
//! JSON-RPC 2.0 over newline-delimited JSON on a Unix socket.

pub mod envelope;

pub use envelope::{
    err_response, notification, ok_response, request, Incoming, Notification, Request, Response, RpcError,
};
  • Step 3: Run tests

Run: cargo test -p xy-ipc
Expected: 3 passed.

  • Step 4: Commit
git add crates/xy-ipc/
git commit -m "feat(ipc): JSON-RPC envelope types"

Task 18: Newline-delimited framing helpers

Files:

  • Create: crates/xy-ipc/src/framing.rs

  • Modify: crates/xy-ipc/src/lib.rs

  • Step 1: Implement JsonFramed

use serde::Serialize;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;

pub struct JsonFramed {
    reader: BufReader<tokio::net::unix::OwnedReadHalf>,
    writer: tokio::net::unix::OwnedWriteHalf,
}

impl JsonFramed {
    pub fn new(stream: UnixStream) -> Self {
        let (r, w) = stream.into_split();
        Self { reader: BufReader::new(r), writer: w }
    }

    pub async fn read<T: serde::de::DeserializeOwned>(&mut self) -> std::io::Result<Option<T>> {
        let mut buf = String::new();
        let n = self.reader.read_line(&mut buf).await?;
        if n == 0 { return Ok(None); }
        let v: T = serde_json::from_str(buf.trim_end())
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        Ok(Some(v))
    }

    pub async fn write<T: Serialize>(&mut self, value: &T) -> std::io::Result<()> {
        let mut bytes = serde_json::to_vec(value)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        bytes.push(b'\n');
        self.writer.write_all(&bytes).await?;
        self.writer.flush().await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;

    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct M { x: u32, name: String }

    #[tokio::test]
    async fn round_trip_over_socket_pair() {
        let (a, b) = UnixStream::pair().unwrap();
        let mut sa = JsonFramed::new(a);
        let mut sb = JsonFramed::new(b);
        sa.write(&M { x: 1, name: "hi".into() }).await.unwrap();
        let got: Option<M> = sb.read().await.unwrap();
        assert_eq!(got, Some(M { x: 1, name: "hi".into() }));
    }

    #[tokio::test]
    async fn eof_returns_none() {
        let (a, b) = UnixStream::pair().unwrap();
        drop(a);
        let mut sb = JsonFramed::new(b);
        let got: Option<M> = sb.read().await.unwrap();
        assert!(got.is_none());
    }
}
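Newline-delimited framing works because compact JSON (`serde_json::to_vec`, not `to_vec_pretty`) never emits a raw newline. A newline inside a string is escaped as the two characters `\n`, so one line is exactly one message. The same read/write contract can be sketched with `std::io` alone, without serde or Tokio. `write_frame` and `read_frame` are hypothetical names, and the payloads are plain strings standing in for serialized JSON.

```rust
use std::io::{self, BufRead, Write};

/// Write one frame: the payload followed by a single '\n'.
/// Rejects payloads containing a raw newline, which would split the frame.
pub fn write_frame<W: Write>(w: &mut W, payload: &str) -> io::Result<()> {
    if payload.contains('\n') {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "payload contains a newline"));
    }
    w.write_all(payload.as_bytes())?;
    w.write_all(b"\n")?;
    w.flush()
}

/// Read one frame. `Ok(None)` means clean EOF, matching `JsonFramed::read`.
pub fn read_frame<R: BufRead>(r: &mut R) -> io::Result<Option<String>> {
    let mut line = String::new();
    if r.read_line(&mut line)? == 0 {
        return Ok(None);
    }
    // Strip the delimiter (and a '\r' from a CRLF peer).
    Ok(Some(line.trim_end_matches(|c: char| c == '\n' || c == '\r').to_string()))
}
```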
  • Step 2: Expose

Append to crates/xy-ipc/src/lib.rs: pub mod framing; pub use framing::JsonFramed;

  • Step 3: Run tests

Run: cargo test -p xy-ipc
Expected: 5 passed.

  • Step 4: Commit
git add crates/xy-ipc/
git commit -m "feat(ipc): newline-delimited JSON framing"

Task 19: Client helper (call + subscribe)

Files:

  • Create: crates/xy-ipc/src/client.rs

  • Modify: crates/xy-ipc/src/lib.rs

  • Step 1: Implement client

use crate::envelope::{Incoming, Notification, Request};
use crate::framing::JsonFramed;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::path::Path;
use thiserror::Error;
use tokio::net::UnixStream;

#[derive(Debug, Error)]
pub enum ClientError {
    #[error("io: {0}")] Io(#[from] std::io::Error),
    #[error("rpc error {code}: {message}")] Rpc { code: i32, message: String },
    #[error("unexpected message kind from daemon")] Unexpected,
    #[error("daemon unreachable: {0}")] Unreachable(std::io::Error),
    #[error("serialization: {0}")] Serde(#[from] serde_json::Error),
}

pub struct Client {
    framed: JsonFramed,
    next_id: u64,
}

impl Client {
    pub async fn connect(socket_path: &Path) -> Result<Self, ClientError> {
        let stream = UnixStream::connect(socket_path).await.map_err(ClientError::Unreachable)?;
        Ok(Self { framed: JsonFramed::new(stream), next_id: 1 })
    }

    /// Call `method` with `params` and decode the `result` member into `R`.
    pub async fn call<P: Serialize, R: DeserializeOwned>(&mut self, method: &str, params: &P) -> Result<R, ClientError> {
        let params_val = serde_json::to_value(params)?;
        // JSON-RPC 2.0 requires `params`, when present, to be an array or object,
        // so a value that serializes to null is sent as an absent member instead.
        let params = if params_val.is_null() { None } else { Some(params_val) };
        self.round_trip(method, params).await
    }

    /// Call `method` without a `params` member.
    pub async fn call_no_params<R: DeserializeOwned>(&mut self, method: &str) -> Result<R, ClientError> {
        self.round_trip(method, None).await
    }

    /// Send one request and read until its response arrives. Shared by both call
    /// variants so they agree on id checking, EOF handling, and skipping notifications.
    async fn round_trip<R: DeserializeOwned>(&mut self, method: &str, params: Option<serde_json::Value>) -> Result<R, ClientError> {
        let id = self.next_id;
        self.next_id += 1;
        let req = Request {
            jsonrpc: crate::envelope::JSONRPC_VERSION.into(),
            id: serde_json::json!(id),
            method: method.into(),
            params,
        };
        self.framed.write(&req).await?;
        loop {
            let Some(msg) = self.framed.read::<Incoming>().await? else {
                // The daemon closed the connection before answering.
                return Err(ClientError::Unreachable(std::io::Error::from(std::io::ErrorKind::UnexpectedEof)));
            };
            match msg {
                Incoming::Response(r) => {
                    if r.id != serde_json::json!(id) { return Err(ClientError::Unexpected); }
                    if let Some(err) = r.error { return Err(ClientError::Rpc { code: err.code, message: err.message }); }
                    let result = r.result.unwrap_or(serde_json::Value::Null);
                    return Ok(serde_json::from_value(result)?);
                }
                // Notifications (e.g. log lines) may arrive before the response.
                Incoming::Notification(_) => continue,
                Incoming::Request(_) => return Err(ClientError::Unexpected),
            }
        }
    }

    pub async fn read_notification(&mut self) -> Result<Option<Notification>, ClientError> {
        loop {
            let msg: Option<Incoming> = self.framed.read().await?;
            match msg {
                None => return Ok(None),
                Some(Incoming::Notification(n)) => return Ok(Some(n)),
                Some(Incoming::Response(_)) => continue,
                Some(Incoming::Request(_)) => return Err(ClientError::Unexpected),
            }
        }
    }

    pub async fn send_notification(&mut self, n: &Notification) -> Result<(), ClientError> {
        self.framed.write(n).await?; Ok(())
    }
}
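`call` assumes one request in flight per connection. It tags each request with a fresh id, skips any notifications that arrive first, and treats a response carrying any other id as a protocol error. Below is a std-only sketch of that matching loop. `Msg` is a hypothetical stand-in for `Incoming`, and an iterator stands in for the socket. Unlike `call`, it returns the skipped notification methods instead of discarding them, to show where a caller could replay them.

```rust
/// Stand-in for `Incoming`, reduced to what the matching loop inspects.
#[derive(Debug)]
pub enum Msg {
    Response { id: u64, result: String },
    Notification { method: String },
}

#[derive(Debug, PartialEq)]
pub enum WaitError {
    Eof,
    WrongId { expected: u64, got: u64 },
}

/// Consume messages until the response for `id` arrives.
pub fn await_response<I: Iterator<Item = Msg>>(
    msgs: &mut I,
    id: u64,
) -> Result<(String, Vec<String>), WaitError> {
    let mut skipped = Vec::new();
    for msg in msgs {
        match msg {
            Msg::Notification { method } => skipped.push(method),
            Msg::Response { id: got, result } if got == id => return Ok((result, skipped)),
            // Only one request is in flight, so any other id is a protocol error.
            Msg::Response { id: got, .. } => return Err(WaitError::WrongId { expected: id, got }),
        }
    }
    Err(WaitError::Eof)
}
```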
  • Step 2: Expose

Append to crates/xy-ipc/src/lib.rs: pub mod client; pub use client::{Client, ClientError};

  • Step 3: Build

Run: cargo build -p xy-ipc
Expected: compiles.

  • Step 4: Commit
git add crates/xy-ipc/
git commit -m "feat(ipc): client with call + notification reader"

Task 20: Server helper (bind + Connection)

Files:

  • Create: crates/xy-ipc/src/server.rs

  • Modify: crates/xy-ipc/src/lib.rs

  • Step 1: Implement server primitives

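use crate::envelope::{Incoming, Notification, Response};
use std::path::Path;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::Mutex;

/// One accepted client connection. Reads and writes take separate locks, so a task
/// writing notifications (e.g. streamed log lines) is never blocked behind a
/// `read_incoming` that is parked waiting for the client's next line.
pub struct Connection {
    reader: Mutex<BufReader<OwnedReadHalf>>,
    writer: Mutex<OwnedWriteHalf>,
}

impl Connection {
    pub fn new(stream: UnixStream) -> Self {
        let (r, w) = stream.into_split();
        Self { reader: Mutex::new(BufReader::new(r)), writer: Mutex::new(w) }
    }

    /// Read one newline-delimited message; `Ok(None)` on clean EOF.
    pub async fn read_incoming(&self) -> std::io::Result<Option<Incoming>> {
        let mut reader = self.reader.lock().await;
        let mut line = String::new();
        if reader.read_line(&mut line).await? == 0 { return Ok(None); }
        serde_json::from_str(line.trim_end())
            .map(Some)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
    }

    pub async fn write_response(&self, r: &Response) -> std::io::Result<()> {
        self.write_line(r).await
    }

    pub async fn write_notification(&self, n: &Notification) -> std::io::Result<()> {
        self.write_line(n).await
    }

    async fn write_line<T: serde::Serialize>(&self, value: &T) -> std::io::Result<()> {
        let mut bytes = serde_json::to_vec(value)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        bytes.push(b'\n');
        // Hold the writer lock for the whole line so concurrent writers never interleave.
        let mut w = self.writer.lock().await;
        w.write_all(&bytes).await?;
        w.flush().await
    }
}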

pub fn bind(socket_path: &Path) -> std::io::Result<UnixListener> {
    if socket_path.exists() { std::fs::remove_file(socket_path)?; }
    if let Some(parent) = socket_path.parent() { std::fs::create_dir_all(parent)?; }
    let listener = UnixListener::bind(socket_path)?;
    use std::os::unix::fs::PermissionsExt;
    std::fs::set_permissions(socket_path, std::fs::Permissions::from_mode(0o600))?;
    Ok(listener)
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[tokio::test]
    async fn bind_creates_socket_with_0600() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("x.sock");
        let _listener = bind(&path).unwrap();
        use std::os::unix::fs::PermissionsExt;
        let mode = std::fs::metadata(&path).unwrap().permissions().mode() & 0o777;
        assert_eq!(mode, 0o600);
    }
}
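`bind` unconditionally removes whatever sits at the socket path. That is safe in the daemon only because `run` acquires the pidfile first. If a check independent of the pidfile is ever wanted, a common approach is to probe the path. A successful connect means a live process is accepting on it. `ConnectionRefused` means the file is a leftover from a crashed process. The sketch below is std-only, is not part of the plan, and uses the hypothetical names `probe_socket` and `SocketState`.

```rust
use std::io::ErrorKind;
use std::os::unix::net::UnixStream;
use std::path::Path;

#[derive(Debug, PartialEq, Eq)]
pub enum SocketState {
    Absent,
    Live,
    Stale,
}

/// Classify a socket path by attempting to connect to it.
pub fn probe_socket(path: &Path) -> std::io::Result<SocketState> {
    match UnixStream::connect(path) {
        Ok(_) => Ok(SocketState::Live), // someone is listening
        Err(e) if e.kind() == ErrorKind::NotFound => Ok(SocketState::Absent),
        // The file exists but nothing is listening: left over from a crash.
        Err(e) if e.kind() == ErrorKind::ConnectionRefused => Ok(SocketState::Stale),
        Err(e) => Err(e),
    }
}
```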
  • Step 2: Expose

Append to crates/xy-ipc/src/lib.rs: pub mod server; pub use server::{bind, Connection};

  • Step 3: Run tests

Run: cargo test -p xy-ipc
Expected: 6 passed.

  • Step 4: Commit
git add crates/xy-ipc/
git commit -m "feat(ipc): server bind + Connection wrapper"

Phase 5 — xy binary

Task 21: XDG path resolution

Files:

  • Create: crates/xy/src/paths.rs

  • Modify: crates/xy/src/main.rs

  • Step 1: Implement

use etcetera::base_strategy::{BaseStrategy, Xdg};
use std::path::PathBuf;

#[derive(Debug, Clone)]
pub struct Paths {
    pub config_dir: PathBuf,
    pub state_dir: PathBuf,
    pub log_dir: PathBuf,
    pub socket: PathBuf,
    pub pidfile: PathBuf,
}

impl Paths {
    pub fn resolve() -> std::io::Result<Self> {
        let xdg = Xdg::new().map_err(std::io::Error::other)?;
        let config_dir = xdg.config_dir().join("xy").join("servers");
        let state_dir = xdg.state_dir().unwrap_or_else(|| xdg.data_dir()).join("xy");
        let log_dir = state_dir.join("logs");
        let socket = std::env::var_os("XDG_RUNTIME_DIR")
            .map(|p| PathBuf::from(p).join("xy.sock"))
            .unwrap_or_else(|| state_dir.join("xy.sock"));
        let pidfile = state_dir.join("xy.pid");
        Ok(Self { config_dir, state_dir, log_dir, socket, pidfile })
    }

    pub fn ensure_dirs(&self) -> std::io::Result<()> {
        std::fs::create_dir_all(&self.state_dir)?;
        std::fs::create_dir_all(&self.log_dir)?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test] fn resolves_without_panicking() {
        let p = Paths::resolve().unwrap();
        assert!(p.pidfile.starts_with(&p.state_dir));
    }
}
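The socket location falls back in two steps. When `$XDG_RUNTIME_DIR` is set, as is typical on systemd-based Linux, it is `$XDG_RUNTIME_DIR/xy.sock`. Otherwise it is `xy.sock` inside the state dir, the usual case on macOS, which has no runtime dir. Pulling that rule into a pure function makes it testable without touching the real environment. Below is a std-only sketch with hypothetical helpers `socket_path` and `fits_sun_path`; the example paths are made up. Unlike `Paths::resolve`, it ignores a relative `XDG_RUNTIME_DIR`, as the XDG Base Directory spec asks. It also checks length, because socket paths must fit the kernel's fixed `sun_path` buffer (104 bytes on macOS, 108 on Linux, including the trailing NUL).

```rust
use std::ffi::OsString;
use std::path::{Path, PathBuf};

/// Conservative budget that fits `sun_path` on both macOS and Linux.
pub const MAX_SOCKET_PATH: usize = 103;

/// Pure version of the socket rule: runtime dir if set and absolute, else state dir.
pub fn socket_path(runtime_dir: Option<OsString>, state_dir: &Path) -> PathBuf {
    runtime_dir
        .map(PathBuf::from)
        .filter(|p| p.is_absolute())
        .map(|p| p.join("xy.sock"))
        .unwrap_or_else(|| state_dir.join("xy.sock"))
}

/// True if `path` is short enough to bind as a Unix socket on both platforms.
pub fn fits_sun_path(path: &Path) -> bool {
    path.as_os_str().len() <= MAX_SOCKET_PATH
}
```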
  • Step 2: Reference from main.rs
mod paths;

fn main() {
    let p = paths::Paths::resolve().unwrap();
    eprintln!("xy: socket would be at {}", p.socket.display());
}
  • Step 3: Run tests

Run: cargo test -p xy
Expected: 1 passed.

  • Step 4: Commit
git add crates/xy/
git commit -m "feat(xy): XDG path resolution"

Task 22: Pidfile guard

Files:

  • Create: crates/xy/src/pidfile.rs

  • Modify: crates/xy/src/main.rs

  • Step 1: Implement

use std::fs::{File, OpenOptions};
use std::io::Write;
use std::os::unix::fs::OpenOptionsExt;
use std::path::{Path, PathBuf};

pub struct PidFile {
    path: PathBuf,
    _file: File,
}

impl PidFile {
    pub fn acquire(path: &Path) -> std::io::Result<Self> {
        let mut f = OpenOptions::new()
            .write(true).create_new(true).mode(0o600).open(path)?;
        writeln!(f, "{}", std::process::id())?;
        Ok(Self { path: path.to_path_buf(), _file: f })
    }
}

impl Drop for PidFile {
    fn drop(&mut self) { let _ = std::fs::remove_file(&self.path); }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test] fn acquire_then_second_fails() {
        let dir = tempdir().unwrap();
        let p = dir.path().join("x.pid");
        let _g = PidFile::acquire(&p).unwrap();
        let err = PidFile::acquire(&p).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::AlreadyExists);
    }

    #[test] fn drop_removes_file() {
        let dir = tempdir().unwrap();
        let p = dir.path().join("x.pid");
        { let _g = PidFile::acquire(&p).unwrap(); assert!(p.exists()); }
        assert!(!p.exists());
    }
}
  • Step 2: Reference from main.rs
mod paths;
mod pidfile;

fn main() {
    let p = paths::Paths::resolve().unwrap();
    eprintln!("xy: socket would be at {}", p.socket.display());
}
  • Step 3: Run tests

Run: cargo test -p xy
Expected: 3 passed.

  • Step 4: Commit
git add crates/xy/
git commit -m "feat(xy): exclusive pidfile guard"

Task 23: clap CLI definitions

Files:

  • Modify: crates/xy/src/main.rs

  • Create: crates/xy/src/cli/mod.rs (stub)

  • Create: crates/xy/src/daemon/mod.rs (stub)

  • Step 1: Replace main.rs

use clap::{Parser, Subcommand};

mod cli;
mod daemon;
mod paths;
mod pidfile;

#[derive(Debug, Parser)]
#[command(name = "xy", version, about = "HTTP MCP server supervisor")]
struct Cli {
    #[command(subcommand)]
    cmd: Cmd,
}

#[derive(Debug, Subcommand)]
enum Cmd {
    /// Run the daemon in the foreground.
    Daemon,
    /// List all configured servers with state.
    List,
    /// Show detailed status for a single server.
    Status { name: String },
    /// Start a server (or all configured servers with --all).
    Start {
        #[arg(long, conflicts_with = "name")] all: bool,
        #[arg(required_unless_present = "all")] name: Option<String>,
    },
    /// Stop a server (or --all).
    Stop {
        #[arg(long, conflicts_with = "name")] all: bool,
        #[arg(required_unless_present = "all")] name: Option<String>,
    },
    /// Restart a server (or --all).
    Restart {
        #[arg(long, conflicts_with = "name")] all: bool,
        #[arg(required_unless_present = "all")] name: Option<String>,
    },
    /// Re-read config dir and reconcile running servers.
    Reload,
    /// Stream a server's log.
    Logs {
        name: String,
        #[arg(long)] tail: Option<u32>,
        #[arg(short = 'f', long)] follow: bool,
    },
}

#[tokio::main]
async fn main() -> std::process::ExitCode {
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::try_from_default_env()
            .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")))
        .with_writer(std::io::stderr)
        .init();

    let cli = Cli::parse();
    let paths = match paths::Paths::resolve() {
        Ok(p) => p,
        Err(e) => { eprintln!("xy: failed to resolve XDG paths: {e}"); return std::process::ExitCode::from(3); }
    };

    let result: anyhow::Result<i32> = match cli.cmd {
        Cmd::Daemon => daemon::run(paths).await.map(|_| 0),
        Cmd::List => cli::list(paths).await,
        Cmd::Status { name } => cli::status(paths, name).await,
        Cmd::Start { all, name } => cli::start(paths, all, name).await,
        Cmd::Stop { all, name } => cli::stop(paths, all, name).await,
        Cmd::Restart { all, name } => cli::restart(paths, all, name).await,
        Cmd::Reload => cli::reload(paths).await,
        Cmd::Logs { name, tail, follow } => cli::logs(paths, name, tail, follow).await,
    };

    match result {
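        // `code as u8` would wrap (256 becomes 0, i.e. success); map out-of-range codes to 1.
        Ok(code) => std::process::ExitCode::from(u8::try_from(code).unwrap_or(1)),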
        Err(e) => { eprintln!("xy: {e:#}"); std::process::ExitCode::from(1) }
    }
}
  • Step 2: Stub cli/mod.rs
use crate::paths::Paths;
use anyhow::{bail, Result};

pub async fn list(_p: Paths) -> Result<i32> { bail!("not implemented") }
pub async fn status(_p: Paths, _name: String) -> Result<i32> { bail!("not implemented") }
pub async fn start(_p: Paths, _all: bool, _name: Option<String>) -> Result<i32> { bail!("not implemented") }
pub async fn stop(_p: Paths, _all: bool, _name: Option<String>) -> Result<i32> { bail!("not implemented") }
pub async fn restart(_p: Paths, _all: bool, _name: Option<String>) -> Result<i32> { bail!("not implemented") }
pub async fn reload(_p: Paths) -> Result<i32> { bail!("not implemented") }
pub async fn logs(_p: Paths, _name: String, _tail: Option<u32>, _follow: bool) -> Result<i32> { bail!("not implemented") }
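clap's `conflicts_with` and `required_unless_present` ensure that exactly one of `--all` and a name is given. Each lifecycle stub can therefore collapse its `(all, name)` pair into a single target before building the RPC params. Below is a std-only sketch using a hypothetical `Target` enum; the real code would presumably build `xy_protocol::rpc::NameOrAll` instead. The error arms only fire if the clap attributes are ever loosened.

```rust
/// Hypothetical stand-in for `NameOrAll`.
#[derive(Debug, PartialEq, Eq)]
pub enum Target {
    All,
    Name(String),
}

/// Collapse the `--all` flag and the optional positional name into one target.
pub fn target(all: bool, name: Option<String>) -> Result<Target, String> {
    match (all, name) {
        (true, None) => Ok(Target::All),
        (false, Some(n)) => Ok(Target::Name(n)),
        (true, Some(_)) => Err("--all cannot be combined with a server name".into()),
        (false, None) => Err("a server name or --all is required".into()),
    }
}
```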
  • Step 3: Stub daemon/mod.rs
use crate::paths::Paths;
use anyhow::{bail, Result};

pub async fn run(_paths: Paths) -> Result<()> { bail!("not implemented") }
  • Step 4: Build + verify --help

Run: cargo build -p xy
Run: cargo run -p xy -- --help
Expected: prints help with all subcommands.

  • Step 5: Commit
git add crates/xy/
git commit -m "feat(xy): clap CLI scaffold"

Task 24: Daemon Registry with config-hash entry

Files:

  • Create: crates/xy/src/daemon/registry.rs

  • Modify: crates/xy/src/daemon/mod.rs

  • Step 1: Implement registry with Entry { handle, config_hash }

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use xy_supervisor::SupervisorHandle;

#[derive(Clone)]
pub struct Entry {
    pub handle: SupervisorHandle,
    pub config_hash: u64,
}

#[derive(Clone, Default)]
pub struct Registry {
    inner: Arc<RwLock<HashMap<String, Entry>>>,
}

impl Registry {
    pub fn new() -> Self { Self::default() }
    pub async fn insert(&self, name: String, entry: Entry) {
        self.inner.write().await.insert(name, entry);
    }
    pub async fn remove(&self, name: &str) -> Option<Entry> {
        self.inner.write().await.remove(name)
    }
    pub async fn get(&self, name: &str) -> Option<Entry> {
        self.inner.read().await.get(name).cloned()
    }
    pub async fn names(&self) -> Vec<String> {
        let g = self.inner.read().await;
        let mut v: Vec<String> = g.keys().cloned().collect();
        v.sort(); v
    }
    pub async fn snapshot(&self) -> Vec<(String, Entry)> {
        let g = self.inner.read().await;
        let mut v: Vec<_> = g.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
        v.sort_by(|a, b| a.0.cmp(&b.0));
        v
    }
}
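`names` and `snapshot` sort on every call so that CLI output is stable. An alternative worth considering is a `BTreeMap`, which keeps keys ordered so both methods become plain iteration. Below is a std-only sketch of that variant. It is not what the plan specifies: it uses `std::sync::RwLock` instead of Tokio's, and a bare `u64` config hash stands in for `Entry`. `OrderedRegistry` is a hypothetical name.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

/// Sketch: registry keyed by server name, ordered by construction.
#[derive(Clone, Default)]
pub struct OrderedRegistry {
    inner: Arc<RwLock<BTreeMap<String, u64>>>, // value stands in for `Entry`
}

impl OrderedRegistry {
    pub fn insert(&self, name: &str, config_hash: u64) {
        self.inner.write().unwrap().insert(name.to_string(), config_hash);
    }

    pub fn remove(&self, name: &str) -> Option<u64> {
        self.inner.write().unwrap().remove(name)
    }

    /// Already sorted: a BTreeMap iterates in key order.
    pub fn names(&self) -> Vec<String> {
        self.inner.read().unwrap().keys().cloned().collect()
    }
}
```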
  • Step 2: Add pub mod registry; to daemon/mod.rs
use crate::paths::Paths;
use anyhow::{bail, Result};

pub mod registry;

pub async fn run(_paths: Paths) -> Result<()> { bail!("not implemented") }
  • Step 3: Build

Run: cargo build -p xy
Expected: compiles.

  • Step 4: Commit
git add crates/xy/
git commit -m "feat(xy): daemon Registry with config-hash entries"

Task 25: Daemon entry point — boot + accept + shutdown

Files:

  • Modify: crates/xy/src/daemon/mod.rs

  • Create: crates/xy/src/daemon/shutdown.rs

  • Create: crates/xy/src/daemon/handlers.rs (stub)

  • Step 1: Implement daemon/mod.rs

use crate::paths::Paths;
use crate::pidfile::PidFile;
use anyhow::{Context, Result};
use std::sync::{Arc, OnceLock};
use tokio::sync::{mpsc, oneshot, watch};
use tracing::{error, info};
use xy_ipc::{bind, Connection};
use xy_protocol::{kdl_parse::load_all_configs, ServerConfig, ServerState};
use xy_supervisor::{
    logs::{LogSink, RotatingLogWriter},
    supervisor::{RealSpawner, SupervisorCmd, SupervisorHandle, SupervisorTask},
};

pub mod handlers;
pub mod registry;
pub mod shutdown;

const LOG_FILE_MAX_BYTES: u64 = 10 * 1024 * 1024;
const LOG_FILE_KEEP: usize = 5;
const RING_BUFFER_BYTES: usize = 1024 * 1024;

pub static PATHS: OnceLock<Paths> = OnceLock::new();

pub fn config_hash(cfg: &ServerConfig) -> u64 {
    use std::hash::{Hash, Hasher};
    let mut h = std::collections::hash_map::DefaultHasher::new();
    serde_json::to_string(cfg).unwrap().hash(&mut h);
    h.finish()
}

pub fn spawn_supervisor(paths: &Paths, cfg: ServerConfig) -> Result<SupervisorHandle> {
    let log_path = paths.log_dir.join(format!("{}.log", cfg.name));
    let writer = RotatingLogWriter::open(&log_path, LOG_FILE_MAX_BYTES, LOG_FILE_KEEP)
        .with_context(|| format!("open log file {}", log_path.display()))?;
    let sink = LogSink::new(cfg.name.clone(), writer, RING_BUFFER_BYTES);
    let (state_tx, state_rx) = watch::channel(ServerState::Stopped);
    let (cmd_tx, cmd_rx) = mpsc::channel(16);
    let name = cfg.name.clone();
    let task = SupervisorTask::new(cfg, sink.clone(), RealSpawner, state_tx, cmd_rx);
    tokio::spawn(task.run());
    Ok(SupervisorHandle { name, tx: cmd_tx, state: state_rx, log_sink: sink })
}

pub async fn run(paths: Paths) -> Result<()> {
    paths.ensure_dirs().context("create state dirs")?;
    let _pid = PidFile::acquire(&paths.pidfile)
        .context("another xy daemon appears to be running")?;
    let listener = bind(&paths.socket).context("bind unix socket")?;
    let _ = PATHS.set(paths.clone());
    info!(socket = %paths.socket.display(), "daemon listening");

    let configs = load_all_configs(&paths.config_dir).context("load configs")?;
    let registry = registry::Registry::new();

    for cfg in configs {
        let hash = config_hash(&cfg);
        let handle = spawn_supervisor(&paths, cfg)?;
        let name = handle.name.clone();
        registry.insert(name.clone(), registry::Entry { handle: handle.clone(), config_hash: hash }).await;
        let (ack_tx, ack_rx) = oneshot::channel();
        if handle.tx.send(SupervisorCmd::Start { ack: ack_tx }).await.is_ok() {
            let _ = ack_rx.await;
        }
    }

    let registry_for_shutdown = registry.clone();
    let shutdown_signal = shutdown::install();

    let accept = async {
        loop {
            let (stream, _addr) = match listener.accept().await {
                Ok(p) => p,
                Err(e) => { error!(error = %e, "accept failed"); continue; }
            };
            let conn = Arc::new(Connection::new(stream));
            let reg = registry.clone();
            let paths_clone = paths.clone();
            tokio::spawn(async move {
                if let Err(e) = handlers::serve(conn, reg, paths_clone).await {
                    error!(error = %e, "connection ended with error");
                }
            });
        }
    };

    tokio::select! {
        _ = accept => {}
        _ = shutdown_signal => { info!("shutdown signal received"); }
    }

    shutdown::shutdown_all(registry_for_shutdown).await;
    Ok(())
}
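Each registry entry stores `config_hash` so a reload can tell which servers changed without comparing whole configs. `DefaultHasher` output is not guaranteed stable across Rust releases, but it is stable within one daemon process, and that is all this comparison needs. One way to turn old and new hashes into actions is a three-way diff. Names only in the new set are added, names only in the old set are removed, and names in both with different hashes are restarted. This is a std-only sketch with hypothetical `Plan` and `plan_reload` names. It is an assumption about how reload might use the hash, not the plan's reload implementation.

```rust
use std::collections::HashMap;

/// What a reload would do, by server name (each list sorted for stable output).
#[derive(Debug, Default, PartialEq, Eq)]
pub struct Plan {
    pub add: Vec<String>,
    pub remove: Vec<String>,
    pub restart: Vec<String>,
}

/// Diff running servers (`old`) against freshly loaded configs (`new`),
/// both mapping server name to config hash.
pub fn plan_reload(old: &HashMap<String, u64>, new: &HashMap<String, u64>) -> Plan {
    let mut plan = Plan::default();
    for (name, new_hash) in new {
        match old.get(name) {
            None => plan.add.push(name.clone()),
            Some(old_hash) if old_hash != new_hash => plan.restart.push(name.clone()),
            Some(_) => {} // unchanged: leave the supervisor alone
        }
    }
    for name in old.keys() {
        if !new.contains_key(name) {
            plan.remove.push(name.clone());
        }
    }
    plan.add.sort();
    plan.remove.sort();
    plan.restart.sort();
    plan
}
```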
  • Step 2: Create shutdown.rs
use crate::daemon::registry::Registry;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::oneshot;
use xy_supervisor::supervisor::SupervisorCmd;

pub fn install() -> impl std::future::Future<Output = ()> {
    let mut term = signal(SignalKind::terminate()).expect("install SIGTERM handler");
    let mut int = signal(SignalKind::interrupt()).expect("install SIGINT handler");
    async move {
        tokio::select! {
            _ = term.recv() => {}
            _ = int.recv() => {}
        }
    }
}

pub async fn shutdown_all(reg: Registry) {
    let snapshot = reg.snapshot().await;
    let mut acks = Vec::new();
    for (_name, entry) in &snapshot {
        let (tx, rx) = oneshot::channel();
        if entry.handle.tx.send(SupervisorCmd::Shutdown { ack: tx }).await.is_ok() {
            acks.push(rx);
        }
    }
    let deadline = tokio::time::Duration::from_secs(30);
    let _ = tokio::time::timeout(deadline, async {
        for rx in acks { let _ = rx.await; }
    }).await;
}
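`shutdown_all` sends `Shutdown` to every supervisor first and only then waits. Servers therefore stop in parallel, and the 30-second budget bounds the whole shutdown rather than each server. Below is a std-only sketch of the same shape using threads and `std::sync::mpsc`. The worker threads stand in for supervisor tasks, and `wait_all` and `shutdown_with_deadline` are hypothetical names. Unlike the Tokio version, it reports which acks missed the deadline. Like `let _ = rx.await` above, it counts a dropped sender as done.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::{Duration, Instant};

/// Wait for every ack under one shared deadline.
/// Returns the indices of receivers that did not ack in time.
pub fn wait_all(acks: Vec<Receiver<()>>, budget: Duration) -> Vec<usize> {
    let deadline = Instant::now() + budget;
    let mut missed = Vec::new();
    for (i, rx) in acks.into_iter().enumerate() {
        // Time left shrinks as earlier acks are awaited.
        let left = deadline.saturating_duration_since(Instant::now());
        if let Err(RecvTimeoutError::Timeout) = rx.recv_timeout(left) {
            missed.push(i);
        }
    }
    missed
}

/// Fan out: start every simulated stop before waiting on any of them.
pub fn shutdown_with_deadline(stop_times: &[Duration], budget: Duration) -> Vec<usize> {
    let acks = stop_times
        .iter()
        .map(|&t| {
            let (tx, rx) = mpsc::channel();
            thread::spawn(move || {
                thread::sleep(t); // simulated graceful stop
                let _ = tx.send(());
            });
            rx
        })
        .collect();
    wait_all(acks, budget)
}
```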
  • Step 3: Stub handlers.rs
use crate::daemon::registry::Registry;
use crate::paths::Paths;
use std::sync::Arc;
use xy_ipc::Connection;

pub async fn serve(_conn: Arc<Connection>, _reg: Registry, _paths: Paths) -> std::io::Result<()> {
    Ok(())
}
  • Step 4: Build

Run: cargo build -p xy
Expected: compiles.

  • Step 5: Commit
git add crates/xy/
git commit -m "feat(xy): daemon boot + accept loop + graceful shutdown"

Task 26: RPC handlers — list/status/start/stop/restart

Files:

  • Modify: crates/xy/src/daemon/handlers.rs

  • Step 1: Replace stub with full dispatch

use crate::daemon::registry::Registry;
use crate::paths::Paths;
use std::sync::Arc;
use tokio::sync::oneshot;
use xy_ipc::envelope::{err_response, ok_response, Incoming, Request, Response};
use xy_ipc::Connection;
use xy_protocol::rpc::{
    methods, NameOrAll, RestartResult, ServerSummary, StartResult, StatusDetail, StopResult,
};
use xy_protocol::RpcErrorCode;
use xy_supervisor::supervisor::{StartAck, StopAck, SupervisorCmd};

pub async fn serve(conn: Arc<Connection>, reg: Registry, _paths: Paths) -> std::io::Result<()> {
    loop {
        let Some(incoming) = conn.read_incoming().await? else { return Ok(()); };
        match incoming {
            Incoming::Request(req) => {
                let resp = handle_request(req, &reg).await;
                conn.write_response(&resp).await?;
            }
            _ => continue,
        }
    }
}

struct ApiError { code: i32, message: String }
impl ApiError {
    fn rpc(c: RpcErrorCode, msg: impl Into<String>) -> Self {
        Self { code: c.as_i32(), message: msg.into() }
    }
}

async fn handle_request(req: Request, reg: &Registry) -> Response {
    let id = req.id.clone();
    let method = req.method.as_str();
    let params = req.params.unwrap_or(serde_json::Value::Null);

    match method {
        methods::LIST => match list(reg).await {
            Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
            Err(e) => err_response(id, e.code, e.message),
        },
        methods::STATUS => {
            let p: xy_protocol::rpc::StatusParams = match serde_json::from_value(params) {
                Ok(p) => p,
                Err(e) => return err_response(id, -32602, format!("invalid params: {e}")),
            };
            match status(reg, &p.name).await {
                Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
                Err(e) => err_response(id, e.code, e.message),
            }
        }
        methods::START => dispatch_lifecycle(id, params, reg, Op::Start).await,
        methods::STOP => dispatch_lifecycle(id, params, reg, Op::Stop).await,
        methods::RESTART => dispatch_lifecycle(id, params, reg, Op::Restart).await,
        methods::RELOAD => err_response(id, -32601, "reload not yet implemented".into()),
        methods::LOGS => err_response(id, -32601, "logs not yet implemented".into()),
        methods::LOGS_CANCEL => err_response(id, -32601, "logs_cancel not yet implemented".into()),
        other => err_response(id, -32601, format!("unknown method `{other}`")),
    }
}

async fn list(reg: &Registry) -> Result<Vec<ServerSummary>, ApiError> {
    let mut out = Vec::new();
    for (name, entry) in reg.snapshot().await {
        out.push(ServerSummary {
            name, state: *entry.handle.state.borrow(),
            pid: None, port: 0, uptime_secs: None,
            restart_count: 0, last_exit: None,
        });
    }
    Ok(out)
}

async fn status(reg: &Registry, name: &str) -> Result<StatusDetail, ApiError> {
    let Some(entry) = reg.get(name).await else {
        return Err(ApiError::rpc(RpcErrorCode::ServerNotFound, format!("no such server `{name}`")));
    };
    Ok(StatusDetail {
        summary: ServerSummary {
            name: entry.handle.name.clone(),
            state: *entry.handle.state.borrow(),
            pid: None, port: 0, uptime_secs: None,
            restart_count: 0, last_exit: None,
        },
        recent_transitions: Vec::new(),
    })
}

enum Op { Start, Stop, Restart }

async fn dispatch_lifecycle(
    id: serde_json::Value, params: serde_json::Value, reg: &Registry, op: Op,
) -> Response {
    let p: NameOrAll = match serde_json::from_value(params) {
        Ok(p) => p,
        Err(e) => return err_response(id, -32602, format!("invalid params: {e}")),
    };
    let targets: Vec<String> = match p {
        NameOrAll::All { all } if all => reg.names().await,
        NameOrAll::Name { name } => vec![name],
        NameOrAll::All { .. } => return err_response(id, -32602, "must set all=true".into()),
    };

    match op {
        Op::Start => {
            let mut started = Vec::new();
            let mut already = Vec::new();
            for name in targets {
                let Some(entry) = reg.get(&name).await else {
                    return err_response(id, RpcErrorCode::ServerNotFound.as_i32(),
                        format!("no such server `{name}`"));
                };
                let (tx, rx) = oneshot::channel();
                let _ = entry.handle.tx.send(SupervisorCmd::Start { ack: tx }).await;
                match rx.await {
                    Ok(StartAck::Started) => started.push(name),
                    Ok(StartAck::AlreadyRunning) => already.push(name),
                    Err(_) => return err_response(id, RpcErrorCode::SpawnFailed.as_i32(),
                        format!("supervisor for `{name}` dropped")),
                }
            }
            ok_response(id, serde_json::to_value(StartResult { started, already_running: already }).unwrap())
        }
        Op::Stop => {
            let mut stopped = Vec::new();
            let mut not_running = Vec::new();
            for name in targets {
                let Some(entry) = reg.get(&name).await else {
                    return err_response(id, RpcErrorCode::ServerNotFound.as_i32(),
                        format!("no such server `{name}`"));
                };
                let (tx, rx) = oneshot::channel();
                let _ = entry.handle.tx.send(SupervisorCmd::Stop { ack: tx }).await;
                match rx.await {
                    Ok(StopAck::Stopped) => stopped.push(name),
                    Ok(StopAck::NotRunning) => not_running.push(name),
                    Err(_) => return err_response(id, RpcErrorCode::SpawnFailed.as_i32(),
                        format!("supervisor for `{name}` dropped")),
                }
            }
            ok_response(id, serde_json::to_value(StopResult { stopped, not_running }).unwrap())
        }
        Op::Restart => {
            let mut restarted = Vec::new();
            for name in targets {
                let Some(entry) = reg.get(&name).await else {
                    return err_response(id, RpcErrorCode::ServerNotFound.as_i32(),
                        format!("no such server `{name}`"));
                };
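                let (tx, rx) = oneshot::channel();
                let _ = entry.handle.tx.send(SupervisorCmd::Restart { ack: tx }).await;
                // A dropped ack means the supervisor task is gone; report it the way Start/Stop do.
                if rx.await.is_err() {
                    return err_response(id, RpcErrorCode::SpawnFailed.as_i32(),
                        format!("supervisor for `{name}` dropped"));
                }
                restarted.push(name);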
            }
            ok_response(id, serde_json::to_value(RestartResult { restarted }).unwrap())
        }
    }
}
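Target resolution is the one part of dispatch_lifecycle that needs no channels, so it can be unit-tested on its own. The following is a minimal sketch. It assumes a local NameOrAll that mirrors the two shapes matched above; the real type lives in xy-protocol and may differ. resolve_targets is a hypothetical helper. Checking the name during resolution keeps the not-found error path out of the three per-op loops.

```rust
/// Hypothetical stand-in for the NameOrAll params type from xy-protocol.
#[derive(Debug)]
pub enum NameOrAll {
    Name { name: String },
    All { all: bool },
}

/// Expands lifecycle params into the server names to act on.
/// `registered` plays the role of `reg.names().await`.
pub fn resolve_targets(p: NameOrAll, registered: &[String]) -> Result<Vec<String>, String> {
    match p {
        NameOrAll::All { all: true } => Ok(registered.to_vec()),
        NameOrAll::All { all: false } => Err("must set all=true".to_string()),
        NameOrAll::Name { name } => {
            // Reject unknown names before any supervisor is sent a command.
            if registered.iter().any(|n| *n == name) {
                Ok(vec![name])
            } else {
                Err(format!("no such server `{name}`"))
            }
        }
    }
}
```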
  • Step 2: Build

Run: cargo build -p xy
Expected: compiles.

  • Step 3: Commit
git add crates/xy/
git commit -m "feat(xy): RPC handlers for list/status/start/stop/restart"

Task 27: RPC handler — reload

Files:

  • Modify: crates/xy/src/daemon/handlers.rs

  • Step 1: Replace RELOAD arm and add reload

In the match method block, replace the RELOAD arm with:

methods::RELOAD => match reload(reg).await {
    Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
    Err(e) => err_response(id, e.code, e.message),
},

Append at end of file:

use xy_protocol::rpc::ReloadResult;

async fn reload(reg: &Registry) -> Result<ReloadResult, ApiError> {
    let paths = crate::daemon::PATHS.get()
        .ok_or_else(|| ApiError::rpc(RpcErrorCode::ConfigInvalid, "daemon paths not initialized"))?;
    let new_configs = xy_protocol::kdl_parse::load_all_configs(&paths.config_dir)
        .map_err(|e| ApiError::rpc(RpcErrorCode::ConfigInvalid, e.to_string()))?;

    use std::collections::HashMap;
    let new_by_name: HashMap<String, xy_protocol::ServerConfig> =
        new_configs.into_iter().map(|c| (c.name.clone(), c)).collect();
    let existing_names: Vec<String> = reg.names().await;

    let mut added = Vec::new();
    let mut removed = Vec::new();
    let mut changed = Vec::new();
    let mut unchanged = Vec::new();

    for name in &existing_names {
        if !new_by_name.contains_key(name) {
            if let Some(entry) = reg.remove(name).await {
                let (tx, rx) = oneshot::channel();
                let _ = entry.handle.tx.send(SupervisorCmd::Shutdown { ack: tx }).await;
                let _ = rx.await;
                removed.push(name.clone());
            }
        }
    }

    for (name, cfg) in new_by_name {
        let new_hash = crate::daemon::config_hash(&cfg);
        match reg.get(&name).await {
            None => {
                let handle = crate::daemon::spawn_supervisor(paths, cfg)
                    .map_err(|e| ApiError::rpc(RpcErrorCode::SpawnFailed, e.to_string()))?;
                reg.insert(name.clone(), crate::daemon::registry::Entry {
                    handle: handle.clone(), config_hash: new_hash,
                }).await;
                let (tx, rx) = oneshot::channel();
                let _ = handle.tx.send(SupervisorCmd::Start { ack: tx }).await;
                let _ = rx.await;
                added.push(name);
            }
            Some(entry) if entry.config_hash != new_hash => {
                let (tx, rx) = oneshot::channel();
                let _ = entry.handle.tx.send(SupervisorCmd::Shutdown { ack: tx }).await;
                let _ = rx.await;
                reg.remove(&name).await;

                let handle = crate::daemon::spawn_supervisor(paths, cfg)
                    .map_err(|e| ApiError::rpc(RpcErrorCode::SpawnFailed, e.to_string()))?;
                reg.insert(name.clone(), crate::daemon::registry::Entry {
                    handle: handle.clone(), config_hash: new_hash,
                }).await;
                let (tx, rx) = oneshot::channel();
                let _ = handle.tx.send(SupervisorCmd::Start { ack: tx }).await;
                let _ = rx.await;
                changed.push(name);
            }
            Some(_) => unchanged.push(name),
        }
    }

    Ok(ReloadResult { added, removed, changed, unchanged })
}
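The added/removed/changed/unchanged classification does not depend on the supervisors, so it can be pulled into a pure function and unit-tested instead of only end-to-end in Task 34. The following is a sketch. It assumes each config is reduced to a u64 hash, similar to what config_hash produces; the hash type is an assumption. diff_configs and Diff are hypothetical names. Sorting the buckets is a deliberate addition: the handler above fills its vectors in HashMap iteration order, so the order of names in xy reload output can vary between runs.

```rust
use std::collections::HashMap;

/// Outcome buckets, named after the fields of ReloadResult.
#[derive(Debug, Default, PartialEq)]
pub struct Diff {
    pub added: Vec<String>,
    pub removed: Vec<String>,
    pub changed: Vec<String>,
    pub unchanged: Vec<String>,
}

/// Compares the running set (name -> config hash) with the freshly loaded set.
pub fn diff_configs(existing: &HashMap<String, u64>, loaded: &HashMap<String, u64>) -> Diff {
    let mut d = Diff::default();
    // Anything running but absent from disk gets shut down.
    for name in existing.keys() {
        if !loaded.contains_key(name) {
            d.removed.push(name.clone());
        }
    }
    // Everything on disk is new, changed (hash differs) or unchanged.
    for (name, new_hash) in loaded {
        match existing.get(name) {
            None => d.added.push(name.clone()),
            Some(old_hash) if old_hash != new_hash => d.changed.push(name.clone()),
            Some(_) => d.unchanged.push(name.clone()),
        }
    }
    // Stable output regardless of HashMap iteration order.
    for v in [&mut d.added, &mut d.removed, &mut d.changed, &mut d.unchanged] {
        v.sort();
    }
    d
}
```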
  • Step 2: Build

Run: cargo build -p xy
Expected: compiles.

  • Step 3: Commit
git add crates/xy/
git commit -m "feat(xy): reload handler with diff"

Task 28: RPC handlers — logs + logs_cancel

Files:

  • Modify: crates/xy/src/daemon/handlers.rs

  • Step 1: Add ConnState and rewire serve

Add these imports at the top of the file:

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use xy_protocol::rpc::{notifications, LogEnd, LogLine, LogsCancelParams, LogsParams, LogsSubscribed};

Add ConnState and update serve:

pub struct ConnState {
    pub subs: Mutex<HashMap<u64, JoinHandle<()>>>,
    pub next: AtomicU64,
}
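impl ConnState {
    pub fn new() -> Self { Self { subs: Mutex::new(HashMap::new()), next: AtomicU64::new(1) } }
}
// clippy's new_without_default lint (Task 36 runs it with -D warnings) expects a
// Default impl next to a pub no-argument new().
impl Default for ConnState {
    fn default() -> Self { Self::new() }
}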

pub async fn serve(conn: Arc<Connection>, reg: Registry, _paths: Paths) -> std::io::Result<()> {
    let state = Arc::new(ConnState::new());
    loop {
        let Some(incoming) = conn.read_incoming().await? else {
            let mut subs = state.subs.lock().await;
            for (_, h) in subs.drain() { h.abort(); }
            return Ok(());
        };
        if let Incoming::Request(req) = incoming {
            let resp = handle_request(req, &reg, &conn, &state).await;
            conn.write_response(&resp).await?;
        }
    }
}

Update handle_request signature to:

async fn handle_request(
    req: Request, reg: &Registry, conn: &Arc<Connection>, state: &Arc<ConnState>,
) -> Response { /* same body, plus the two new arms below */ }

Replace the LOGS / LOGS_CANCEL arms with:

methods::LOGS => {
    let p: LogsParams = match serde_json::from_value(params) {
        Ok(p) => p,
        Err(e) => return err_response(id, -32602, format!("invalid params: {e}")),
    };
    match start_log_stream(reg, conn.clone(), state.clone(), p).await {
        Ok(sub_id) => ok_response(id, serde_json::to_value(LogsSubscribed { subscription_id: sub_id }).unwrap()),
        Err(e) => err_response(id, e.code, e.message),
    }
}
methods::LOGS_CANCEL => {
    let p: LogsCancelParams = match serde_json::from_value(params) {
        Ok(p) => p,
        Err(e) => return err_response(id, -32602, format!("invalid params: {e}")),
    };
    let mut subs = state.subs.lock().await;
    if let Some(h) = subs.remove(&p.subscription_id) { h.abort(); }
    ok_response(id, serde_json::json!({}))
}
  • Step 2: Implement start_log_stream

Append:

async fn start_log_stream(
    reg: &Registry, conn: Arc<Connection>, state: Arc<ConnState>, p: LogsParams,
) -> Result<u64, ApiError> {
    let Some(entry) = reg.get(&p.name).await else {
        return Err(ApiError::rpc(RpcErrorCode::ServerNotFound, format!("no such server `{}`", p.name)));
    };
    let sub_id = state.next.fetch_add(1, Ordering::Relaxed);
    let sink = entry.handle.log_sink.clone();
    let conn2 = conn.clone();
    let state2 = state.clone();
    let follow = p.follow;
    let tail = p.tail;
    let name = p.name.clone();

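    // Hold the subscription map lock across spawn + insert so a short-lived
    // (non-follow) task cannot try to remove its own entry before it exists.
    let mut subs = state.subs.lock().await;
    let task = tokio::spawn(async move {
        for line in sink.ring.snapshot_tail(tail) {
            let n = xy_ipc::envelope::notification(notifications::LOG,
                Some(serde_json::to_value(LogLine {
                    subscription_id: sub_id,
                    name: name.clone(),
                    stream: line.stream, line: line.line,
                    ts_unix_ms: line.ts_unix_ms,
                }).unwrap()));
            if conn2.write_notification(&n).await.is_err() { return; }
        }

        if !follow {
            let end = xy_ipc::envelope::notification(notifications::LOG_END,
                Some(serde_json::to_value(LogEnd { subscription_id: sub_id }).unwrap()));
            let _ = conn2.write_notification(&end).await;
            state2.subs.lock().await.remove(&sub_id);
            return;
        }

        let mut rx = sink.broadcast.subscribe();
        loop {
            match rx.recv().await {
                Ok(mut line) => {
                    line.subscription_id = sub_id;
                    let n = xy_ipc::envelope::notification(notifications::LOG,
                        Some(serde_json::to_value(&line).unwrap()));
                    if conn2.write_notification(&n).await.is_err() { break; }
                }
                // A slow client fell behind and missed some lines; keep following
                // instead of silently ending the stream.
                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
                // The sender was dropped (supervisor gone): the stream is over.
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
            }
        }
        state2.subs.lock().await.remove(&sub_id);
    });
    subs.insert(sub_id, task);
    Ok(sub_id)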
}
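The stream replays sink.ring.snapshot_tail(tail) before it subscribes to the broadcast channel. As a result, a line logged between those two calls is never delivered. Subscribing first would close that gap, at the cost of possible duplicate lines. The ring itself comes from Tasks 12–14, which are not shown here.

The following is a minimal std-only sketch of the tail semantics the handler relies on. It assumes that None means every buffered line. Ring and its methods are hypothetical stand-ins for the real type.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded log ring that keeps the newest `cap` lines.
pub struct Ring {
    cap: usize,
    lines: VecDeque<String>,
}

impl Ring {
    pub fn new(cap: usize) -> Self {
        Self { cap, lines: VecDeque::with_capacity(cap) }
    }

    pub fn push(&mut self, line: String) {
        if self.cap == 0 {
            return;
        }
        if self.lines.len() == self.cap {
            self.lines.pop_front(); // evict the oldest line
        }
        self.lines.push_back(line);
    }

    /// Returns the last `tail` lines, oldest first; `None` returns everything buffered.
    pub fn snapshot_tail(&self, tail: Option<u32>) -> Vec<String> {
        let n = tail.map_or(self.lines.len(), |t| (t as usize).min(self.lines.len()));
        self.lines.iter().skip(self.lines.len() - n).cloned().collect()
    }
}
```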
  • Step 3: Build

Run: cargo build -p xy
Expected: compiles. serve is the only caller of handle_request, and Step 1 already passes it conn and state; if any other call site exists, update it the same way.

  • Step 4: Commit
git add crates/xy/
git commit -m "feat(xy): logs streaming via subscription notifications"

Task 29: CLI client implementations

Files:

  • Modify: crates/xy/src/cli/mod.rs

  • Create: crates/xy/src/cli/format.rs

  • Step 1: Implement format.rs

use xy_protocol::rpc::ServerSummary;

pub fn list_table(rows: &[ServerSummary]) -> String {
    let mut out = String::new();
    out.push_str("NAME                STATE       PID     PORT    UPTIME    RESTARTS\n");
    for r in rows {
        let pid = r.pid.map(|p| p.to_string()).unwrap_or_else(|| "-".into());
        let up = r.uptime_secs.map(|s| format!("{}s", s)).unwrap_or_else(|| "-".into());
        out.push_str(&format!(
            "{:<20}{:<12}{:<8}{:<8}{:<10}{}\n",
            r.name, format!("{:?}", r.state).to_lowercase(),
            pid, r.port, up, r.restart_count
        ));
    }
    out
}
  • Step 2: Replace cli/mod.rs
use crate::paths::Paths;
use anyhow::Result;
use serde_json::json;
use xy_ipc::{Client, ClientError};
use xy_protocol::rpc::{
    methods, notifications, LogLine, LogsParams, LogsSubscribed, ReloadResult, RestartResult,
    ServerSummary, StartResult, StatusDetail, StopResult,
};

mod format;

async fn connect(paths: &Paths) -> Result<Client> {
    match Client::connect(&paths.socket).await {
        Ok(c) => Ok(c),
        Err(e) => {
            eprintln!("xy: cannot reach daemon at {}: {e}", paths.socket.display());
            std::process::exit(2);
        }
    }
}

fn rpc_to_exit(e: &ClientError) -> i32 {
    match e {
        ClientError::Unreachable(_) => 2,
        ClientError::Rpc { .. } => 1,
        _ => 1,
    }
}

pub async fn list(paths: Paths) -> Result<i32> {
    let mut c = connect(&paths).await?;
    let rows: Vec<ServerSummary> = match c.call_no_params(methods::LIST).await {
        Ok(v) => v,
        Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
    };
    print!("{}", format::list_table(&rows));
    Ok(0)
}

pub async fn status(paths: Paths, name: String) -> Result<i32> {
    let mut c = connect(&paths).await?;
    let d: StatusDetail = match c.call(methods::STATUS, &json!({"name": name})).await {
        Ok(v) => v,
        Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
    };
    println!("{:#?}", d);
    Ok(0)
}

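fn name_or_all(all: bool, name: Option<String>) -> serde_json::Value {
    // The clap definitions are expected to require exactly one of <name> or --all.
    if all { json!({"all": true}) } else { json!({"name": name.expect("either <name> or --all is required")}) }
}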

pub async fn start(paths: Paths, all: bool, name: Option<String>) -> Result<i32> {
    let mut c = connect(&paths).await?;
    let r: StartResult = match c.call(methods::START, &name_or_all(all, name)).await {
        Ok(v) => v, Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
    };
    if !r.started.is_empty() { println!("started: {}", r.started.join(", ")); }
    if !r.already_running.is_empty() { println!("already running: {}", r.already_running.join(", ")); }
    Ok(0)
}

pub async fn stop(paths: Paths, all: bool, name: Option<String>) -> Result<i32> {
    let mut c = connect(&paths).await?;
    let r: StopResult = match c.call(methods::STOP, &name_or_all(all, name)).await {
        Ok(v) => v, Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
    };
    if !r.stopped.is_empty() { println!("stopped: {}", r.stopped.join(", ")); }
    if !r.not_running.is_empty() { println!("not running: {}", r.not_running.join(", ")); }
    Ok(0)
}

pub async fn restart(paths: Paths, all: bool, name: Option<String>) -> Result<i32> {
    let mut c = connect(&paths).await?;
    let r: RestartResult = match c.call(methods::RESTART, &name_or_all(all, name)).await {
        Ok(v) => v, Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
    };
    println!("restarted: {}", r.restarted.join(", "));
    Ok(0)
}

pub async fn reload(paths: Paths) -> Result<i32> {
    let mut c = connect(&paths).await?;
    let r: ReloadResult = match c.call_no_params(methods::RELOAD).await {
        Ok(v) => v, Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
    };
    println!("added:     {}", r.added.join(", "));
    println!("removed:   {}", r.removed.join(", "));
    println!("changed:   {}", r.changed.join(", "));
    println!("unchanged: {}", r.unchanged.join(", "));
    Ok(0)
}

pub async fn logs(paths: Paths, name: String, tail: Option<u32>, follow: bool) -> Result<i32> {
    let mut c = connect(&paths).await?;
    let p = LogsParams { name: name.clone(), tail, follow };
    let _sub: LogsSubscribed = match c.call(methods::LOGS, &p).await {
        Ok(v) => v, Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
    };
    loop {
        match c.read_notification().await {
            Ok(None) => return Ok(0),
            Ok(Some(n)) => match n.method.as_str() {
                notifications::LOG => {
                    if let Some(params) = n.params {
                        if let Ok(line) = serde_json::from_value::<LogLine>(params) {
                            let tag = match line.stream {
                                xy_protocol::rpc::LogStream::Stdout => "out",
                                xy_protocol::rpc::LogStream::Stderr => "err",
                            };
                            println!("[{tag}] {}", line.line);
                        }
                    }
                }
                notifications::LOG_END => return Ok(0),
                _ => {}
            },
            Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
        }
    }
}
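The README in Task 37 promises exit code 3 for an invalid config, but rpc_to_exit above maps every RPC error to 1. The following sketch shows the intended mapping. It uses a local CallError enum in place of xy_ipc::ClientError and a placeholder CONFIG_INVALID constant. In the real CLI, the comparison would use RpcErrorCode::ConfigInvalid.as_i32(). How ClientError::Rpc exposes its code is an assumption.

```rust
/// Hypothetical mirror of the failure shapes a CLI call can hit.
#[derive(Debug)]
pub enum CallError {
    Unreachable,
    Rpc { code: i32 },
    Other,
}

/// Placeholder value; the real code comes from RpcErrorCode::ConfigInvalid.
pub const CONFIG_INVALID: i32 = -32001;

/// README contract: 0 ok, 1 operational error, 2 daemon unreachable, 3 config invalid.
pub fn exit_code(e: &CallError) -> i32 {
    match e {
        CallError::Unreachable => 2,
        CallError::Rpc { code } if *code == CONFIG_INVALID => 3,
        CallError::Rpc { .. } | CallError::Other => 1,
    }
}
```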
  • Step 3: Build

Run: cargo build --workspace
Expected: full workspace compiles.

  • Step 4: Commit
git add crates/xy/
git commit -m "feat(xy): CLI client commands"

Phase 6 — Integration tests

Task 30: Test helper binaries

Files:

  • Create: crates/xy/src/bin/xy_test_sleep_server.rs

  • Create: crates/xy/src/bin/xy_test_exit_failure.rs

  • Modify: crates/xy/Cargo.toml

  • Step 1: xy_test_sleep_server.rs

fn main() {
    use std::io::Write;
    let pid = std::process::id();
    eprintln!("sleep_server start pid={pid}");
    println!("ready");
    std::io::stdout().flush().ok();
    loop {
        std::thread::sleep(std::time::Duration::from_secs(60));
        println!("tick");
        std::io::stdout().flush().ok();
    }
}
  • Step 2: xy_test_exit_failure.rs
fn main() {
    eprintln!("exit_failure dying immediately");
    std::process::exit(7);
}
  • Step 3: Declare in crates/xy/Cargo.toml
[[bin]]
name = "xy-test-sleep-server"
path = "src/bin/xy_test_sleep_server.rs"

[[bin]]
name = "xy-test-exit-failure"
path = "src/bin/xy_test_exit_failure.rs"
  • Step 4: Build

Run: cargo build -p xy --bins
Expected: three binaries produced: xy, xy-test-sleep-server and xy-test-exit-failure.

  • Step 5: Commit
git add crates/xy/
git commit -m "test(xy): helper binaries for integration tests"

Task 31: Integration test scaffolding

Files:

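  • Create: crates/xy/tests/common/mod.rs

  • Modify: crates/xy/Cargo.toml (add tempfile under [dev-dependencies] if an earlier task has not already)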

  • Step 1: Write the harness

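// Each tests/*.rs file compiles its own copy of this module and uses only part of
// it; without this, unused helpers fail `cargo clippy ... -D warnings` in Task 36.
#![allow(dead_code)]

use std::path::PathBuf;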
use std::process::Stdio;
use std::time::Duration;
use tokio::process::{Child, Command};
use tempfile::TempDir;

pub struct Harness {
    pub tmp: TempDir,
    pub config_dir: PathBuf,
    pub state_dir: PathBuf,
    pub socket: PathBuf,
    pub daemon: Option<Child>,
}

impl Harness {
    pub fn new() -> Self {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::create_dir_all(tmp.path().join("config")).unwrap();
        std::fs::create_dir_all(tmp.path().join("state")).unwrap();
        std::fs::create_dir_all(tmp.path().join("run")).unwrap();
        let config_dir = tmp.path().join("config/xy/servers");
        let state_dir = tmp.path().join("state/xy");
        std::fs::create_dir_all(&config_dir).unwrap();
        std::fs::create_dir_all(&state_dir).unwrap();
        let socket = tmp.path().join("run/xy.sock");
        Self { tmp, config_dir, state_dir, socket, daemon: None }
    }

    pub fn write_server(&self, name: &str, command: &str, port: u16, restart_policy: &str) {
        let body = format!(
            "command \"{command}\"\nport {port}\nrestart {{ policy \"{restart_policy}\" backoff-initial \"10ms\" backoff-max \"50ms\" max-retries-per-minute 3 }}\nstop {{ grace \"500ms\" }}\n"
        );
        std::fs::write(self.config_dir.join(format!("{name}.kdl")), body).unwrap();
    }

    pub async fn start_daemon(&mut self, xy_bin: &std::path::Path) {
        let child = Command::new(xy_bin)
            .arg("daemon")
            .env("XDG_CONFIG_HOME", self.tmp.path().join("config"))
            .env("XDG_STATE_HOME", self.tmp.path().join("state"))
            .env("XDG_RUNTIME_DIR", self.tmp.path().join("run"))
            .stdout(Stdio::null())
            .stderr(Stdio::inherit())
            .kill_on_drop(true)
            .spawn()
            .expect("spawn daemon");
        self.daemon = Some(child);
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while !self.socket.exists() {
            if std::time::Instant::now() > deadline { panic!("daemon socket never appeared"); }
            tokio::time::sleep(Duration::from_millis(25)).await;
        }
    }

    pub async fn run_cli(&self, xy_bin: &std::path::Path, args: &[&str]) -> (i32, String, String) {
        let out = Command::new(xy_bin)
            .args(args)
            .env("XDG_CONFIG_HOME", self.tmp.path().join("config"))
            .env("XDG_STATE_HOME", self.tmp.path().join("state"))
            .env("XDG_RUNTIME_DIR", self.tmp.path().join("run"))
            .output().await.expect("run cli");
        let code = out.status.code().unwrap_or(-1);
        let stdout = String::from_utf8_lossy(&out.stdout).to_string();
        let stderr = String::from_utf8_lossy(&out.stderr).to_string();
        (code, stdout, stderr)
    }
}

// Cargo builds this package's binaries before its integration tests and exposes
// their paths as CARGO_BIN_EXE_<name>, so no target-directory arithmetic is needed.
pub fn xy_bin() -> PathBuf { PathBuf::from(env!("CARGO_BIN_EXE_xy")) }
pub fn sleep_server_bin() -> PathBuf { PathBuf::from(env!("CARGO_BIN_EXE_xy-test-sleep-server")) }
pub fn exit_failure_bin() -> PathBuf { PathBuf::from(env!("CARGO_BIN_EXE_xy-test-exit-failure")) }
  • Step 2: Build

Run: cargo build --workspace --tests
Expected: compiles.

  • Step 3: Commit
git add crates/xy/
git commit -m "test(xy): integration test harness"

Task 32: Lifecycle integration test

Files:

  • Create: crates/xy/tests/lifecycle.rs

  • Step 1: Write the test

mod common;
use common::*;

#[tokio::test]
async fn auto_starts_on_boot_then_stop_and_start() {
    let xy = xy_bin();
    let sleeper = sleep_server_bin();
    let mut h = Harness::new();
    h.write_server("alpha", sleeper.to_str().unwrap(), 19_001, "always");
    h.start_daemon(&xy).await;

    let mut last_stdout = String::new();
    for _ in 0..40 {
        let (_c, out, _e) = h.run_cli(&xy, &["list"]).await;
        last_stdout = out;
        if last_stdout.contains("alpha") && last_stdout.contains("running") { break; }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    assert!(last_stdout.contains("alpha"), "stdout: {last_stdout}");
    assert!(last_stdout.contains("running"), "stdout: {last_stdout}");

    let (code, out, _e) = h.run_cli(&xy, &["stop", "alpha"]).await;
    assert_eq!(code, 0);
    assert!(out.contains("stopped: alpha"), "stdout: {out}");

    let (code, out, _e) = h.run_cli(&xy, &["start", "alpha"]).await;
    assert_eq!(code, 0);
    assert!(out.contains("started: alpha"), "stdout: {out}");
}
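The polling loops here and in Tasks 33–34 check substrings of the whole list output. With several servers, any row that reads running satisfies the check; in Task 34, contains("a") also matches "starting". A column-aware lookup is tighter. row_state is a hypothetical helper that could live in tests/common/mod.rs. It assumes server names contain no whitespace.

```rust
/// Returns the STATE column of the row whose NAME column equals `name`
/// in the table printed by `xy list`; the header line is skipped.
pub fn row_state<'a>(table: &'a str, name: &str) -> Option<&'a str> {
    table
        .lines()
        .skip(1) // header: NAME STATE PID PORT UPTIME RESTARTS
        .map(|l| l.split_whitespace())
        .find_map(|mut cols| {
            // First column is NAME, second is STATE.
            if cols.next()? == name { cols.next() } else { None }
        })
}
```

In the lifecycle test, the loop condition would become row_state(&last_stdout, "alpha") == Some("running").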
  • Step 2: Run

Run: cargo test -p xy --test lifecycle -- --nocapture
Expected: passes.

  • Step 3: Commit
git add crates/xy/
git commit -m "test(xy): auto-start + stop/start lifecycle"

Task 33: Restart cap integration test

Files:

  • Create: crates/xy/tests/restart_policy.rs

  • Step 1: Write the test

mod common;
use common::*;

#[tokio::test]
async fn restart_cap_marks_failed() {
    let xy = xy_bin();
    let bad = exit_failure_bin();
    let mut h = Harness::new();
    h.write_server("flaky", bad.to_str().unwrap(), 19_010, "always");
    h.start_daemon(&xy).await;

    let mut saw_failed = false;
    for _ in 0..60 {
        let (_c, out, _e) = h.run_cli(&xy, &["list"]).await;
        if out.contains("flaky") && out.contains("failed") { saw_failed = true; break; }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    assert!(saw_failed, "flaky never reached failed state");
}
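Task 33 relies on the restart cap firing quickly. The test config sets backoff-initial 10ms, backoff-max 50ms and max-retries-per-minute 3. With those values, an always-crashing child should be marked failed within a fraction of a second.

The following is a std-only sketch of that decision logic. It assumes the delay doubles for each restart in the current window and that the cap is a sliding 60-second window. The real policy lives in Tasks 9–11 and may count differently. Timestamps are passed in as offsets so the logic can be tested without sleeping. RestartLimiter and Decision are hypothetical names.

```rust
use std::collections::VecDeque;
use std::time::Duration;

/// What the supervisor should do after an unexpected exit.
#[derive(Debug, PartialEq)]
pub enum Decision {
    RestartAfter(Duration),
    Fail,
}

/// Hypothetical limiter: exponential backoff plus a sliding one-minute cap.
pub struct RestartLimiter {
    initial: Duration,
    max: Duration,
    per_minute: usize,
    attempts: VecDeque<Duration>, // monotonic timestamps of recent restarts
}

impl RestartLimiter {
    pub fn new(initial: Duration, max: Duration, per_minute: usize) -> Self {
        Self { initial, max, per_minute, attempts: VecDeque::new() }
    }

    pub fn on_exit(&mut self, now: Duration) -> Decision {
        let window = Duration::from_secs(60);
        // Forget restarts that have slid out of the one-minute window.
        while let Some(&t) = self.attempts.front() {
            if now.saturating_sub(t) >= window { self.attempts.pop_front(); } else { break; }
        }
        if self.attempts.len() >= self.per_minute {
            return Decision::Fail;
        }
        // initial, 2x, 4x, ... per restart still in the window, capped at `max`.
        let shift = self.attempts.len().min(31) as u32;
        let delay = self.initial.saturating_mul(1u32 << shift).min(self.max);
        self.attempts.push_back(now);
        Decision::RestartAfter(delay)
    }
}
```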
  • Step 2: Run

Run: cargo test -p xy --test restart_policy -- --nocapture
Expected: passes.

  • Step 3: Commit
git add crates/xy/
git commit -m "test(xy): restart cap escalates to failed"

Task 34: Reload integration test

Files:

  • Create: crates/xy/tests/reload.rs

  • Step 1: Write the test

mod common;
use common::*;

#[tokio::test]
async fn reload_adds_removes_and_changes() {
    let xy = xy_bin();
    let sleeper = sleep_server_bin();
    let mut h = Harness::new();
    h.write_server("a", sleeper.to_str().unwrap(), 19_020, "always");
    h.start_daemon(&xy).await;

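    // Wait for row `a` itself to report running; a bare contains("a") also
    // matches "starting", so the original check was always true.
    for _ in 0..40 {
        let (_c, out, _e) = h.run_cli(&xy, &["list"]).await;
        if out.lines().any(|l| l.starts_with("a ") && l.contains("running")) { break; }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }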

    h.write_server("a", sleeper.to_str().unwrap(), 19_021, "always");
    h.write_server("b", sleeper.to_str().unwrap(), 19_022, "always");
    let (code, out, _e) = h.run_cli(&xy, &["reload"]).await;
    assert_eq!(code, 0);
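    // Match whole lines: every bucket line is always printed, "changed:" is a
    // substring of "unchanged:", and "a" occurs inside "added:".
    assert!(out.lines().any(|l| l.trim_end() == "added:     b"), "stdout: {out}");
    assert!(out.lines().any(|l| l.trim_end() == "changed:   a"), "stdout: {out}");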

    std::fs::remove_file(h.config_dir.join("a.kdl")).unwrap();
    let (code, out, _e) = h.run_cli(&xy, &["reload"]).await;
    assert_eq!(code, 0);
    assert!(out.lines().any(|l| l.trim_end() == "removed:   a"), "stdout: {out}");
}
  • Step 2: Run

Run: cargo test -p xy --test reload -- --nocapture
Expected: passes.

  • Step 3: Commit
git add crates/xy/
git commit -m "test(xy): reload diff"

Task 35: Logs integration test

Files:

  • Create: crates/xy/tests/logs.rs

  • Step 1: Write the test

mod common;
use common::*;
use std::process::Stdio;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;

#[tokio::test]
async fn logs_tail_prints_existing_lines() {
    let xy = xy_bin();
    let sleeper = sleep_server_bin();
    let mut h = Harness::new();
    h.write_server("svc", sleeper.to_str().unwrap(), 19_030, "always");
    h.start_daemon(&xy).await;

    tokio::time::sleep(Duration::from_millis(500)).await;
    let (code, out, _e) = h.run_cli(&xy, &["logs", "svc", "--tail", "10"]).await;
    assert_eq!(code, 0);
    assert!(out.contains("ready"), "stdout: {out}");
}

#[tokio::test]
async fn logs_follow_streams_new_lines() {
    let xy = xy_bin();
    let sleeper = sleep_server_bin();
    let mut h = Harness::new();
    h.write_server("svc", sleeper.to_str().unwrap(), 19_031, "always");
    h.start_daemon(&xy).await;

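    // The sleeper prints "ready" once and then only every 60s, so give it time to
    // start and request a backlog; otherwise no line may arrive before the timeout.
    tokio::time::sleep(Duration::from_millis(500)).await;
    let mut child = Command::new(&xy)
        .args(["logs", "svc", "--tail", "10", "--follow"])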
        .env("XDG_CONFIG_HOME", h.tmp.path().join("config"))
        .env("XDG_STATE_HOME", h.tmp.path().join("state"))
        .env("XDG_RUNTIME_DIR", h.tmp.path().join("run"))
        .stdout(Stdio::piped())
        .stderr(Stdio::null())
        .kill_on_drop(true)
        .spawn().unwrap();

    let stdout = child.stdout.take().unwrap();
    let mut lines = BufReader::new(stdout).lines();
    let first = tokio::time::timeout(Duration::from_secs(2), lines.next_line()).await
        .expect("timeout waiting for first log line").unwrap();
    assert!(first.is_some());
    let _ = child.kill().await;
}
  • Step 2: Run

Run: cargo test -p xy --test logs -- --nocapture
Expected: passes.

  • Step 3: Commit
git add crates/xy/
git commit -m "test(xy): logs --tail and --follow"

Phase 7 — Polish

Task 36: CI-clean

Files: none (verification only).

  • Step 1: Format

Run: cargo +nightly fmt --all

  • Step 2: Lint

Run: cargo clippy --workspace --all-targets -- -D warnings
Fix any warnings inline.

  • Step 3: Full test pass

Run: cargo test --workspace
Expected: every test green.

  • Step 4: Commit cleanups
git add -A
git commit -m "chore: cargo fmt + clippy clean"

Task 37: README + example KDL

Files:

  • Create: README.md

  • Create: examples/insikt.kdl

  • Step 1: Write README.md

# xy — HTTP MCP server supervisor

Daemon + CLI that launches and supervises HTTP-based MCP servers.

## Build

    cargo build --release

## Run

    target/release/xy daemon       # foreground

Drop a server definition into `$XDG_CONFIG_HOME/xy/servers/<name>.kdl`
(see `examples/insikt.kdl`) and `xy reload`.

Commands:

    xy list
    xy status <name>
    xy start  <name|--all>
    xy stop   <name|--all>
    xy restart <name|--all>
    xy reload
    xy logs <name> [--tail N] [--follow]

Exit codes: 0 success, 1 operational error, 2 daemon unreachable, 3 config invalid.
  • Step 2: Write examples/insikt.kdl
command "/Users/you/.cargo/bin/insikt-mcp"
args "--http" "--port" "8421"
port 8421

env {
    RUST_LOG "info"
}

restart {
    policy "on-failure"
}

stop {
    grace "10s"
}
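The grace, backoff-initial and backoff-max settings are written as duration strings such as "10s" and "500ms". The following is a minimal sketch of parsing the two suffixes this plan uses. parse_duration is hypothetical. The real parser in xy-protocol (Tasks 5–6, not shown) may accept more forms.

```rust
use std::time::Duration;

/// Hypothetical parser for "<digits>ms" and "<digits>s" duration strings.
pub fn parse_duration(s: &str) -> Result<Duration, String> {
    let s = s.trim();
    // The unit starts at the first non-digit character.
    let split = s
        .find(|c: char| !c.is_ascii_digit())
        .ok_or_else(|| format!("missing unit in `{s}`"))?;
    let (num, unit) = s.split_at(split);
    let n: u64 = num.parse().map_err(|_| format!("bad number in `{s}`"))?;
    match unit {
        "ms" => Ok(Duration::from_millis(n)),
        "s" => Ok(Duration::from_secs(n)),
        _ => Err(format!("unknown unit `{unit}` in `{s}`")),
    }
}
```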
  • Step 3: Commit
git add README.md examples/
git commit -m "docs: README and example KDL config"

Self-Review (run before handoff)

Spec coverage:

  • Single xy binary, Cargo workspace with 4 crates — Tasks 1, 8–16, 17–20, 21+.
  • XDG paths (macOS too via etcetera) — Task 21.
  • Per-server KDL configs, duplicate-port detection at load — Tasks 5, 6.
  • JSON-RPC over newline-delimited Unix socket, 0600 perms — Tasks 17–20.
  • All RPC methods including subscription-based logs + logs_cancel — Tasks 26, 27, 28.
  • Restart policy + backoff + retry window → failed — Tasks 9–11, 16, 33.
  • Stop with SIGTERM, grace timer, SIGKILL — Tasks 16, 25 (daemon shutdown).
  • Log rotation + ring buffer + broadcast — Tasks 12–14.
  • Process-group ownership of child — Task 15.
  • Tests at unit + integration level — Tasks 8–16 (units), 32–35 (integration).
  • CLI exit code mapping — Task 29 (rpc_to_exit).

Placeholders: none — every step contains its code or commands.

Type consistency:

  • SupervisorHandle.tx is mpsc::Sender<SupervisorCmd> everywhere.
  • LogLine.subscription_id is u64 everywhere.
  • Registry uses Entry { handle, config_hash } consistently in Tasks 24 onward.

Order: later tasks depend only on earlier ones. xy-protocol (Phase 2) feeds everything; xy-supervisor (Phase 3) uses xy-protocol; xy-ipc (Phase 4) is independent of supervisor; xy binary (Phase 5) ties them all together; integration tests (Phase 6) drive end-to-end; polish (Phase 7) closes out.