xy/docs/superpowers/plans/2026-05-25-xy-mcp-supervisor.md
logaritmisk c252bd7716 docs: add xy MCP supervisor implementation plan
37-task TDD-style plan across 7 phases: workspace skeleton,
xy-protocol (config/state/rpc types), xy-supervisor (state machine
with mock-driven unit tests), xy-ipc (JSON-RPC over Unix socket),
xy binary (daemon + CLI), integration tests with test-helper bins,
and polish (fmt/clippy/README).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 11:13:56 +02:00

# xy MCP Supervisor — Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Build the MVP of `xy`, a Tokio-based daemon that supervises HTTP-based MCP server processes on macOS/Linux, plus a CLI that drives it over a Unix socket.

**Architecture:** Single `xy` binary in a Cargo workspace of four crates (`xy-protocol`, `xy-supervisor`, `xy-ipc`, `xy`). The daemon (`xy daemon`) runs one supervisor task per managed server; the CLI subcommands act as JSON-RPC 2.0 clients, exchanging newline-delimited JSON over a Unix socket. Configs are per-server KDL files under the XDG config directory.

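
Newline-delimited JSON means each JSON-RPC message occupies exactly one line on the socket. The std-only sketch below shows that framing. It assumes every message has already been serialized to compact JSON. Compact `serde_json` output, for instance, escapes newlines inside strings, so it never contains a raw `\n`. The helper names `write_frame` and `read_frames` are hypothetical. The plan's real codec is meant to live in `xy-ipc/src/framing.rs`.

```rust
use std::io::{self, BufRead, Write};

/// Write one message as a single newline-terminated frame.
/// Assumes `json` is compact JSON containing no raw newline characters.
fn write_frame<W: Write>(out: &mut W, json: &str) -> io::Result<()> {
    debug_assert!(!json.contains('\n'), "a frame must not contain a raw newline");
    out.write_all(json.as_bytes())?;
    out.write_all(b"\n")?;
    out.flush()
}

/// Split an input stream into frames: one message per line, blank lines skipped.
fn read_frames<R: BufRead>(input: R) -> io::Result<Vec<String>> {
    let mut frames = Vec::new();
    for line in input.lines() {
        let line = line?;
        if !line.trim().is_empty() {
            frames.push(line);
        }
    }
    Ok(frames)
}
```

Here is an example, assuming `start` takes the `NameOrAll` params defined in Task 7. A request to start server `foo` would travel as the single line `{"jsonrpc":"2.0","id":1,"method":"start","params":{"name":"foo"}}`.

Collecting every frame into a `Vec` is only for illustration. A socket reader would handle each line as it arrives, for example with `tokio::io::AsyncBufReadExt::lines`.
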

**Tech Stack:** Rust (edition 2024), Tokio (rt-multi-thread, net, process, signal, sync, fs, io-util, macros), clap (derive), serde + serde_json, kdl, etcetera (XDG), nix (signals/process groups), tracing + tracing-subscriber, thiserror, anyhow.

**Spec:** `docs/superpowers/specs/2026-05-25-xy-mcp-supervisor-design.md`

---
## File Structure
```
xy/
├── Cargo.toml # workspace manifest (no [package])
├── crates/
│ ├── xy-protocol/
│ │ ├── Cargo.toml
│ │ └── src/
│ │ ├── lib.rs # re-exports
│ │ ├── config.rs # ServerConfig, RestartPolicy, parsing
│ │ ├── state.rs # ServerState enum
│ │ ├── rpc.rs # JSON-RPC method params/results
│ │ └── error.rs # ConfigError, RpcErrorCode
│ ├── xy-supervisor/
│ │ ├── Cargo.toml
│ │ └── src/
│ │ ├── lib.rs # public surface
│ │ ├── child.rs # ChildHandle trait + RealChild
│ │ ├── policy.rs # restart decision logic
│ │ ├── backoff.rs # backoff calculator
│ │ ├── retry_window.rs # sliding 60s window
│ │ ├── logs.rs # log writer, rotation, ring buffer, broadcast
│ │ └── supervisor.rs # supervisor task + state machine
│ ├── xy-ipc/
│ │ ├── Cargo.toml
│ │ └── src/
│ │ ├── lib.rs
│ │ ├── framing.rs # newline-delimited JSON codec
│ │ ├── envelope.rs # JSON-RPC 2.0 request/response/notification
│ │ ├── server.rs # listen, accept, dispatch
│ │ └── client.rs # connect, call, subscribe
│ └── xy/
│ ├── Cargo.toml
│ └── src/
│ ├── main.rs # clap entry, subcommand dispatch
│ ├── paths.rs # XDG path resolution (via etcetera)
│ ├── pidfile.rs # acquire/release exclusive pidfile
│ ├── daemon/
│ │ ├── mod.rs # daemon entry point
│ │ ├── handlers.rs # JSON-RPC method handlers → supervisor msgs
│ │ ├── registry.rs # name → supervisor handle map
│ │ └── shutdown.rs # SIGTERM/SIGINT graceful shutdown
│ ├── cli/
│ │ ├── mod.rs # client-side CLI command impls
│ │ └── format.rs # `list` / `status` output formatting
│ └── bin/
│ ├── xy_test_sleep_server.rs # integration test helper
│ └── xy_test_exit_failure.rs # integration test helper
└── crates/xy/tests/
├── common/mod.rs # tempdir XDG, spawn daemon, drive CLI
├── lifecycle.rs # start/stop/restart
├── reload.rs # added/removed/changed
├── restart_policy.rs # crash + cap → failed
└── logs.rs # --tail and --follow
```
Each crate's `lib.rs` re-exports its public surface so downstream crates use short paths (`xy_protocol::ServerConfig`, not `xy_protocol::config::ServerConfig`).

---
> **Note for the executor:** The full step-by-step task list (37 tasks across 7 phases) follows. Each task is bite-sized (2 to 5 minutes per step) and includes complete code, exact commands, and commit messages. The plan was authored in a single drafting session; if a step's code snippet references a type or helper, it is defined either in that task or in an earlier numbered task within this plan.
## Phase 1 — Workspace skeleton
### Task 1: Convert single crate to workspace
**Files:**
- Modify: `Cargo.toml`
- Create: `crates/xy-protocol/Cargo.toml`
- Create: `crates/xy-protocol/src/lib.rs`
- Create: `crates/xy-supervisor/Cargo.toml`
- Create: `crates/xy-supervisor/src/lib.rs`
- Create: `crates/xy-ipc/Cargo.toml`
- Create: `crates/xy-ipc/src/lib.rs`
- Create: `crates/xy/Cargo.toml`
- Create: `crates/xy/src/main.rs`
- Delete: `src/main.rs` and `src/` (replaced by `crates/xy/`)
- [ ] **Step 1: Replace root `Cargo.toml` with a workspace manifest**
```toml
[workspace]
resolver = "3"
members = [
"crates/xy-protocol",
"crates/xy-supervisor",
"crates/xy-ipc",
"crates/xy",
]
[workspace.package]
edition = "2024"
version = "0.1.0"
license = "MIT OR Apache-2.0"
[workspace.dependencies]
xy-protocol = { path = "crates/xy-protocol" }
xy-supervisor = { path = "crates/xy-supervisor" }
xy-ipc = { path = "crates/xy-ipc" }
tokio = { version = "1", features = ["rt-multi-thread", "net", "process", "signal", "sync", "fs", "io-util", "macros", "time"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "2"
anyhow = "1"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
clap = { version = "4", features = ["derive"] }
kdl = "6"
etcetera = "0.10"
nix = { version = "0.30", features = ["signal", "process"] }
humantime = "2"
humantime-serde = "1"
async-trait = "0.1"
tempfile = "3"
tokio-test = "0.4"
```
- [ ] **Step 2: Create `crates/xy-protocol/Cargo.toml`**
```toml
[package]
name = "xy-protocol"
edition.workspace = true
version.workspace = true
license.workspace = true
[dependencies]
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
kdl.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
[dev-dependencies]
tempfile.workspace = true
```
- [ ] **Step 3: Create `crates/xy-protocol/src/lib.rs`**
```rust
//! Wire types and config schema shared between the xy daemon and CLI.
```
- [ ] **Step 4: Create `crates/xy-supervisor/Cargo.toml`**
```toml
[package]
name = "xy-supervisor"
edition.workspace = true
version.workspace = true
license.workspace = true
[dependencies]
xy-protocol.workspace = true
tokio.workspace = true
tracing.workspace = true
thiserror.workspace = true
nix.workspace = true
async-trait.workspace = true
[dev-dependencies]
tokio-test.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread"] }
```
- [ ] **Step 5: Create `crates/xy-supervisor/src/lib.rs`**
```rust
//! Process-supervision primitives for the xy daemon.
```
- [ ] **Step 6: Create `crates/xy-ipc/Cargo.toml`**
```toml
[package]
name = "xy-ipc"
edition.workspace = true
version.workspace = true
license.workspace = true
[dependencies]
xy-protocol.workspace = true
tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
thiserror.workspace = true
[dev-dependencies]
tempfile.workspace = true
```
- [ ] **Step 7: Create `crates/xy-ipc/src/lib.rs`**
```rust
//! JSON-RPC 2.0 over newline-delimited JSON on a Unix socket.
```
- [ ] **Step 8: Create `crates/xy/Cargo.toml`**
```toml
[package]
name = "xy"
edition.workspace = true
version.workspace = true
license.workspace = true
[[bin]]
name = "xy"
path = "src/main.rs"
[dependencies]
xy-protocol.workspace = true
xy-supervisor.workspace = true
xy-ipc.workspace = true
tokio.workspace = true
clap.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
anyhow.workspace = true
etcetera.workspace = true
nix.workspace = true
humantime.workspace = true
[dev-dependencies]
tempfile.workspace = true
```
- [ ] **Step 9: Create `crates/xy/src/main.rs`**
```rust
fn main() {
println!("xy: not implemented yet");
}
```
- [ ] **Step 10: Remove the old top-level `src/` directory**
Run: `rm -rf src/`
- [ ] **Step 11: Verify the workspace builds**
Run: `cargo build --workspace`
Expected: every crate compiles; one binary `xy` is produced.
- [ ] **Step 12: Commit**
```bash
git add Cargo.toml crates/
git rm -r --cached src/ 2>/dev/null || true
git add -A
git commit -m "chore: convert to cargo workspace with four crates"
```
---
## Phase 2 — `xy-protocol`
### Task 2: `ServerState` enum
**Files:**
- Create: `crates/xy-protocol/src/state.rs`
- Modify: `crates/xy-protocol/src/lib.rs`
- [ ] **Step 1: Write the enum and its inline tests in `state.rs`**
```rust
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum ServerState {
Stopped,
Starting,
Running,
Restarting,
Failed,
Stopping,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn serializes_to_kebab_case() {
assert_eq!(serde_json::to_string(&ServerState::Restarting).unwrap(), "\"restarting\"");
}
#[test]
fn deserializes_from_kebab_case() {
let s: ServerState = serde_json::from_str("\"failed\"").unwrap();
assert_eq!(s, ServerState::Failed);
}
}
```
- [ ] **Step 2: Expose from `lib.rs`**
```rust
//! Wire types and config schema shared between the xy daemon and CLI.
pub mod state;
pub use state::ServerState;
```
- [ ] **Step 3: Run tests**
Run: `cargo test -p xy-protocol`
Expected: 2 passed.
- [ ] **Step 4: Commit**
```bash
git add crates/xy-protocol/
git commit -m "feat(protocol): ServerState enum"
```
### Task 3: `RestartPolicy` + `RestartConfig` + `StopConfig`
**Files:**
- Create: `crates/xy-protocol/src/config.rs`
- Modify: `crates/xy-protocol/src/lib.rs`
- [ ] **Step 1: Write the file**
```rust
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum RestartPolicy {
Always,
// Derived `Default` picks this variant (clippy::derivable_impls prefers this over a manual impl).
#[default]
OnFailure,
Never,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RestartConfig {
#[serde(default)]
pub policy: RestartPolicy,
#[serde(default = "default_backoff_initial", with = "humantime_serde")]
pub backoff_initial: Duration,
#[serde(default = "default_backoff_max", with = "humantime_serde")]
pub backoff_max: Duration,
#[serde(default = "default_max_retries_per_minute")]
pub max_retries_per_minute: u32,
}
fn default_backoff_initial() -> Duration { Duration::from_secs(1) }
fn default_backoff_max() -> Duration { Duration::from_secs(30) }
fn default_max_retries_per_minute() -> u32 { 5 }
impl Default for RestartConfig {
fn default() -> Self {
Self {
policy: RestartPolicy::default(),
backoff_initial: default_backoff_initial(),
backoff_max: default_backoff_max(),
max_retries_per_minute: default_max_retries_per_minute(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StopConfig {
#[serde(default = "default_grace", with = "humantime_serde")]
pub grace: Duration,
}
fn default_grace() -> Duration { Duration::from_secs(10) }
impl Default for StopConfig {
fn default() -> Self { Self { grace: default_grace() } }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn restart_config_defaults() {
let c = RestartConfig::default();
assert_eq!(c.policy, RestartPolicy::OnFailure);
assert_eq!(c.backoff_initial, Duration::from_secs(1));
assert_eq!(c.backoff_max, Duration::from_secs(30));
assert_eq!(c.max_retries_per_minute, 5);
}
#[test]
fn stop_config_defaults() {
assert_eq!(StopConfig::default().grace, Duration::from_secs(10));
}
}
```
- [ ] **Step 2: Expose from `lib.rs`**
```rust
//! Wire types and config schema shared between the xy daemon and CLI.
pub mod config;
pub mod state;
pub use config::{RestartConfig, RestartPolicy, StopConfig};
pub use state::ServerState;
```
- [ ] **Step 3: Run tests**
Run: `cargo test -p xy-protocol`
Expected: 4 passed.
- [ ] **Step 4: Commit**
```bash
git add crates/xy-protocol/
git commit -m "feat(protocol): RestartPolicy/RestartConfig/StopConfig with defaults"
```
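
`RestartConfig` only stores the knobs. Per the File Structure, they are consumed by `backoff.rs` and `retry_window.rs` in Phase 3, and this task does not define their formulas. Below is a std-only sketch of one plausible reading:

- Delays double from `backoff_initial` and are capped at `backoff_max`.
- `max_retries_per_minute` is enforced over a sliding 60-second window.

`backoff_delay`, `RetryWindow` and the doubling factor are all hypothetical, not the plan's API.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical doubling backoff: `initial * 2^attempt`, capped at `max`.
/// `attempt` is 0 for the first restart.
fn backoff_delay(initial: Duration, max: Duration, attempt: u32) -> Duration {
    // `1 << attempt` overflows u32 once attempt >= 32; saturate instead of panicking.
    let factor = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
    initial.saturating_mul(factor).min(max)
}

/// Hypothetical sliding window counting restarts within the last `window`.
struct RetryWindow {
    window: Duration,
    max_retries: u32,
    attempts: VecDeque<Instant>,
}

impl RetryWindow {
    fn new(window: Duration, max_retries: u32) -> Self {
        Self { window, max_retries, attempts: VecDeque::new() }
    }

    /// Drop attempts that have aged out of the window as of `now`.
    fn prune(&mut self, now: Instant) {
        while let Some(&oldest) = self.attempts.front() {
            if now.saturating_duration_since(oldest) >= self.window {
                self.attempts.pop_front();
            } else {
                break;
            }
        }
    }

    /// True if one more restart at `now` would exceed `max_retries` within the window.
    fn cap_reached(&mut self, now: Instant) -> bool {
        self.prune(now);
        self.attempts.len() >= self.max_retries as usize
    }

    /// Record a restart that happened at `now`.
    fn record(&mut self, now: Instant) {
        self.prune(now);
        self.attempts.push_back(now);
    }
}
```

Under this reading, a supervisor would call `cap_reached` before each restart. It would pass the result as the `retry_cap_reached` argument of Task 9's `decide`. Passing `now` explicitly, instead of calling `Instant::now()` inside, keeps the window testable without sleeping.
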
### Task 4: `ServerConfig` struct + `ConfigError`
**Files:**
- Modify: `crates/xy-protocol/src/config.rs`
- Create: `crates/xy-protocol/src/error.rs`
- Modify: `crates/xy-protocol/src/lib.rs`
- [ ] **Step 1: Append `ServerConfig` to `config.rs`**
```rust
use std::collections::BTreeMap;
use std::path::PathBuf;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ServerConfig {
pub name: String,
pub command: PathBuf,
#[serde(default)]
pub args: Vec<String>,
pub port: u16,
#[serde(default)]
pub env: BTreeMap<String, String>,
#[serde(default)]
pub working_dir: Option<PathBuf>,
#[serde(default)]
pub restart: RestartConfig,
#[serde(default)]
pub stop: StopConfig,
}
```
- [ ] **Step 2: Create `crates/xy-protocol/src/error.rs`**
```rust
use std::path::PathBuf;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum ConfigError {
#[error("failed to read {path}: {source}")]
Io { path: PathBuf, #[source] source: std::io::Error },
#[error("failed to parse KDL in {path}: {message}")]
Parse { path: PathBuf, message: String },
#[error("missing required field `{field}` in {path}")]
MissingField { path: PathBuf, field: &'static str },
#[error("invalid value for `{field}` in {path}: {message}")]
InvalidValue { path: PathBuf, field: &'static str, message: String },
#[error("duplicate port {port} declared by both `{name_a}` and `{name_b}`")]
DuplicatePort { name_a: String, name_b: String, port: u16 },
#[error("server name `{name}` contains invalid characters (allowed: a-z, 0-9, '-', '_')")]
InvalidName { name: String },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i32)]
pub enum RpcErrorCode {
ServerNotFound = -32001,
PortConflict = -32002,
ConfigInvalid = -32003,
AlreadyRunning = -32004,
NotRunning = -32005,
SpawnFailed = -32006,
}
impl RpcErrorCode {
pub fn as_i32(self) -> i32 { self as i32 }
}
```
- [ ] **Step 3: Update `lib.rs`**
```rust
//! Wire types and config schema shared between the xy daemon and CLI.
pub mod config;
pub mod error;
pub mod state;
pub use config::{RestartConfig, RestartPolicy, ServerConfig, StopConfig};
pub use error::{ConfigError, RpcErrorCode};
pub use state::ServerState;
```
- [ ] **Step 4: Run tests**
Run: `cargo test -p xy-protocol`
Expected: 4 passed (unchanged; types only).
- [ ] **Step 5: Commit**
```bash
git add crates/xy-protocol/
git commit -m "feat(protocol): ServerConfig + ConfigError + RpcErrorCode"
```
### Task 5: KDL parser for `ServerConfig`
**Files:**
- Create: `crates/xy-protocol/src/kdl_parse.rs`
- Modify: `crates/xy-protocol/src/lib.rs`
- [ ] **Step 1: Write the parser + tests in one file**
```rust
use crate::{ConfigError, RestartConfig, RestartPolicy, ServerConfig, StopConfig};
use kdl::{KdlDocument, KdlNode};
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::time::Duration;
pub fn parse_server_config(name: &str, text: &str, source_path: &Path) -> Result<ServerConfig, ConfigError> {
validate_name(name).map_err(|_| ConfigError::InvalidName { name: name.to_string() })?;
let doc: KdlDocument = text.parse().map_err(|e: kdl::KdlError| ConfigError::Parse {
path: source_path.to_path_buf(),
message: e.to_string(),
})?;
let command = require_string_arg(&doc, "command", source_path)?;
let args = optional_string_args(&doc, "args");
let port = require_u16_arg(&doc, "port", source_path)?;
let env = optional_string_map(&doc, "env");
let working_dir = optional_string_arg(&doc, "working-dir").map(PathBuf::from);
let restart = parse_restart(&doc, source_path)?;
let stop = parse_stop(&doc, source_path)?;
Ok(ServerConfig {
name: name.to_string(),
command: PathBuf::from(command),
args, port, env, working_dir, restart, stop,
})
}
fn validate_name(name: &str) -> Result<(), ()> {
if name.is_empty() { return Err(()); }
if name.chars().all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '_') {
Ok(())
} else {
Err(())
}
}
fn require_string_arg(doc: &KdlDocument, name: &'static str, path: &Path) -> Result<String, ConfigError> {
let node = find_node(doc, name).ok_or(ConfigError::MissingField { path: path.to_path_buf(), field: name })?;
node.entries().first()
.and_then(|e| e.value().as_string().map(str::to_string))
.ok_or(ConfigError::InvalidValue { path: path.to_path_buf(), field: name, message: "expected string argument".into() })
}
fn require_u16_arg(doc: &KdlDocument, name: &'static str, path: &Path) -> Result<u16, ConfigError> {
let node = find_node(doc, name).ok_or(ConfigError::MissingField { path: path.to_path_buf(), field: name })?;
let v = node.entries().first()
.and_then(|e| e.value().as_integer())
.ok_or(ConfigError::InvalidValue { path: path.to_path_buf(), field: name, message: "expected integer".into() })?;
u16::try_from(v).map_err(|_| ConfigError::InvalidValue {
path: path.to_path_buf(), field: name, message: format!("port {v} out of u16 range"),
})
}
fn optional_string_arg(doc: &KdlDocument, name: &str) -> Option<String> {
find_node(doc, name)
.and_then(|n| n.entries().first())
.and_then(|e| e.value().as_string().map(str::to_string))
}
fn optional_string_args(doc: &KdlDocument, name: &str) -> Vec<String> {
find_node(doc, name)
.map(|n| n.entries().iter()
.filter_map(|e| e.value().as_string().map(str::to_string))
.collect())
.unwrap_or_default()
}
fn optional_string_map(doc: &KdlDocument, name: &str) -> BTreeMap<String, String> {
let Some(node) = find_node(doc, name) else { return BTreeMap::new(); };
let mut out = BTreeMap::new();
if let Some(children) = node.children() {
for child in children.nodes() {
let key = child.name().value().to_string();
if let Some(val) = child.entries().first().and_then(|e| e.value().as_string()) {
out.insert(key, val.to_string());
}
}
}
out
}
fn parse_restart(doc: &KdlDocument, path: &Path) -> Result<RestartConfig, ConfigError> {
let Some(node) = find_node(doc, "restart") else { return Ok(RestartConfig::default()); };
let Some(children) = node.children() else { return Ok(RestartConfig::default()); };
let mut out = RestartConfig::default();
for child in children.nodes() {
match child.name().value() {
"policy" => {
let s = string_arg(child, "policy", path)?;
out.policy = match s.as_str() {
"always" => RestartPolicy::Always,
"on-failure" => RestartPolicy::OnFailure,
"never" => RestartPolicy::Never,
other => return Err(ConfigError::InvalidValue {
path: path.to_path_buf(), field: "restart.policy",
message: format!("unknown policy `{other}`"),
}),
};
}
"backoff-initial" => out.backoff_initial = parse_duration_arg(child, "restart.backoff-initial", path)?,
"backoff-max" => out.backoff_max = parse_duration_arg(child, "restart.backoff-max", path)?,
"max-retries-per-minute" => {
let v = child.entries().first()
.and_then(|e| e.value().as_integer())
.ok_or(ConfigError::InvalidValue {
path: path.to_path_buf(), field: "restart.max-retries-per-minute",
message: "expected integer".into(),
})?;
out.max_retries_per_minute = u32::try_from(v).map_err(|_| ConfigError::InvalidValue {
path: path.to_path_buf(), field: "restart.max-retries-per-minute",
message: format!("out of u32 range: {v}"),
})?;
}
other => return Err(ConfigError::InvalidValue {
path: path.to_path_buf(), field: "restart",
message: format!("unknown key `{other}`"),
}),
}
}
Ok(out)
}
fn parse_stop(doc: &KdlDocument, path: &Path) -> Result<StopConfig, ConfigError> {
let Some(node) = find_node(doc, "stop") else { return Ok(StopConfig::default()); };
let Some(children) = node.children() else { return Ok(StopConfig::default()); };
let mut out = StopConfig::default();
for child in children.nodes() {
match child.name().value() {
"grace" => out.grace = parse_duration_arg(child, "stop.grace", path)?,
other => return Err(ConfigError::InvalidValue {
path: path.to_path_buf(), field: "stop",
message: format!("unknown key `{other}`"),
}),
}
}
Ok(out)
}
fn string_arg(node: &KdlNode, field: &'static str, path: &Path) -> Result<String, ConfigError> {
node.entries().first()
.and_then(|e| e.value().as_string().map(str::to_string))
.ok_or(ConfigError::InvalidValue { path: path.to_path_buf(), field, message: "expected string".into() })
}
fn parse_duration_arg(node: &KdlNode, field: &'static str, path: &Path) -> Result<Duration, ConfigError> {
let s = string_arg(node, field, path)?;
humantime::parse_duration(&s).map_err(|e| ConfigError::InvalidValue {
path: path.to_path_buf(), field,
message: format!("invalid duration `{s}`: {e}"),
})
}
fn find_node<'a>(doc: &'a KdlDocument, name: &str) -> Option<&'a KdlNode> {
doc.nodes().iter().find(|n| n.name().value() == name)
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::Path;
fn p() -> &'static Path { Path::new("/tmp/test.kdl") }
#[test]
fn parses_minimal_config() {
let text = "command \"/usr/local/bin/foo\"\nport 8421\n";
let cfg = parse_server_config("foo", text, p()).unwrap();
assert_eq!(cfg.name, "foo");
assert_eq!(cfg.command, PathBuf::from("/usr/local/bin/foo"));
assert_eq!(cfg.port, 8421);
assert!(cfg.args.is_empty());
assert!(cfg.env.is_empty());
}
#[test]
fn parses_full_config() {
let text = r#"
command "/usr/local/bin/foo"
args "--http" "--port" "8421"
port 8421
env {
RUST_LOG "info"
FOO_BAR "baz"
}
working-dir "/tmp/work"
restart {
policy "always"
backoff-initial "2s"
backoff-max "1m"
max-retries-per-minute 10
}
stop {
grace "30s"
}
"#;
let cfg = parse_server_config("foo", text, p()).unwrap();
assert_eq!(cfg.args, vec!["--http", "--port", "8421"]);
assert_eq!(cfg.env.get("RUST_LOG").map(String::as_str), Some("info"));
assert_eq!(cfg.working_dir, Some(PathBuf::from("/tmp/work")));
assert_eq!(cfg.restart.policy, RestartPolicy::Always);
assert_eq!(cfg.restart.backoff_initial, Duration::from_secs(2));
assert_eq!(cfg.restart.backoff_max, Duration::from_secs(60));
assert_eq!(cfg.restart.max_retries_per_minute, 10);
assert_eq!(cfg.stop.grace, Duration::from_secs(30));
}
#[test]
fn missing_command_fails() {
let err = parse_server_config("foo", "port 8421", p()).unwrap_err();
assert!(matches!(err, ConfigError::MissingField { field: "command", .. }));
}
#[test]
fn missing_port_fails() {
let err = parse_server_config("foo", "command \"/bin/x\"", p()).unwrap_err();
assert!(matches!(err, ConfigError::MissingField { field: "port", .. }));
}
#[test]
fn unknown_restart_policy_fails() {
let text = "command \"/bin/x\"\nport 1\nrestart { policy \"maybe\" }";
let err = parse_server_config("foo", text, p()).unwrap_err();
assert!(matches!(err, ConfigError::InvalidValue { field: "restart.policy", .. }));
}
#[test]
fn invalid_name_rejected() {
let text = "command \"/bin/x\"\nport 1";
let err = parse_server_config("Foo Bar", text, p()).unwrap_err();
assert!(matches!(err, ConfigError::InvalidName { .. }));
}
}
```
- [ ] **Step 2: Expose from `lib.rs`**
Append to `lib.rs`:
```rust
pub mod kdl_parse;
pub use kdl_parse::parse_server_config;
```
- [ ] **Step 3: Run tests**
Run: `cargo test -p xy-protocol`
Expected: 10 passed.
- [ ] **Step 4: Fix any KDL API mismatches**
If the `kdl` crate's API differs slightly in your installed version (e.g. `name().value()` vs `name().to_string()`), adapt call sites. Re-run until green.
- [ ] **Step 5: Commit**
```bash
git add crates/xy-protocol/
git commit -m "feat(protocol): KDL parser for ServerConfig"
```
### Task 6: Load all configs from a directory + duplicate-port check
**Files:**
- Modify: `crates/xy-protocol/src/kdl_parse.rs`
- Modify: `crates/xy-protocol/src/lib.rs`
- [ ] **Step 1: Append `load_all_configs` and tests**
Append to `kdl_parse.rs`:
```rust
pub fn load_all_configs(dir: &Path) -> Result<Vec<ServerConfig>, ConfigError> {
if !dir.exists() { return Ok(Vec::new()); }
let entries = std::fs::read_dir(dir).map_err(|e| ConfigError::Io {
path: dir.to_path_buf(), source: e,
})?;
let mut configs = Vec::new();
for entry in entries {
let entry = entry.map_err(|e| ConfigError::Io { path: dir.to_path_buf(), source: e })?;
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) != Some("kdl") { continue; }
let name = path.file_stem().and_then(|s| s.to_str())
.ok_or(ConfigError::InvalidName { name: path.display().to_string() })?
.to_string();
let text = std::fs::read_to_string(&path).map_err(|e| ConfigError::Io { path: path.clone(), source: e })?;
configs.push(parse_server_config(&name, &text, &path)?);
}
check_duplicate_ports(&configs)?;
Ok(configs)
}
fn check_duplicate_ports(configs: &[ServerConfig]) -> Result<(), ConfigError> {
let mut seen: std::collections::HashMap<u16, String> = std::collections::HashMap::new();
for c in configs {
if let Some(other) = seen.insert(c.port, c.name.clone()) {
return Err(ConfigError::DuplicatePort {
name_a: other, name_b: c.name.clone(), port: c.port,
});
}
}
Ok(())
}
```
Append tests inside the existing `mod tests`:
```rust
use tempfile::tempdir;
use std::fs;
#[test]
fn load_all_finds_and_parses_files() {
let dir = tempdir().unwrap();
fs::write(dir.path().join("a.kdl"), "command \"/bin/a\"\nport 8001").unwrap();
fs::write(dir.path().join("b.kdl"), "command \"/bin/b\"\nport 8002").unwrap();
fs::write(dir.path().join("ignored.txt"), "not a config").unwrap();
let mut configs = load_all_configs(dir.path()).unwrap();
configs.sort_by(|x, y| x.name.cmp(&y.name));
assert_eq!(configs.len(), 2);
assert_eq!(configs[0].name, "a");
assert_eq!(configs[1].port, 8002);
}
#[test]
fn load_all_returns_empty_for_missing_dir() {
let dir = tempdir().unwrap();
let configs = load_all_configs(&dir.path().join("does-not-exist")).unwrap();
assert!(configs.is_empty());
}
#[test]
fn duplicate_ports_detected() {
let dir = tempdir().unwrap();
fs::write(dir.path().join("a.kdl"), "command \"/bin/a\"\nport 8001").unwrap();
fs::write(dir.path().join("b.kdl"), "command \"/bin/b\"\nport 8001").unwrap();
let err = load_all_configs(dir.path()).unwrap_err();
match err {
ConfigError::DuplicatePort { port, .. } => assert_eq!(port, 8001),
other => panic!("unexpected error: {other:?}"),
}
}
```
- [ ] **Step 2: Expose**
Append to `lib.rs`: `pub use kdl_parse::load_all_configs;`
- [ ] **Step 3: Run tests**
Run: `cargo test -p xy-protocol`
Expected: 13 passed.
- [ ] **Step 4: Commit**
```bash
git add crates/xy-protocol/
git commit -m "feat(protocol): load_all_configs from dir with duplicate port detection"
```
### Task 7: JSON-RPC method types in `rpc.rs`
**Files:**
- Create: `crates/xy-protocol/src/rpc.rs`
- Modify: `crates/xy-protocol/src/lib.rs`
- [ ] **Step 1: Write the types**
```rust
use crate::ServerState;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerSummary {
pub name: String,
pub state: ServerState,
#[serde(skip_serializing_if = "Option::is_none")]
pub pid: Option<u32>,
pub port: u16,
#[serde(skip_serializing_if = "Option::is_none")]
pub uptime_secs: Option<u64>,
pub restart_count: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_exit: Option<i32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusDetail {
#[serde(flatten)]
pub summary: ServerSummary,
pub recent_transitions: Vec<StateTransition>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateTransition {
pub from: ServerState,
pub to: ServerState,
pub at_unix_ms: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum NameOrAll {
All { all: bool },
Name { name: String },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusParams { pub name: String }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartResult { pub started: Vec<String>, pub already_running: Vec<String> }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StopResult { pub stopped: Vec<String>, pub not_running: Vec<String> }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RestartResult { pub restarted: Vec<String> }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReloadResult {
pub added: Vec<String>,
pub removed: Vec<String>,
pub changed: Vec<String>,
pub unchanged: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogsParams {
pub name: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tail: Option<u32>,
#[serde(default)]
pub follow: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogsSubscribed { pub subscription_id: u64 }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogsCancelParams { pub subscription_id: u64 }
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum LogStream { Stdout, Stderr }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogLine {
pub subscription_id: u64,
pub name: String,
pub stream: LogStream,
pub line: String,
pub ts_unix_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEnd { pub subscription_id: u64 }
pub mod methods {
pub const LIST: &str = "list";
pub const STATUS: &str = "status";
pub const START: &str = "start";
pub const STOP: &str = "stop";
pub const RESTART: &str = "restart";
pub const RELOAD: &str = "reload";
pub const LOGS: &str = "logs";
pub const LOGS_CANCEL: &str = "logs_cancel";
}
pub mod notifications {
pub const LOG: &str = "log";
pub const LOG_END: &str = "log_end";
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn name_or_all_round_trips_name() {
let v: NameOrAll = serde_json::from_str(r#"{"name":"foo"}"#).unwrap();
match v {
NameOrAll::Name { name } => assert_eq!(name, "foo"),
_ => panic!("expected Name variant"),
}
}
#[test]
fn name_or_all_round_trips_all() {
let v: NameOrAll = serde_json::from_str(r#"{"all":true}"#).unwrap();
match v {
NameOrAll::All { all } => assert!(all),
_ => panic!("expected All variant"),
}
}
}
```
- [ ] **Step 2: Expose**
Append to `lib.rs`:
```rust
pub mod rpc;
```
- [ ] **Step 3: Run tests**
Run: `cargo test -p xy-protocol`
Expected: 15 passed.
- [ ] **Step 4: Commit**
```bash
git add crates/xy-protocol/
git commit -m "feat(protocol): JSON-RPC method param/result types"
```
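
`LogsParams.tail` asks for the last N lines, and the File Structure gives `logs.rs` a ring buffer. That buffer's design is not part of these tasks. The hypothetical std-only sketch below (`LogRing` is an illustrative name) rests on two assumptions:

- Lines are kept as plain `String`s.
- The oldest line is evicted once capacity is reached.

```rust
use std::collections::VecDeque;

/// Bounded buffer of recent log lines; the oldest line is evicted when full.
struct LogRing {
    cap: usize,
    lines: VecDeque<String>,
}

impl LogRing {
    fn new(cap: usize) -> Self {
        Self { cap, lines: VecDeque::with_capacity(cap) }
    }

    fn push(&mut self, line: String) {
        if self.cap == 0 {
            return; // a zero-capacity ring keeps nothing
        }
        if self.lines.len() == self.cap {
            self.lines.pop_front();
        }
        self.lines.push_back(line);
    }

    /// The last `n` lines in chronological order (everything if `n` exceeds the length).
    fn tail(&self, n: usize) -> Vec<&str> {
        let skip = self.lines.len().saturating_sub(n);
        self.lines.iter().skip(skip).map(String::as_str).collect()
    }
}
```

A `logs` handler could convert `tail: Option<u32>` with `n as usize` and return the whole buffer when it is `None`. Returning borrowed `&str` slices means answering a tail request copies no line data until the lines are serialized.
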
---
## Phase 3 — `xy-supervisor`
### Task 8: `ChildHandle` trait + `MockChild` for tests
**Files:**
- Create: `crates/xy-supervisor/src/child.rs`
- Modify: `crates/xy-supervisor/src/lib.rs`
- [ ] **Step 1: Define the trait + mock**
```rust
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};
#[async_trait::async_trait]
pub trait ChildHandle: Send + 'static {
fn pid(&self) -> u32;
async fn wait(&mut self) -> std::io::Result<Option<i32>>;
fn terminate(&mut self) -> std::io::Result<()>;
fn kill(&mut self) -> std::io::Result<()>;
}
pub struct MockChild {
pid: u32,
exit_rx: Arc<Mutex<oneshot::Receiver<Option<i32>>>>,
terminate_tx: Option<oneshot::Sender<()>>,
kill_tx: Option<oneshot::Sender<()>>,
}
pub struct MockChildController {
pub exit_tx: Option<oneshot::Sender<Option<i32>>>,
pub terminate_rx: oneshot::Receiver<()>,
pub kill_rx: oneshot::Receiver<()>,
}
impl MockChild {
pub fn new(pid: u32) -> (Self, MockChildController) {
let (exit_tx, exit_rx) = oneshot::channel();
let (terminate_tx, terminate_rx) = oneshot::channel();
let (kill_tx, kill_rx) = oneshot::channel();
let child = Self {
pid,
exit_rx: Arc::new(Mutex::new(exit_rx)),
terminate_tx: Some(terminate_tx),
kill_tx: Some(kill_tx),
};
let ctl = MockChildController { exit_tx: Some(exit_tx), terminate_rx, kill_rx };
(child, ctl)
}
}
#[async_trait::async_trait]
impl ChildHandle for MockChild {
fn pid(&self) -> u32 { self.pid }
async fn wait(&mut self) -> std::io::Result<Option<i32>> {
let mut rx = self.exit_rx.lock().await;
match (&mut *rx).await {
Ok(code) => Ok(code),
Err(_) => Err(std::io::Error::other("exit_tx dropped")),
}
}
fn terminate(&mut self) -> std::io::Result<()> {
if let Some(tx) = self.terminate_tx.take() { let _ = tx.send(()); }
Ok(())
}
fn kill(&mut self) -> std::io::Result<()> {
if let Some(tx) = self.kill_tx.take() { let _ = tx.send(()); }
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn mock_child_exit() {
let (mut child, mut ctl) = MockChild::new(123);
assert_eq!(child.pid(), 123);
ctl.exit_tx.take().unwrap().send(Some(0)).unwrap();
assert_eq!(child.wait().await.unwrap(), Some(0));
}
#[tokio::test]
async fn mock_child_terminate() {
let (mut child, mut ctl) = MockChild::new(1);
child.terminate().unwrap();
ctl.terminate_rx.try_recv().unwrap();
}
}
```
- [ ] **Step 2: Update `lib.rs`**
```rust
//! Process-supervision primitives for the xy daemon.
pub mod child;
pub use child::{ChildHandle, MockChild, MockChildController};
```
- [ ] **Step 3: Run tests**
Run: `cargo test -p xy-supervisor`
Expected: 2 passed.
- [ ] **Step 4: Commit**
```bash
git add crates/xy-supervisor/
git commit -m "feat(supervisor): ChildHandle trait + MockChild"
```
### Task 9: Restart-policy decision logic
**Files:**
- Create: `crates/xy-supervisor/src/policy.rs`
- Modify: `crates/xy-supervisor/src/lib.rs`
- [ ] **Step 1: Write tests + impl**
```rust
use xy_protocol::RestartPolicy;
#[derive(Debug, PartialEq, Eq)]
pub enum RestartDecision { Restart, StayStopped, MarkFailed }
pub fn decide(policy: RestartPolicy, exit_code: Option<i32>, retry_cap_reached: bool) -> RestartDecision {
let clean = matches!(exit_code, Some(0));
let want = match policy {
RestartPolicy::Never => false,
RestartPolicy::OnFailure => !clean,
RestartPolicy::Always => true,
};
if !want { return RestartDecision::StayStopped; }
if retry_cap_reached { RestartDecision::MarkFailed } else { RestartDecision::Restart }
}
#[cfg(test)]
mod tests {
use super::*;
#[test] fn never_never_restarts() {
assert_eq!(decide(RestartPolicy::Never, Some(0), false), RestartDecision::StayStopped);
assert_eq!(decide(RestartPolicy::Never, Some(1), false), RestartDecision::StayStopped);
assert_eq!(decide(RestartPolicy::Never, None, false), RestartDecision::StayStopped);
}
#[test] fn on_failure_skips_clean() {
assert_eq!(decide(RestartPolicy::OnFailure, Some(0), false), RestartDecision::StayStopped);
}
#[test] fn on_failure_restarts_nonzero() {
assert_eq!(decide(RestartPolicy::OnFailure, Some(1), false), RestartDecision::Restart);
}
#[test] fn on_failure_restarts_signal() {
assert_eq!(decide(RestartPolicy::OnFailure, None, false), RestartDecision::Restart);
}
#[test] fn always_restarts_on_clean() {
assert_eq!(decide(RestartPolicy::Always, Some(0), false), RestartDecision::Restart);
}
#[test] fn cap_reached_marks_failed() {
assert_eq!(decide(RestartPolicy::Always, Some(0), true), RestartDecision::MarkFailed);
assert_eq!(decide(RestartPolicy::OnFailure, Some(1), true), RestartDecision::MarkFailed);
}
#[test] fn cap_reached_never_still_stopped() {
assert_eq!(decide(RestartPolicy::Never, Some(1), true), RestartDecision::StayStopped);
}
}
```
- [ ] **Step 2: Expose**
Append to `lib.rs`: `pub mod policy; pub use policy::{decide, RestartDecision};`
- [ ] **Step 3: Run tests**
Run: `cargo test -p xy-supervisor`
Expected: 9 passed.
- [ ] **Step 4: Commit**
```bash
git add crates/xy-supervisor/
git commit -m "feat(supervisor): restart-policy decision logic"
```
### Task 10: Backoff calculator
**Files:**
- Create: `crates/xy-supervisor/src/backoff.rs`
- Modify: `crates/xy-supervisor/src/lib.rs`
- [ ] **Step 1: Write tests + impl**
```rust
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct Backoff {
initial: Duration,
max: Duration,
current: Option<Duration>,
}
impl Backoff {
pub fn new(initial: Duration, max: Duration) -> Self {
Self { initial, max, current: None }
}
pub fn next(&mut self) -> Duration {
let next = match self.current {
None => self.initial,
Some(d) => (d * 2).min(self.max),
};
self.current = Some(next);
next
}
pub fn reset(&mut self) { self.current = None; }
}
#[cfg(test)]
mod tests {
use super::*;
#[test] fn starts_at_initial() {
let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(30));
assert_eq!(b.next(), Duration::from_secs(1));
}
#[test] fn doubles_each_call() {
let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(30));
assert_eq!(b.next(), Duration::from_secs(1));
assert_eq!(b.next(), Duration::from_secs(2));
assert_eq!(b.next(), Duration::from_secs(4));
assert_eq!(b.next(), Duration::from_secs(8));
}
#[test] fn caps_at_max() {
let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(5));
for _ in 0..10 { b.next(); }
assert_eq!(b.next(), Duration::from_secs(5));
}
#[test] fn reset_starts_over() {
let mut b = Backoff::new(Duration::from_secs(1), Duration::from_secs(30));
b.next(); b.next(); b.next();
b.reset();
assert_eq!(b.next(), Duration::from_secs(1));
}
}
```
- [ ] **Step 2: Expose**
Append: `pub mod backoff; pub use backoff::Backoff;`
- [ ] **Step 3: Run tests**
Run: `cargo test -p xy-supervisor`
Expected: 13 passed.
- [ ] **Step 4: Commit**
```bash
git add crates/xy-supervisor/
git commit -m "feat(supervisor): exponential backoff calculator"
```
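To see where the cap takes over, here is a small std-only sketch that reproduces the doubling rule from `Backoff::next` and lists the first few delays. `backoff_schedule` is a hypothetical helper for illustration, not part of the crate; it uses `Duration::saturating_mul` so that an extreme `max` cannot trigger the overflow panic of `Duration * u32`.

```rust
use std::time::Duration;

/// Hypothetical helper mirroring `Backoff::next`: start at `initial`,
/// double on each call, never exceed `max`. Returns the first `n` delays.
fn backoff_schedule(initial: Duration, max: Duration, n: usize) -> Vec<Duration> {
    let mut out = Vec::with_capacity(n);
    let mut current: Option<Duration> = None;
    for _ in 0..n {
        let next = match current {
            None => initial,
            // Saturate instead of panicking if `max` is close to Duration::MAX.
            Some(d) => d.saturating_mul(2).min(max),
        };
        current = Some(next);
        out.push(next);
    }
    out
}
```

With `initial = 1s` and `max = 30s` the schedule is 1, 2, 4, 8, 16, 30, 30 seconds: the sixth delay is the first one clamped to the cap.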
### Task 11: Sliding-60s retry-window tracker
**Files:**
- Create: `crates/xy-supervisor/src/retry_window.rs`
- Modify: `crates/xy-supervisor/src/lib.rs`
- [ ] **Step 1: Write tests + impl**
```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};
#[derive(Debug, Clone)]
pub struct RetryWindow {
window: Duration,
cap: u32,
events: VecDeque<Instant>,
}
impl RetryWindow {
pub fn new(window: Duration, cap: u32) -> Self {
Self { window, cap, events: VecDeque::new() }
}
pub fn record(&mut self, now: Instant) {
self.events.push_back(now);
self.prune(now);
}
pub fn cap_reached(&mut self, now: Instant) -> bool {
self.prune(now);
self.events.len() as u32 >= self.cap
}
pub fn count(&mut self, now: Instant) -> u32 {
self.prune(now);
self.events.len() as u32
}
fn prune(&mut self, now: Instant) {
while let Some(&front) = self.events.front() {
if now.duration_since(front) > self.window { self.events.pop_front(); }
else { break; }
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test] fn below_cap_not_reached() {
let mut w = RetryWindow::new(Duration::from_secs(60), 3);
let t = Instant::now(); w.record(t); w.record(t);
assert!(!w.cap_reached(t));
}
#[test] fn at_cap_reached() {
let mut w = RetryWindow::new(Duration::from_secs(60), 3);
let t = Instant::now(); w.record(t); w.record(t); w.record(t);
assert!(w.cap_reached(t));
}
#[test] fn old_events_pruned() {
let mut w = RetryWindow::new(Duration::from_secs(60), 3);
let t0 = Instant::now(); w.record(t0); w.record(t0); w.record(t0);
let t1 = t0 + Duration::from_secs(61);
assert_eq!(w.count(t1), 0);
assert!(!w.cap_reached(t1));
}
}
```
- [ ] **Step 2: Expose**
Append: `pub mod retry_window; pub use retry_window::RetryWindow;`
- [ ] **Step 3: Run tests**
Run: `cargo test -p xy-supervisor`
Expected: 16 passed.
- [ ] **Step 4: Commit**
```bash
git add crates/xy-supervisor/
git commit -m "feat(supervisor): sliding retry-window tracker"
```
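Because the supervisor records an exit before asking `cap_reached` (see the exit branch in Task 16), a cap of N allows N - 1 automatic restarts inside one window; the Nth exit marks the server failed. The sketch below models that ordering with a plain counter, assuming every exit lands inside the same 60 s window. `restarts_before_failure` is a hypothetical name used only for illustration.

```rust
/// Hypothetical model of the supervisor's exit handling: each exit is
/// recorded *before* the cap check, as `record(now)` then `cap_reached(now)`.
/// Returns how many automatic restarts happen before the server is marked failed.
fn restarts_before_failure(cap: u32) -> u32 {
    let mut recorded = 0u32;
    let mut restarts = 0u32;
    loop {
        recorded += 1; // RetryWindow::record(now)
        if recorded >= cap {
            // cap_reached(now) is true, so decide(..) returns MarkFailed.
            return restarts;
        }
        restarts += 1; // decide(..) returned Restart
    }
}
```

If the spec means `max_retries_per_minute` to count restarts rather than exits, checking `cap_reached` before calling `record` would allow a full N restarts instead.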
### Task 12: Log file writer with rotation
**Files:**
- Create: `crates/xy-supervisor/src/logs.rs`
- Modify: `crates/xy-supervisor/src/lib.rs`
- [ ] **Step 1: Write `RotatingLogWriter` + tests**
```rust
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
pub struct RotatingLogWriter {
base: PathBuf,
max_bytes: u64,
keep: usize,
file: File,
written: u64,
}
impl RotatingLogWriter {
pub fn open(base: &Path, max_bytes: u64, keep: usize) -> std::io::Result<Self> {
if let Some(parent) = base.parent() { std::fs::create_dir_all(parent)?; }
let file = OpenOptions::new().create(true).append(true).open(base)?;
let written = file.metadata()?.len();
Ok(Self { base: base.to_path_buf(), max_bytes, keep, file, written })
}
pub fn write_line(&mut self, tag: &str, line: &str) -> std::io::Result<()> {
let bytes = format!("{tag} {line}\n");
self.file.write_all(bytes.as_bytes())?;
self.written += bytes.len() as u64;
if self.written >= self.max_bytes { self.rotate()?; }
Ok(())
}
fn rotate(&mut self) -> std::io::Result<()> {
// Drop the open file handle by replacing with /dev/null briefly.
self.file = OpenOptions::new().read(true).open("/dev/null")?;
for i in (1..self.keep).rev() {
let src = self.gen_path(i);
let dst = self.gen_path(i + 1);
if src.exists() { let _ = std::fs::rename(&src, &dst); }
}
if self.base.exists() { let _ = std::fs::rename(&self.base, self.gen_path(1)); }
self.file = OpenOptions::new().create(true).append(true).open(&self.base)?;
self.written = 0;
Ok(())
}
fn gen_path(&self, n: usize) -> PathBuf {
let mut s = self.base.as_os_str().to_os_string();
s.push(format!(".{n}"));
PathBuf::from(s)
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
use std::io::Read;
#[test] fn writes_lines_with_tags() {
let dir = tempdir().unwrap();
let base = dir.path().join("x.log");
let mut w = RotatingLogWriter::open(&base, 1024, 3).unwrap();
w.write_line("[out]", "hello").unwrap();
w.write_line("[err]", "boom").unwrap();
let mut s = String::new();
File::open(&base).unwrap().read_to_string(&mut s).unwrap();
assert_eq!(s, "[out] hello\n[err] boom\n");
}
#[test] fn rotates_at_threshold() {
let dir = tempdir().unwrap();
let base = dir.path().join("x.log");
let mut w = RotatingLogWriter::open(&base, 20, 3).unwrap();
for _ in 0..5 { w.write_line("[out]", "0123456789").unwrap(); }
assert!(base.exists());
let rotated = dir.path().join("x.log.1");
assert!(rotated.exists(), "expected rotated file at {}", rotated.display());
}
}
```
- [ ] **Step 2: Expose**
Append: `pub mod logs; pub use logs::RotatingLogWriter;`
- [ ] **Step 3: Run tests**
Run: `cargo test -p xy-supervisor`
Expected: 18 passed.
- [ ] **Step 4: Commit**
```bash
git add crates/xy-supervisor/
git commit -m "feat(supervisor): rotating log writer"
```
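The rename loop in `rotate` walks generations from oldest to newest so that no file is overwritten before it has been shifted up; on Unix, `std::fs::rename` replaces an existing destination, which is how the oldest generation gets dropped. A std-only sketch that only lists the renames for a given `keep` (`rotation_plan` is hypothetical and touches no files):

```rust
/// Hypothetical helper listing the (from, to) renames that `rotate` performs,
/// in order, for base name `base` and `keep` retained generations.
fn rotation_plan(base: &str, keep: usize) -> Vec<(String, String)> {
    let mut plan = Vec::new();
    // Shift existing generations up by one, oldest first:
    // .{keep-1} -> .{keep}, ..., .1 -> .2. The old .{keep} is replaced and lost.
    for i in (1..keep).rev() {
        plan.push((format!("{base}.{i}"), format!("{base}.{}", i + 1)));
    }
    // Finally the live file becomes generation 1.
    plan.push((base.to_string(), format!("{base}.1")));
    plan
}
```

With `keep = 3` the writer ends up with the live `x.log` plus `x.log.1` through `x.log.3`.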
### Task 13: Ring buffer
**Files:**
- Modify: `crates/xy-supervisor/src/logs.rs`
- Modify: `crates/xy-supervisor/src/lib.rs`
- [ ] **Step 1: Append `RingBuffer` to `logs.rs`**
```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
#[derive(Clone)]
pub struct RingBuffer {
inner: Arc<Mutex<RingBufferInner>>,
capacity_bytes: usize,
}
struct RingBufferInner {
lines: VecDeque<RecordedLine>,
bytes: usize,
}
#[derive(Debug, Clone)]
pub struct RecordedLine {
pub stream: xy_protocol::rpc::LogStream,
pub line: String,
pub ts_unix_ms: u64,
}
impl RingBuffer {
pub fn new(capacity_bytes: usize) -> Self {
Self {
inner: Arc::new(Mutex::new(RingBufferInner { lines: VecDeque::new(), bytes: 0 })),
capacity_bytes,
}
}
pub fn push(&self, line: RecordedLine) {
let mut g = self.inner.lock().unwrap();
g.bytes += line.line.len();
g.lines.push_back(line);
while g.bytes > self.capacity_bytes {
if let Some(removed) = g.lines.pop_front() { g.bytes -= removed.line.len(); }
else { break; }
}
}
pub fn snapshot_tail(&self, n: Option<u32>) -> Vec<RecordedLine> {
let g = self.inner.lock().unwrap();
match n {
None => g.lines.iter().cloned().collect(),
Some(n) => {
let take = (n as usize).min(g.lines.len());
let start = g.lines.len() - take;
g.lines.iter().skip(start).cloned().collect()
}
}
}
}
```
- [ ] **Step 2: Add tests inside the existing `mod tests`**
```rust
use xy_protocol::rpc::LogStream;
fn recorded(s: &str) -> RecordedLine {
RecordedLine { stream: LogStream::Stdout, line: s.to_string(), ts_unix_ms: 0 }
}
#[test] fn ring_buffer_drops_oldest_when_full() {
let rb = RingBuffer::new(10);
rb.push(recorded("aaaaa"));
rb.push(recorded("bbbbb"));
rb.push(recorded("ccc"));
let snap = rb.snapshot_tail(None);
assert_eq!(snap.len(), 2);
assert_eq!(snap[0].line, "bbbbb");
assert_eq!(snap[1].line, "ccc");
}
#[test] fn ring_buffer_tail_n() {
let rb = RingBuffer::new(1024);
for i in 0..5 { rb.push(recorded(&format!("line{i}"))); }
let snap = rb.snapshot_tail(Some(2));
assert_eq!(snap.len(), 2);
assert_eq!(snap[0].line, "line3");
assert_eq!(snap[1].line, "line4");
}
```
- [ ] **Step 3: Expose**
Append: `pub use logs::{RecordedLine, RingBuffer};`
- [ ] **Step 4: Run tests**
Run: `cargo test -p xy-supervisor`
Expected: 20 passed.
- [ ] **Step 5: Commit**
```bash
git add crates/xy-supervisor/
git commit -m "feat(supervisor): ring buffer for recent log lines"
```
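One edge case worth knowing: eviction runs after the push and only checks the byte total, so a single line longer than `capacity_bytes` evicts everything, itself included, and the ring ends up empty. A std-only model of the same loop, where `ByteRing` is a hypothetical stand-in that stores plain `String`s instead of `RecordedLine`s:

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for `RingBuffer`: same push-then-evict loop,
/// without the Arc<Mutex<..>> wrapper or the RecordedLine metadata.
struct ByteRing {
    lines: VecDeque<String>,
    bytes: usize,
    capacity_bytes: usize,
}

impl ByteRing {
    fn new(capacity_bytes: usize) -> Self {
        Self { lines: VecDeque::new(), bytes: 0, capacity_bytes }
    }

    fn push(&mut self, line: String) {
        self.bytes += line.len();
        self.lines.push_back(line);
        // Evict from the front until the byte budget holds again.
        while self.bytes > self.capacity_bytes {
            match self.lines.pop_front() {
                Some(old) => self.bytes -= old.len(),
                None => break,
            }
        }
    }
}
```

Whether silently dropping an oversized line matters depends on how long the supervised servers' log lines get; the file log still receives every line either way.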
### Task 14: `LogSink` — fan-out to file, ring buffer, broadcast
**Files:**
- Modify: `crates/xy-supervisor/src/logs.rs`
- Modify: `crates/xy-supervisor/src/lib.rs`
- [ ] **Step 1: Append `LogSink` to `logs.rs`**
```rust
use tokio::sync::broadcast;
use xy_protocol::rpc::{LogLine, LogStream};
const LOG_BROADCAST_CAP: usize = 256;
#[derive(Clone)]
pub struct LogSink {
pub server_name: String,
writer: Arc<Mutex<RotatingLogWriter>>,
pub ring: RingBuffer,
pub broadcast: broadcast::Sender<LogLine>,
}
impl LogSink {
pub fn new(server_name: String, writer: RotatingLogWriter, ring_capacity_bytes: usize) -> Self {
let (tx, _) = broadcast::channel(LOG_BROADCAST_CAP);
Self {
server_name,
writer: Arc::new(Mutex::new(writer)),
ring: RingBuffer::new(ring_capacity_bytes),
broadcast: tx,
}
}
pub fn record(&self, stream: LogStream, line: String) {
let ts = now_unix_ms();
let tag = match stream { LogStream::Stdout => "[out]", LogStream::Stderr => "[err]" };
if let Err(e) = self.writer.lock().unwrap().write_line(tag, &line) {
tracing::warn!(server = %self.server_name, error = %e, "log file write failed");
}
self.ring.push(RecordedLine { stream, line: line.clone(), ts_unix_ms: ts });
let _ = self.broadcast.send(LogLine {
subscription_id: 0,
name: self.server_name.clone(),
stream, line, ts_unix_ms: ts,
});
}
}
fn now_unix_ms() -> u64 {
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64).unwrap_or(0)
}
```
- [ ] **Step 2: Add a tokio test inside `mod tests`**
```rust
#[tokio::test]
async fn log_sink_records_and_broadcasts() {
let dir = tempdir().unwrap();
let writer = RotatingLogWriter::open(&dir.path().join("s.log"), 1024, 3).unwrap();
let sink = LogSink::new("s".to_string(), writer, 1024);
let mut rx = sink.broadcast.subscribe();
sink.record(LogStream::Stdout, "hello".to_string());
let got = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv()).await.unwrap().unwrap();
assert_eq!(got.line, "hello");
assert_eq!(got.stream, LogStream::Stdout);
assert_eq!(sink.ring.snapshot_tail(None).len(), 1);
}
```
- [ ] **Step 3: Expose**
Append: `pub use logs::LogSink;`
- [ ] **Step 4: Run tests**
Run: `cargo test -p xy-supervisor`
Expected: 21 passed.
- [ ] **Step 5: Commit**
```bash
git add crates/xy-supervisor/
git commit -m "feat(supervisor): LogSink fans out to file, ring buffer, broadcast"
```
### Task 15: `RealChild` — real-process implementation of `ChildHandle`
**Files:**
- Modify: `crates/xy-supervisor/src/child.rs`
- Modify: `crates/xy-supervisor/src/lib.rs`
- [ ] **Step 1: Append `RealChild` and `spawn_with_logs` to `child.rs`**
```rust
use crate::logs::LogSink;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child as TokioChild, Command};
use xy_protocol::{rpc::LogStream, ServerConfig};
pub struct RealChild {
pid: u32,
pgid: Pid,
child: Option<TokioChild>,
}
impl RealChild {
pub fn pgid(&self) -> Pid { self.pgid }
}
#[async_trait::async_trait]
impl ChildHandle for RealChild {
fn pid(&self) -> u32 { self.pid }
async fn wait(&mut self) -> std::io::Result<Option<i32>> {
let child = self.child.as_mut().ok_or_else(|| std::io::Error::other("already waited"))?;
let status = child.wait().await?;
Ok(status.code())
}
fn terminate(&mut self) -> std::io::Result<()> {
kill(Pid::from_raw(-self.pgid.as_raw()), Signal::SIGTERM)
.map_err(|e| std::io::Error::other(e.to_string()))
}
fn kill(&mut self) -> std::io::Result<()> {
kill(Pid::from_raw(-self.pgid.as_raw()), Signal::SIGKILL)
.map_err(|e| std::io::Error::other(e.to_string()))
}
}
pub fn spawn_with_logs(cfg: &ServerConfig, sink: LogSink) -> std::io::Result<RealChild> {
let mut cmd = Command::new(&cfg.command);
cmd.args(&cfg.args);
for (k, v) in &cfg.env { cmd.env(k, v); }
if let Some(dir) = &cfg.working_dir { cmd.current_dir(dir); }
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
cmd.kill_on_drop(true);
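    // Own process group so signals reach the whole tree. The closure runs in the
    // forked child before exec, so it avoids allocating: setpgid is
    // async-signal-safe, and Errno converts into io::Error without allocation.
    unsafe {
        cmd.pre_exec(|| {
            nix::unistd::setpgid(Pid::from_raw(0), Pid::from_raw(0))
                .map_err(std::io::Error::from)
        });
    }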
let mut child = cmd.spawn()?;
let pid = child.id().ok_or_else(|| std::io::Error::other("no pid"))?;
let pgid = Pid::from_raw(pid as i32);
if let Some(out) = child.stdout.take() { spawn_pump(out, sink.clone(), LogStream::Stdout); }
if let Some(err) = child.stderr.take() { spawn_pump(err, sink.clone(), LogStream::Stderr); }
Ok(RealChild { pid, pgid, child: Some(child) })
}
fn spawn_pump<R: tokio::io::AsyncRead + Unpin + Send + 'static>(
reader: R, sink: LogSink, stream: LogStream,
) {
tokio::spawn(async move {
let mut lines = BufReader::new(reader).lines();
loop {
match lines.next_line().await {
Ok(Some(line)) => sink.record(stream, line),
Ok(None) => break,
Err(e) => {
tracing::warn!(server = %sink.server_name, error = %e, ?stream, "log pump read error");
break;
}
}
}
});
}
```
- [ ] **Step 2: Expose**
Append to `lib.rs`: `pub use child::{RealChild, spawn_with_logs};`
- [ ] **Step 3: Build**
Run: `cargo build -p xy-supervisor`
Expected: compiles. Fix any nix API mismatches inline.
- [ ] **Step 4: Commit**
```bash
git add crates/xy-supervisor/
git commit -m "feat(supervisor): RealChild + spawn_with_logs"
```
### Task 16: Supervisor task state machine
**Files:**
- Create: `crates/xy-supervisor/src/supervisor.rs`
- Modify: `crates/xy-supervisor/src/lib.rs`
- [ ] **Step 1: Define cmd types + handle**
```rust
use crate::{
backoff::Backoff, child::ChildHandle, logs::LogSink,
policy::{decide, RestartDecision}, retry_window::RetryWindow,
};
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::sleep;
use tracing::{debug, info, warn};
use xy_protocol::{ServerConfig, ServerState};
pub enum SupervisorCmd {
Start { ack: oneshot::Sender<StartAck> },
Stop { ack: oneshot::Sender<StopAck> },
Restart { ack: oneshot::Sender<()> },
Reconfigure { new: ServerConfig, ack: oneshot::Sender<()> },
Shutdown { ack: oneshot::Sender<()> },
}
#[derive(Debug, PartialEq, Eq)]
pub enum StartAck { Started, AlreadyRunning }
#[derive(Debug, PartialEq, Eq)]
pub enum StopAck { Stopped, NotRunning }
#[derive(Clone)]
pub struct SupervisorHandle {
pub name: String,
pub tx: mpsc::Sender<SupervisorCmd>,
pub state: watch::Receiver<ServerState>,
pub log_sink: LogSink,
}
#[async_trait::async_trait]
pub trait Spawner: Send + 'static {
type Child: ChildHandle;
async fn spawn(&self, cfg: &ServerConfig, sink: LogSink) -> std::io::Result<Self::Child>;
}
```
- [ ] **Step 2: Append the task + run loop**
```rust
pub struct SupervisorTask<S: Spawner> {
cfg: ServerConfig,
log_sink: LogSink,
spawner: S,
state_tx: watch::Sender<ServerState>,
cmd_rx: mpsc::Receiver<SupervisorCmd>,
backoff: Backoff,
retry_window: RetryWindow,
restart_count: u32,
last_exit: Option<i32>,
started_at: Option<Instant>,
}
impl<S: Spawner> SupervisorTask<S> {
pub fn new(
cfg: ServerConfig, log_sink: LogSink, spawner: S,
state_tx: watch::Sender<ServerState>, cmd_rx: mpsc::Receiver<SupervisorCmd>,
) -> Self {
let backoff = Backoff::new(cfg.restart.backoff_initial, cfg.restart.backoff_max);
let retry_window = RetryWindow::new(Duration::from_secs(60), cfg.restart.max_retries_per_minute);
Self { cfg, log_sink, spawner, state_tx, cmd_rx, backoff, retry_window,
restart_count: 0, last_exit: None, started_at: None }
}
fn set_state(&self, s: ServerState) { let _ = self.state_tx.send(s); }
    pub async fn run(mut self) {
        let mut child: Option<S::Child> = None;
        loop {
            tokio::select! {
                cmd = self.cmd_rx.recv() => {
                    let Some(cmd) = cmd else { break; };
                    match cmd {
                        SupervisorCmd::Start { ack } => {
                            if child.is_some() { let _ = ack.send(StartAck::AlreadyRunning); }
                            else {
                                // An explicit start begins a fresh backoff sequence.
                                self.backoff.reset();
                                match self.do_start().await {
                                    Ok(c) => { child = Some(c); let _ = ack.send(StartAck::Started); }
                                    Err(e) => {
                                        warn!(name = %self.cfg.name, error = %e, "spawn failed");
                                        self.set_state(ServerState::Failed);
                                        // StartAck has no failure variant; callers see the
                                        // failure through the `Failed` state on the watch channel.
                                        let _ = ack.send(StartAck::Started);
                                    }
                                }
                            }
                        }
                        SupervisorCmd::Stop { ack } => {
                            if let Some(c) = child.take() { self.do_stop(c).await; let _ = ack.send(StopAck::Stopped); }
                            else { let _ = ack.send(StopAck::NotRunning); }
                        }
                        SupervisorCmd::Restart { ack } => {
                            if let Some(c) = child.take() {
                                self.set_state(ServerState::Restarting);
                                self.do_stop(c).await;
                            }
                            // A manual restart also starts the backoff sequence over.
                            self.backoff.reset();
                            match self.do_start().await {
                                Ok(c) => child = Some(c),
                                Err(e) => {
                                    warn!(name = %self.cfg.name, error = %e, "restart spawn failed");
                                    self.set_state(ServerState::Failed);
                                }
                            }
                            let _ = ack.send(());
                        }
                        SupervisorCmd::Reconfigure { new, ack } => {
                            self.cfg = new;
                            self.backoff = Backoff::new(self.cfg.restart.backoff_initial, self.cfg.restart.backoff_max);
                            self.retry_window = RetryWindow::new(Duration::from_secs(60), self.cfg.restart.max_retries_per_minute);
                            let _ = ack.send(());
                        }
                        SupervisorCmd::Shutdown { ack } => {
                            if let Some(c) = child.take() { self.do_stop(c).await; }
                            let _ = ack.send(());
                            return;
                        }
                    }
                }
                code = wait_child(&mut child) => {
                    // The child is gone: clear the slot, otherwise the next iteration
                    // would wait on the same exited child again.
                    child = None;
                    self.last_exit = code;
                    let now = Instant::now();
                    // Heuristic: a run that outlasted the 60 s retry window counts as
                    // stable, so the next automatic restart uses the initial delay again.
                    if self.started_at.is_some_and(|t| now.duration_since(t) >= Duration::from_secs(60)) {
                        self.backoff.reset();
                    }
                    self.retry_window.record(now);
                    let cap = self.retry_window.cap_reached(now);
                    let decision = decide(self.cfg.restart.policy, code, cap);
                    debug!(name = %self.cfg.name, ?code, ?decision, "child exited");
                    match decision {
                        RestartDecision::StayStopped => {
                            self.started_at = None;
                            self.set_state(ServerState::Stopped);
                        }
                        RestartDecision::MarkFailed => {
                            self.started_at = None;
                            self.set_state(ServerState::Failed);
                        }
                        RestartDecision::Restart => {
                            self.set_state(ServerState::Restarting);
                            // Escalates across consecutive automatic restarts. Commands
                            // queue up in `cmd_rx` while this sleep is in progress.
                            let delay = self.backoff.next();
                            sleep(delay).await;
                            match self.do_start().await {
                                Ok(c) => child = Some(c),
                                Err(e) => {
                                    warn!(name = %self.cfg.name, error = %e, "restart spawn failed");
                                    self.set_state(ServerState::Failed);
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    async fn do_start(&mut self) -> std::io::Result<S::Child> {
        self.set_state(ServerState::Starting);
        let c = self.spawner.spawn(&self.cfg, self.log_sink.clone()).await?;
        self.restart_count = self.restart_count.saturating_add(1);
        self.started_at = Some(Instant::now());
        // Backoff is reset by callers (explicit Start/Restart, or a stable run),
        // not here: resetting on every spawn would pin automatic restarts to
        // `backoff_initial` and the delay would never grow.
        self.set_state(ServerState::Running);
        info!(name = %self.cfg.name, pid = c.pid(), "started");
        Ok(c)
    }
async fn do_stop(&mut self, mut c: S::Child) {
self.set_state(ServerState::Stopping);
let _ = c.terminate();
let grace = self.cfg.stop.grace;
match tokio::time::timeout(grace, c.wait()).await {
Ok(_) => {}
Err(_) => {
let _ = c.kill();
let _ = c.wait().await;
}
}
self.started_at = None;
self.set_state(ServerState::Stopped);
}
}
async fn wait_child<C: ChildHandle>(slot: &mut Option<C>) -> Option<i32> {
match slot.as_mut() {
Some(c) => c.wait().await.ok().flatten(),
None => std::future::pending().await,
}
}
pub struct RealSpawner;
#[async_trait::async_trait]
impl Spawner for RealSpawner {
type Child = crate::child::RealChild;
async fn spawn(&self, cfg: &ServerConfig, sink: LogSink) -> std::io::Result<Self::Child> {
crate::child::spawn_with_logs(cfg, sink)
}
}
```
- [ ] **Step 3: Add a smoke test in supervisor.rs**
```rust
#[cfg(test)]
mod tests {
use super::*;
    use crate::child::MockChild;
use crate::logs::{LogSink, RotatingLogWriter};
use std::sync::{Arc, Mutex};
use tempfile::tempdir;
use xy_protocol::{RestartConfig, RestartPolicy, StopConfig};
struct QueueSpawner {
queue: Arc<Mutex<Vec<MockChild>>>,
}
#[async_trait::async_trait]
impl Spawner for QueueSpawner {
type Child = MockChild;
async fn spawn(&self, _cfg: &ServerConfig, _sink: LogSink) -> std::io::Result<MockChild> {
let mut q = self.queue.lock().unwrap();
Ok(q.remove(0))
}
}
fn cfg(name: &str, policy: RestartPolicy, max_retries: u32) -> ServerConfig {
ServerConfig {
name: name.to_string(),
command: "/bin/true".into(),
args: vec![], port: 1,
env: Default::default(), working_dir: None,
restart: RestartConfig {
policy,
backoff_initial: Duration::from_millis(1),
backoff_max: Duration::from_millis(1),
max_retries_per_minute: max_retries,
},
stop: StopConfig { grace: Duration::from_millis(50) },
}
}
fn sink(name: &str) -> LogSink {
let dir = tempdir().unwrap();
let writer = RotatingLogWriter::open(&dir.path().join("s.log"), 1024, 3).unwrap();
std::mem::forget(dir);
LogSink::new(name.to_string(), writer, 1024)
}
async fn wait_for(rx: &mut watch::Receiver<ServerState>, want: ServerState) {
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
loop {
if *rx.borrow() == want { return; }
tokio::select! {
_ = rx.changed() => {}
_ = tokio::time::sleep_until(deadline) => panic!("never reached {want:?}, last={:?}", *rx.borrow()),
}
}
}
#[tokio::test]
async fn start_runs_to_running_and_stop_to_stopped() {
let (mock, mut ctl) = MockChild::new(1);
let queue = Arc::new(Mutex::new(vec![mock]));
let spawner = QueueSpawner { queue };
let (state_tx, mut state_rx) = watch::channel(ServerState::Stopped);
let (cmd_tx, cmd_rx) = mpsc::channel(8);
let task = SupervisorTask::new(cfg("x", RestartPolicy::Never, 5), sink("x"), spawner, state_tx, cmd_rx);
let h = tokio::spawn(task.run());
let (ack_tx, ack_rx) = oneshot::channel();
cmd_tx.send(SupervisorCmd::Start { ack: ack_tx }).await.unwrap();
assert_eq!(ack_rx.await.unwrap(), StartAck::Started);
wait_for(&mut state_rx, ServerState::Running).await;
// Trigger child exit via the controller.
ctl.exit_tx.take().unwrap().send(Some(0)).unwrap();
wait_for(&mut state_rx, ServerState::Stopped).await;
let (ack_tx, ack_rx) = oneshot::channel();
cmd_tx.send(SupervisorCmd::Shutdown { ack: ack_tx }).await.unwrap();
ack_rx.await.unwrap();
h.await.unwrap();
}
}
```
- [ ] **Step 4: Update `lib.rs`**
```rust
pub mod supervisor;
pub use supervisor::{
RealSpawner, Spawner, StartAck, StopAck, SupervisorCmd, SupervisorHandle, SupervisorTask,
};
```
- [ ] **Step 5: Run tests**
Run: `cargo test -p xy-supervisor`
Expected: 22 passed.
- [ ] **Step 6: Commit**
```bash
git add crates/xy-supervisor/
git commit -m "feat(supervisor): supervisor task with state machine"
```
---
## Phase 4 — `xy-ipc`
### Task 17: JSON-RPC envelope types
**Files:**
- Create: `crates/xy-ipc/src/envelope.rs`
- Modify: `crates/xy-ipc/src/lib.rs`
- [ ] **Step 1: Define envelope types**
```rust
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Request {
pub jsonrpc: String,
pub id: serde_json::Value,
pub method: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub params: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Notification {
pub jsonrpc: String,
pub method: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub params: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Response {
pub jsonrpc: String,
pub id: serde_json::Value,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub result: Option<Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<RpcError>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcError {
pub code: i32,
pub message: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub data: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Incoming {
Request(Request),
Response(Response),
Notification(Notification),
}
pub const JSONRPC_VERSION: &str = "2.0";
pub fn request(id: u64, method: &str, params: Option<Value>) -> Request {
Request { jsonrpc: JSONRPC_VERSION.into(), id: serde_json::json!(id), method: method.into(), params }
}
pub fn notification(method: &str, params: Option<Value>) -> Notification {
Notification { jsonrpc: JSONRPC_VERSION.into(), method: method.into(), params }
}
pub fn ok_response(id: serde_json::Value, result: Value) -> Response {
Response { jsonrpc: JSONRPC_VERSION.into(), id, result: Some(result), error: None }
}
pub fn err_response(id: serde_json::Value, code: i32, message: String) -> Response {
Response { jsonrpc: JSONRPC_VERSION.into(), id, result: None,
error: Some(RpcError { code, message, data: None }) }
}
#[cfg(test)]
mod tests {
use super::*;
#[test] fn request_round_trip() {
let r = request(7, "list", None);
let s = serde_json::to_string(&r).unwrap();
let back: Request = serde_json::from_str(&s).unwrap();
assert_eq!(back.method, "list");
assert_eq!(back.id, serde_json::json!(7));
}
#[test] fn incoming_is_response() {
let s = r#"{"jsonrpc":"2.0","id":1,"result":{"ok":true}}"#;
let i: Incoming = serde_json::from_str(s).unwrap();
assert!(matches!(i, Incoming::Response(_)));
}
#[test] fn incoming_is_notification() {
let s = r#"{"jsonrpc":"2.0","method":"log","params":{}}"#;
let i: Incoming = serde_json::from_str(s).unwrap();
assert!(matches!(i, Incoming::Notification(_)));
}
}
```
- [ ] **Step 2: Expose**
```rust
//! JSON-RPC 2.0 over newline-delimited JSON on a Unix socket.
pub mod envelope;
pub use envelope::{
err_response, notification, ok_response, request, Incoming, Notification, Request, Response, RpcError,
};
```
- [ ] **Step 3: Run tests**
Run: `cargo test -p xy-ipc`
Expected: 3 passed.
- [ ] **Step 4: Commit**
```bash
git add crates/xy-ipc/
git commit -m "feat(ipc): JSON-RPC envelope types"
```
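`#[serde(untagged)]` tries variants in declaration order and, by default, ignores unknown fields, so `Request` has to come first: a request carries every field `Response` requires (`jsonrpc`, `id`) and every field `Notification` requires (`jsonrpc`, `method`), so it would match either one if that were tried earlier. The std-only sketch below models that first-match rule on sets of required fields; `Kind`, `classify`, and `DECLARED` are illustrative names, not part of `xy-ipc`, and the model ignores field types.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    Request,
    Response,
    Notification,
}

/// Required (non-`Option`) fields of each envelope struct, in the order
/// the `Incoming` enum declares its variants.
const DECLARED: [(Kind, &[&str]); 3] = [
    (Kind::Request, &["jsonrpc", "id", "method"]),
    (Kind::Response, &["jsonrpc", "id"]),
    (Kind::Notification, &["jsonrpc", "method"]),
];

/// Hypothetical model of untagged matching: the first variant whose required
/// fields are all present wins; extra fields are ignored.
fn classify(present: &[&str], variants: &[(Kind, &[&str])]) -> Option<Kind> {
    variants
        .iter()
        .find(|(_, required)| required.iter().all(|f| present.contains(f)))
        .map(|(kind, _)| *kind)
}
```

Swapping `Response` ahead of `Request` makes every request classify as a response, which is the failure mode the declared order avoids.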
### Task 18: Newline-delimited framing helpers
**Files:**
- Create: `crates/xy-ipc/src/framing.rs`
- Modify: `crates/xy-ipc/src/lib.rs`
- [ ] **Step 1: Implement `JsonFramed`**
```rust
use serde::Serialize;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
pub struct JsonFramed {
reader: BufReader<tokio::net::unix::OwnedReadHalf>,
writer: tokio::net::unix::OwnedWriteHalf,
}
impl JsonFramed {
pub fn new(stream: UnixStream) -> Self {
let (r, w) = stream.into_split();
Self { reader: BufReader::new(r), writer: w }
}
pub async fn read<T: serde::de::DeserializeOwned>(&mut self) -> std::io::Result<Option<T>> {
let mut buf = String::new();
let n = self.reader.read_line(&mut buf).await?;
if n == 0 { return Ok(None); }
let v: T = serde_json::from_str(buf.trim_end())
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
Ok(Some(v))
}
pub async fn write<T: Serialize>(&mut self, value: &T) -> std::io::Result<()> {
let mut bytes = serde_json::to_vec(value)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
bytes.push(b'\n');
self.writer.write_all(&bytes).await?;
self.writer.flush().await
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde::Deserialize;
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct M { x: u32, name: String }
#[tokio::test]
async fn round_trip_over_socket_pair() {
let (a, b) = UnixStream::pair().unwrap();
let mut sa = JsonFramed::new(a);
let mut sb = JsonFramed::new(b);
sa.write(&M { x: 1, name: "hi".into() }).await.unwrap();
let got: Option<M> = sb.read().await.unwrap();
assert_eq!(got, Some(M { x: 1, name: "hi".into() }));
}
#[tokio::test]
async fn eof_returns_none() {
let (a, b) = UnixStream::pair().unwrap();
drop(a);
let mut sb = JsonFramed::new(b);
let got: Option<M> = sb.read().await.unwrap();
assert!(got.is_none());
}
}
```
- [ ] **Step 2: Expose**
Append: `pub mod framing; pub use framing::JsonFramed;`
- [ ] **Step 3: Run tests**
Run: `cargo test -p xy-ipc`
Expected: 5 passed.
- [ ] **Step 4: Commit**
```bash
git add crates/xy-ipc/
git commit -m "feat(ipc): newline-delimited JSON framing"
```
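Newline framing is safe here because compact JSON from `serde_json::to_vec` never contains a raw newline (newlines inside strings are escaped as `\n`). On the read side, `read_line` keeps the trailing `\n` and, at EOF, still returns a final unterminated line. A std-only sketch of the same read loop over an in-memory buffer; `split_frames` is a hypothetical helper, not part of the crate:

```rust
use std::io::{BufRead, Cursor};

/// Hypothetical helper: split input into newline-delimited frames the way
/// `JsonFramed::read` does, via `read_line` + `trim_end`. A final frame
/// without a trailing newline is still returned.
fn split_frames(input: &str) -> std::io::Result<Vec<String>> {
    let mut reader = Cursor::new(input.as_bytes());
    let mut frames = Vec::new();
    loop {
        let mut buf = String::new();
        // Ok(0) means EOF, which JsonFramed maps to Ok(None).
        if reader.read_line(&mut buf)? == 0 {
            break;
        }
        frames.push(buf.trim_end().to_string());
    }
    Ok(frames)
}
```

A blank line becomes an empty frame, which `JsonFramed::read` would report as `InvalidData` because an empty string is not valid JSON.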
### Task 19: Client helper (call + subscribe)
**Files:**
- Create: `crates/xy-ipc/src/client.rs`
- Modify: `crates/xy-ipc/src/lib.rs`
- [ ] **Step 1: Implement client**
```rust
use crate::envelope::{Incoming, Notification, Request};
use crate::framing::JsonFramed;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::VecDeque;
use std::path::Path;
use thiserror::Error;
use tokio::net::UnixStream;
#[derive(Debug, Error)]
pub enum ClientError {
    #[error("io: {0}")] Io(#[from] std::io::Error),
    #[error("rpc error {code}: {message}")] Rpc { code: i32, message: String },
    #[error("unexpected message kind from daemon")] Unexpected,
    #[error("daemon unreachable: {0}")] Unreachable(std::io::Error),
    #[error("serialization: {0}")] Serde(#[from] serde_json::Error),
}
pub struct Client {
    framed: JsonFramed,
    next_id: u64,
    /// Notifications that arrived while a call was waiting for its response.
    /// The daemon may emit the first `log` lines of a subscription before the
    /// `logs` response itself, so these are kept rather than dropped.
    pending: VecDeque<Notification>,
}
impl Client {
    pub async fn connect(socket_path: &Path) -> Result<Self, ClientError> {
        let stream = UnixStream::connect(socket_path).await.map_err(ClientError::Unreachable)?;
        Ok(Self { framed: JsonFramed::new(stream), next_id: 1, pending: VecDeque::new() })
    }
    pub async fn call<P: Serialize, R: DeserializeOwned>(&mut self, method: &str, params: &P) -> Result<R, ClientError> {
        let id = self.next_id; self.next_id += 1;
        let params_val = serde_json::to_value(params)?;
        let req = crate::envelope::request(id, method, Some(params_val));
        self.framed.write(&req).await?;
        self.await_response(id).await
    }
    pub async fn call_no_params<R: DeserializeOwned>(&mut self, method: &str) -> Result<R, ClientError> {
        let id = self.next_id; self.next_id += 1;
        let req = Request { jsonrpc: "2.0".into(), id: serde_json::json!(id), method: method.into(), params: None };
        self.framed.write(&req).await?;
        self.await_response(id).await
    }
    /// Reads frames until the response for `id` arrives, queueing any
    /// notifications received before it.
    async fn await_response<R: DeserializeOwned>(&mut self, id: u64) -> Result<R, ClientError> {
        loop {
            let msg: Option<Incoming> = self.framed.read().await?;
            let Some(msg) = msg else {
                return Err(ClientError::Unreachable(std::io::Error::from(std::io::ErrorKind::UnexpectedEof)));
            };
            match msg {
                Incoming::Response(r) => {
                    if r.id != serde_json::json!(id) { return Err(ClientError::Unexpected); }
                    if let Some(err) = r.error { return Err(ClientError::Rpc { code: err.code, message: err.message }); }
                    let result = r.result.unwrap_or(serde_json::Value::Null);
                    return Ok(serde_json::from_value(result)?);
                }
                Incoming::Notification(n) => self.pending.push_back(n),
                Incoming::Request(_) => return Err(ClientError::Unexpected),
            }
        }
    }
    pub async fn read_notification(&mut self) -> Result<Option<Notification>, ClientError> {
        // Hand out anything queued by an earlier call before reading the socket.
        if let Some(n) = self.pending.pop_front() { return Ok(Some(n)); }
        loop {
            let msg: Option<Incoming> = self.framed.read().await?;
            match msg {
                None => return Ok(None),
                Some(Incoming::Notification(n)) => return Ok(Some(n)),
                Some(Incoming::Response(_)) => continue,
                Some(Incoming::Request(_)) => return Err(ClientError::Unexpected),
            }
        }
    }
    pub async fn send_notification(&mut self, n: &Notification) -> Result<(), ClientError> {
        self.framed.write(n).await?;
        Ok(())
    }
}
```
- [ ] **Step 2: Expose**
Append: `pub mod client; pub use client::{Client, ClientError};`
- [ ] **Step 3: Build**
Run: `cargo build -p xy-ipc`
Expected: compiles.
- [ ] **Step 4: Commit**
```bash
git add crates/xy-ipc/
git commit -m "feat(ipc): client with call + notification reader"
```
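`call` matches a response to its request by `id`, and it has to cope with notifications arriving first: the `logs` handler in Task 28 spawns its streaming task before `serve` writes the `logs` response, so early `log` lines can precede it. The loop can be sketched without a socket. `Msg` and `await_response` are hypothetical stand-ins for `Incoming` and the client's read loop; notifications are queued rather than discarded so a later notification reader can hand them out first.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for `xy_ipc::envelope::Incoming`.
#[derive(Debug, Clone, PartialEq)]
pub enum Msg {
    Response { id: u64, result: String },
    Notification(String),
}

/// Consumes messages until the response for `id` appears, queueing any
/// notifications seen on the way. Returns `None` if the stream ends first or a
/// response carries a different id (a protocol error for a sequential client).
pub fn await_response<I: Iterator<Item = Msg>>(
    incoming: &mut I,
    id: u64,
    pending: &mut VecDeque<String>,
) -> Option<String> {
    for msg in incoming {
        match msg {
            Msg::Response { id: got, result } if got == id => return Some(result),
            Msg::Response { .. } => return None,
            Msg::Notification(n) => pending.push_back(n),
        }
    }
    None
}
```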
### Task 20: Server helper (bind + Connection)
**Files:**
- Create: `crates/xy-ipc/src/server.rs`
- Modify: `crates/xy-ipc/src/lib.rs`
- [ ] **Step 1: Implement server primitives**
```rust
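use crate::envelope::{Incoming, Notification, Response};
use serde::Serialize;
use std::path::Path;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::Mutex;
/// One accepted client connection. The read and write halves are locked
/// separately: `read_incoming` can wait indefinitely for the client's next
/// request, and it must not block log-stream tasks calling `write_notification`
/// meanwhile (one lock over both directions would deadlock `xy logs -f`).
pub struct Connection {
    reader: Mutex<BufReader<OwnedReadHalf>>,
    writer: Mutex<OwnedWriteHalf>,
}
impl Connection {
    pub fn new(stream: UnixStream) -> Self {
        let (r, w) = stream.into_split();
        Self { reader: Mutex::new(BufReader::new(r)), writer: Mutex::new(w) }
    }
    /// Reads one newline-delimited JSON message; `Ok(None)` on EOF.
    pub async fn read_incoming(&self) -> std::io::Result<Option<Incoming>> {
        let mut reader = self.reader.lock().await;
        let mut line = String::new();
        if reader.read_line(&mut line).await? == 0 { return Ok(None); }
        serde_json::from_str::<Incoming>(&line)
            .map(Some)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
    }
    pub async fn write_response(&self, r: &Response) -> std::io::Result<()> {
        self.write_json(r).await
    }
    pub async fn write_notification(&self, n: &Notification) -> std::io::Result<()> {
        self.write_json(n).await
    }
    /// Same framing as `JsonFramed::write`: compact JSON plus `\n`, written
    /// under the writer lock so concurrent messages never interleave.
    async fn write_json<T: Serialize>(&self, msg: &T) -> std::io::Result<()> {
        let mut bytes = serde_json::to_vec(msg)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        bytes.push(b'\n');
        let mut w = self.writer.lock().await;
        w.write_all(&bytes).await?;
        w.flush().await
    }
}
pub fn bind(socket_path: &Path) -> std::io::Result<UnixListener> {
    if socket_path.exists() { std::fs::remove_file(socket_path)?; }
    if let Some(parent) = socket_path.parent() { std::fs::create_dir_all(parent)?; }
    let listener = UnixListener::bind(socket_path)?;
    // Between bind and chmod the socket briefly has umask-derived permissions;
    // a 0700 parent directory (as $XDG_RUNTIME_DIR is required to be) covers that window.
    use std::os::unix::fs::PermissionsExt;
    std::fs::set_permissions(socket_path, std::fs::Permissions::from_mode(0o600))?;
    Ok(listener)
}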
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[tokio::test]
async fn bind_creates_socket_with_0600() {
let dir = tempdir().unwrap();
let path = dir.path().join("x.sock");
let _listener = bind(&path).unwrap();
use std::os::unix::fs::PermissionsExt;
let mode = std::fs::metadata(&path).unwrap().permissions().mode() & 0o777;
assert_eq!(mode, 0o600);
}
}
```
- [ ] **Step 2: Expose**
Append: `pub mod server; pub use server::{bind, Connection};`
- [ ] **Step 3: Run tests**
Run: `cargo test -p xy-ipc`
Expected: 6 passed.
- [ ] **Step 4: Commit**
```bash
git add crates/xy-ipc/
git commit -m "feat(ipc): server bind + Connection wrapper"
```
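`bind` unlinks whatever already sits at the socket path. That is safe because the daemon acquires the pidfile (Task 22) before binding, but the intent can be made explicit with a connect probe: a running daemon accepts the connection, while a file left behind by a crashed one refuses it. A std-only sketch, where `clear_stale_socket` is a hypothetical helper:

```rust
use std::io;
use std::os::unix::net::UnixStream;
use std::path::Path;

/// Decides whether the daemon socket at `path` can be (re)bound.
/// Returns `Ok(false)` if a live process accepts connections there, and
/// `Ok(true)` if the path is free (absent, or a stale socket that was removed).
pub fn clear_stale_socket(path: &Path) -> io::Result<bool> {
    match UnixStream::connect(path) {
        // Someone is listening: leave the socket alone.
        Ok(_) => Ok(false),
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(true),
        // The file exists but nothing is bound to it: left over from a crash.
        Err(e) if e.kind() == io::ErrorKind::ConnectionRefused => {
            std::fs::remove_file(path)?;
            Ok(true)
        }
        Err(e) => Err(e),
    }
}
```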
---
## Phase 5 — `xy` binary
### Task 21: XDG path resolution
**Files:**
- Create: `crates/xy/src/paths.rs`
- Modify: `crates/xy/src/main.rs`
- [ ] **Step 1: Implement**
```rust
use etcetera::base_strategy::{BaseStrategy, Xdg};
use std::path::PathBuf;
#[derive(Debug, Clone)]
pub struct Paths {
pub config_dir: PathBuf,
pub state_dir: PathBuf,
pub log_dir: PathBuf,
pub socket: PathBuf,
pub pidfile: PathBuf,
}
impl Paths {
pub fn resolve() -> std::io::Result<Self> {
let xdg = Xdg::new().map_err(std::io::Error::other)?;
let config_dir = xdg.config_dir().join("xy").join("servers");
let state_dir = xdg.state_dir().unwrap_or_else(|| xdg.data_dir()).join("xy");
let log_dir = state_dir.join("logs");
let socket = std::env::var_os("XDG_RUNTIME_DIR")
.map(|p| PathBuf::from(p).join("xy.sock"))
.unwrap_or_else(|| state_dir.join("xy.sock"));
let pidfile = state_dir.join("xy.pid");
Ok(Self { config_dir, state_dir, log_dir, socket, pidfile })
}
pub fn ensure_dirs(&self) -> std::io::Result<()> {
std::fs::create_dir_all(&self.state_dir)?;
std::fs::create_dir_all(&self.log_dir)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test] fn resolves_without_panicking() {
let p = Paths::resolve().unwrap();
assert!(p.pidfile.starts_with(&p.state_dir));
}
}
```
- [ ] **Step 2: Reference from `main.rs`**
```rust
mod paths;
fn main() {
let p = paths::Paths::resolve().unwrap();
eprintln!("xy: socket would be at {}", p.socket.display());
}
```
- [ ] **Step 3: Run tests**
Run: `cargo test -p xy`
Expected: 1 passed.
- [ ] **Step 4: Commit**
```bash
git add crates/xy/
git commit -m "feat(xy): XDG path resolution"
```
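Socket placement prefers `$XDG_RUNTIME_DIR` and falls back to the state directory. The XDG Base Directory specification says relative paths in these variables are invalid and should be ignored; `Paths::resolve` as written accepts any value, including an empty string. A pure-function sketch of the precedence with that check added (`socket_path` is a hypothetical helper, not part of `etcetera`):

```rust
use std::path::{Path, PathBuf};

/// `$XDG_RUNTIME_DIR/xy.sock` when the variable holds an absolute path,
/// otherwise `<state_dir>/xy.sock`. Relative or empty values are ignored.
pub fn socket_path(runtime_dir: Option<&Path>, state_dir: &Path) -> PathBuf {
    match runtime_dir {
        Some(dir) if dir.is_absolute() => dir.join("xy.sock"),
        _ => state_dir.join("xy.sock"),
    }
}
```

In `Paths::resolve` it could be called as `socket_path(std::env::var_os("XDG_RUNTIME_DIR").as_deref().map(Path::new), &state_dir)`.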
### Task 22: Pidfile guard
**Files:**
- Create: `crates/xy/src/pidfile.rs`
- Modify: `crates/xy/src/main.rs`
- [ ] **Step 1: Implement**
```rust
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::os::unix::fs::OpenOptionsExt;
use std::path::{Path, PathBuf};
pub struct PidFile {
path: PathBuf,
_file: File,
}
impl PidFile {
pub fn acquire(path: &Path) -> std::io::Result<Self> {
let mut f = OpenOptions::new()
.write(true).create_new(true).mode(0o600).open(path)?;
writeln!(f, "{}", std::process::id())?;
Ok(Self { path: path.to_path_buf(), _file: f })
}
}
impl Drop for PidFile {
fn drop(&mut self) { let _ = std::fs::remove_file(&self.path); }
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test] fn acquire_then_second_fails() {
let dir = tempdir().unwrap();
let p = dir.path().join("x.pid");
let _g = PidFile::acquire(&p).unwrap();
let err = PidFile::acquire(&p).unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::AlreadyExists);
}
#[test] fn drop_removes_file() {
let dir = tempdir().unwrap();
let p = dir.path().join("x.pid");
{ let _g = PidFile::acquire(&p).unwrap(); assert!(p.exists()); }
assert!(!p.exists());
}
}
```
- [ ] **Step 2: Reference from `main.rs`**
```rust
mod paths;
mod pidfile;
fn main() {
let p = paths::Paths::resolve().unwrap();
eprintln!("xy: socket would be at {}", p.socket.display());
}
```
- [ ] **Step 3: Run tests**
Run: `cargo test -p xy`
Expected: 3 passed.
- [ ] **Step 4: Commit**
```bash
git add crates/xy/
git commit -m "feat(xy): exclusive pidfile guard"
```
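Because `acquire` uses `create_new`, a daemon killed with `SIGKILL` (so `Drop` never runs) leaves its pidfile behind, and every later `xy daemon` fails with `AlreadyExists`. One way to recover is to read the recorded PID and treat the file as stale when that process no longer exists. The sketch keeps the liveness probe as a parameter so it runs with std only; in the daemon the probe could be built on `nix::sys::signal::kill(Pid::from_raw(pid), None)` (signal 0 checks for existence), treating only `ESRCH` as "gone", since `EPERM` means the process exists under another user. `PidFileState` and `inspect_pidfile` are hypothetical names.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Outcome of inspecting an existing pidfile.
#[derive(Debug, PartialEq)]
pub enum PidFileState {
    /// No pidfile present.
    Absent,
    /// The recorded process is still running.
    Live(u32),
    /// The contents are not a PID, or that process is gone.
    Stale,
}

/// Reads `path` and classifies it, using `is_alive` to probe the recorded PID.
pub fn inspect_pidfile(path: &Path, is_alive: impl Fn(u32) -> bool) -> io::Result<PidFileState> {
    let contents = match fs::read_to_string(path) {
        Ok(s) => s,
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(PidFileState::Absent),
        Err(e) => return Err(e),
    };
    match contents.trim().parse::<u32>() {
        Ok(pid) if is_alive(pid) => Ok(PidFileState::Live(pid)),
        // Dead PID or unparsable contents. An empty file can also mean another
        // daemon created it but has not written its PID yet, so a cautious
        // caller may re-check after a short delay before deleting it.
        _ => Ok(PidFileState::Stale),
    }
}
```

On `Stale`, the caller would remove the file and retry `PidFile::acquire`; on `Live(pid)`, it can include the PID in the "already running" error.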
### Task 23: clap CLI definitions
**Files:**
- Modify: `crates/xy/src/main.rs`
- Create: `crates/xy/src/cli/mod.rs` (stub)
- Create: `crates/xy/src/daemon/mod.rs` (stub)
- [ ] **Step 1: Replace `main.rs`**
```rust
use clap::{Parser, Subcommand};
mod cli;
mod daemon;
mod paths;
mod pidfile;
#[derive(Debug, Parser)]
#[command(name = "xy", version, about = "HTTP MCP server supervisor")]
struct Cli {
#[command(subcommand)]
cmd: Cmd,
}
#[derive(Debug, Subcommand)]
enum Cmd {
/// Run the daemon in the foreground.
Daemon,
/// List all configured servers with state.
List,
/// Show detailed status for a single server.
Status { name: String },
/// Start a server (or all configured servers with --all).
Start {
#[arg(long, conflicts_with = "name")] all: bool,
#[arg(required_unless_present = "all")] name: Option<String>,
},
/// Stop a server (or --all).
Stop {
#[arg(long, conflicts_with = "name")] all: bool,
#[arg(required_unless_present = "all")] name: Option<String>,
},
/// Restart a server (or --all).
Restart {
#[arg(long, conflicts_with = "name")] all: bool,
#[arg(required_unless_present = "all")] name: Option<String>,
},
/// Re-read config dir and reconcile running servers.
Reload,
/// Stream a server's log.
Logs {
name: String,
#[arg(long)] tail: Option<u32>,
#[arg(short = 'f', long)] follow: bool,
},
}
#[tokio::main]
async fn main() -> std::process::ExitCode {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")))
.with_writer(std::io::stderr)
.init();
let cli = Cli::parse();
let paths = match paths::Paths::resolve() {
Ok(p) => p,
Err(e) => { eprintln!("xy: failed to resolve XDG paths: {e}"); return std::process::ExitCode::from(3); }
};
let result: anyhow::Result<i32> = match cli.cmd {
Cmd::Daemon => daemon::run(paths).await.map(|_| 0),
Cmd::List => cli::list(paths).await,
Cmd::Status { name } => cli::status(paths, name).await,
Cmd::Start { all, name } => cli::start(paths, all, name).await,
Cmd::Stop { all, name } => cli::stop(paths, all, name).await,
Cmd::Restart { all, name } => cli::restart(paths, all, name).await,
Cmd::Reload => cli::reload(paths).await,
Cmd::Logs { name, tail, follow } => cli::logs(paths, name, tail, follow).await,
};
match result {
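        // `code as u8` would silently truncate (256 becomes 0, i.e. success);
        // map anything outside 0..=255 to a generic failure instead.
        Ok(code) => std::process::ExitCode::from(u8::try_from(code).unwrap_or(1)),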
Err(e) => { eprintln!("xy: {e:#}"); std::process::ExitCode::from(1) }
}
}
```
- [ ] **Step 2: Stub `cli/mod.rs`**
```rust
use crate::paths::Paths;
use anyhow::{bail, Result};
pub async fn list(_p: Paths) -> Result<i32> { bail!("not implemented") }
pub async fn status(_p: Paths, _name: String) -> Result<i32> { bail!("not implemented") }
pub async fn start(_p: Paths, _all: bool, _name: Option<String>) -> Result<i32> { bail!("not implemented") }
pub async fn stop(_p: Paths, _all: bool, _name: Option<String>) -> Result<i32> { bail!("not implemented") }
pub async fn restart(_p: Paths, _all: bool, _name: Option<String>) -> Result<i32> { bail!("not implemented") }
pub async fn reload(_p: Paths) -> Result<i32> { bail!("not implemented") }
pub async fn logs(_p: Paths, _name: String, _tail: Option<u32>, _follow: bool) -> Result<i32> { bail!("not implemented") }
```
- [ ] **Step 3: Stub `daemon/mod.rs`**
```rust
use crate::paths::Paths;
use anyhow::{bail, Result};
pub async fn run(_paths: Paths) -> Result<()> { bail!("not implemented") }
```
- [ ] **Step 4: Build + verify --help**
Run: `cargo build -p xy`
Run: `cargo run -p xy -- --help`
Expected: prints help with all subcommands.
- [ ] **Step 5: Commit**
```bash
git add crates/xy/
git commit -m "feat(xy): clap CLI scaffold"
```
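`main` turns each subcommand's `i32` into the process exit status, and the plan uses three non-zero values: 1 for general and RPC errors, 2 when the daemon is unreachable (see `connect` in Task 29), and 3 when XDG paths cannot be resolved. Naming them in one place keeps the CLI and any scripts that call it in agreement. A sketch with a hypothetical `Exit` enum:

```rust
use std::process::ExitCode;

/// Exit statuses used by the `xy` CLI (hypothetical enum; the plan itself
/// passes bare integers around).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Exit {
    Success = 0,
    Error = 1,
    DaemonUnreachable = 2,
    PathResolution = 3,
}

impl Exit {
    pub fn code(self) -> u8 {
        self as u8
    }
}

impl From<Exit> for ExitCode {
    fn from(e: Exit) -> Self {
        ExitCode::from(e.code())
    }
}
```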
### Task 24: Daemon `Registry` with config-hash entry
**Files:**
- Create: `crates/xy/src/daemon/registry.rs`
- Modify: `crates/xy/src/daemon/mod.rs`
- [ ] **Step 1: Implement registry with `Entry { handle, config_hash }`**
```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use xy_supervisor::SupervisorHandle;
#[derive(Clone)]
pub struct Entry {
pub handle: SupervisorHandle,
pub config_hash: u64,
}
#[derive(Clone, Default)]
pub struct Registry {
inner: Arc<RwLock<HashMap<String, Entry>>>,
}
impl Registry {
pub fn new() -> Self { Self::default() }
pub async fn insert(&self, name: String, entry: Entry) {
self.inner.write().await.insert(name, entry);
}
pub async fn remove(&self, name: &str) -> Option<Entry> {
self.inner.write().await.remove(name)
}
pub async fn get(&self, name: &str) -> Option<Entry> {
self.inner.read().await.get(name).cloned()
}
pub async fn names(&self) -> Vec<String> {
let g = self.inner.read().await;
let mut v: Vec<String> = g.keys().cloned().collect();
v.sort(); v
}
pub async fn snapshot(&self) -> Vec<(String, Entry)> {
let g = self.inner.read().await;
let mut v: Vec<_> = g.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
v.sort_by(|a, b| a.0.cmp(&b.0));
v
}
}
```
- [ ] **Step 2: Add `pub mod registry;` to `daemon/mod.rs`**
```rust
use crate::paths::Paths;
use anyhow::{bail, Result};
pub mod registry;
pub async fn run(_paths: Paths) -> Result<()> { bail!("not implemented") }
```
- [ ] **Step 3: Build**
Run: `cargo build -p xy`
Expected: compiles.
- [ ] **Step 4: Commit**
```bash
git add crates/xy/
git commit -m "feat(xy): daemon Registry with config-hash entries"
```
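`Registry` derives `Clone` over an `Arc<RwLock<…>>`, so the accept loop, every connection handler, and the shutdown path hold handles to one shared map. `get` returns a cloned `Entry`, which is what lets handlers await supervisor acks without holding the lock. A std-only sketch of the same sharing pattern, generic over the entry type; it swaps in `std::sync::RwLock` because the sketch has no async code, whereas the plan uses `tokio::sync::RwLock`:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// std-only stand-in for the daemon `Registry`; `E` plays the role of `Entry`.
/// Cloning copies the `Arc`, not the map, so every clone sees the same entries.
#[derive(Clone)]
pub struct Registry<E> {
    inner: Arc<RwLock<HashMap<String, E>>>,
}

impl<E: Clone> Registry<E> {
    pub fn new() -> Self {
        Self { inner: Arc::new(RwLock::new(HashMap::new())) }
    }
    pub fn insert(&self, name: &str, entry: E) {
        self.inner.write().unwrap().insert(name.to_string(), entry);
    }
    /// Returns a clone, so the read lock is released before the caller uses it.
    pub fn get(&self, name: &str) -> Option<E> {
        self.inner.read().unwrap().get(name).cloned()
    }
    /// Sorted names, matching the plan's `names()`.
    pub fn names(&self) -> Vec<String> {
        let mut v: Vec<String> = self.inner.read().unwrap().keys().cloned().collect();
        v.sort();
        v
    }
}
```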
### Task 25: Daemon entry point — boot + accept + shutdown
**Files:**
- Modify: `crates/xy/src/daemon/mod.rs`
- Create: `crates/xy/src/daemon/shutdown.rs`
- Create: `crates/xy/src/daemon/handlers.rs` (stub)
- [ ] **Step 1: Implement `daemon/mod.rs`**
```rust
use crate::paths::Paths;
use crate::pidfile::PidFile;
use anyhow::{Context, Result};
use std::sync::{Arc, OnceLock};
use tokio::sync::{mpsc, oneshot, watch};
use tracing::{error, info};
use xy_ipc::{bind, Connection};
use xy_protocol::{kdl_parse::load_all_configs, ServerConfig, ServerState};
use xy_supervisor::{
logs::{LogSink, RotatingLogWriter},
supervisor::{RealSpawner, SupervisorCmd, SupervisorHandle, SupervisorTask},
};
pub mod handlers;
pub mod registry;
pub mod shutdown;
const LOG_FILE_MAX_BYTES: u64 = 10 * 1024 * 1024;
const LOG_FILE_KEEP: usize = 5;
const RING_BUFFER_BYTES: usize = 1024 * 1024;
pub static PATHS: OnceLock<Paths> = OnceLock::new();
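pub fn config_hash(cfg: &ServerConfig) -> u64 {
    use std::hash::{Hash, Hasher};
    let mut h = std::collections::hash_map::DefaultHasher::new();
    // Go through `serde_json::Value` first: its object map is a BTreeMap (unless
    // serde_json's `preserve_order` feature is enabled), so any map-typed field is
    // emitted in sorted key order and equal configs always hash equally.
    serde_json::to_value(cfg)
        .expect("ServerConfig serializes to JSON")
        .to_string()
        .hash(&mut h);
    h.finish()
}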
pub fn spawn_supervisor(paths: &Paths, cfg: ServerConfig) -> Result<SupervisorHandle> {
let log_path = paths.log_dir.join(format!("{}.log", cfg.name));
let writer = RotatingLogWriter::open(&log_path, LOG_FILE_MAX_BYTES, LOG_FILE_KEEP)
.with_context(|| format!("open log file {}", log_path.display()))?;
let sink = LogSink::new(cfg.name.clone(), writer, RING_BUFFER_BYTES);
let (state_tx, state_rx) = watch::channel(ServerState::Stopped);
let (cmd_tx, cmd_rx) = mpsc::channel(16);
let name = cfg.name.clone();
let task = SupervisorTask::new(cfg, sink.clone(), RealSpawner, state_tx, cmd_rx);
tokio::spawn(task.run());
Ok(SupervisorHandle { name, tx: cmd_tx, state: state_rx, log_sink: sink })
}
pub async fn run(paths: Paths) -> Result<()> {
    paths.ensure_dirs().context("create state dirs")?;
    let _pid = PidFile::acquire(&paths.pidfile)
        .context("another xy daemon appears to be running")?;
    // Register SIGTERM/SIGINT handlers before booting any server. A signal that
    // arrives while servers are still starting is then recorded and handled by the
    // `select!` below (graceful shutdown) instead of hitting the default disposition,
    // which would kill the daemon without stopping children or removing the pidfile.
    let shutdown_signal = shutdown::install();
    let listener = bind(&paths.socket).context("bind unix socket")?;
    let _ = PATHS.set(paths.clone());
    info!(socket = %paths.socket.display(), "daemon listening");
    let configs = load_all_configs(&paths.config_dir).context("load configs")?;
    let registry = registry::Registry::new();
    for cfg in configs {
        let hash = config_hash(&cfg);
        let handle = spawn_supervisor(&paths, cfg)?;
        let name = handle.name.clone();
        registry.insert(name.clone(), registry::Entry { handle: handle.clone(), config_hash: hash }).await;
        let (ack_tx, ack_rx) = oneshot::channel();
        if handle.tx.send(SupervisorCmd::Start { ack: ack_tx }).await.is_ok() {
            let _ = ack_rx.await;
        }
    }
    let registry_for_shutdown = registry.clone();
    let accept = async {
        loop {
            let (stream, _addr) = match listener.accept().await {
                Ok(p) => p,
                Err(e) => { error!(error = %e, "accept failed"); continue; }
            };
            let conn = Arc::new(Connection::new(stream));
            let reg = registry.clone();
            let paths_clone = paths.clone();
            tokio::spawn(async move {
                if let Err(e) = handlers::serve(conn, reg, paths_clone).await {
                    error!(error = %e, "connection ended with error");
                }
            });
        }
    };
    tokio::select! {
        _ = accept => {}
        _ = shutdown_signal => { info!("shutdown signal received"); }
    }
    shutdown::shutdown_all(registry_for_shutdown).await;
    // `_pid` removes the pidfile when dropped; remove the socket file as well.
    let _ = std::fs::remove_file(&paths.socket);
    Ok(())
}
```
- [ ] **Step 2: Create `shutdown.rs`**
```rust
use crate::daemon::registry::Registry;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::oneshot;
use xy_supervisor::supervisor::SupervisorCmd;
pub fn install() -> impl std::future::Future<Output = ()> {
let mut term = signal(SignalKind::terminate()).expect("install SIGTERM handler");
let mut int = signal(SignalKind::interrupt()).expect("install SIGINT handler");
async move {
tokio::select! {
_ = term.recv() => {}
_ = int.recv() => {}
}
}
}
pub async fn shutdown_all(reg: Registry) {
let snapshot = reg.snapshot().await;
let mut acks = Vec::new();
for (_name, entry) in &snapshot {
let (tx, rx) = oneshot::channel();
if entry.handle.tx.send(SupervisorCmd::Shutdown { ack: tx }).await.is_ok() {
acks.push(rx);
}
}
let deadline = tokio::time::Duration::from_secs(30);
let _ = tokio::time::timeout(deadline, async {
for rx in acks { let _ = rx.await; }
}).await;
}
```
- [ ] **Step 3: Stub `handlers.rs`**
```rust
use crate::daemon::registry::Registry;
use crate::paths::Paths;
use std::sync::Arc;
use xy_ipc::Connection;
pub async fn serve(_conn: Arc<Connection>, _reg: Registry, _paths: Paths) -> std::io::Result<()> {
Ok(())
}
```
- [ ] **Step 4: Build**
Run: `cargo build -p xy`
Expected: compiles.
- [ ] **Step 5: Commit**
```bash
git add crates/xy/
git commit -m "feat(xy): daemon boot + accept loop + graceful shutdown"
```
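`config_hash` only drives `reload` correctly if equal configs always produce equal hashes within one daemon process. Serializing a `HashMap` field directly can list keys in different orders for two maps with the same contents, so map-typed fields should be hashed in sorted key order. A std-only sketch with a hypothetical two-field config (the real `ServerConfig` fields are defined in `xy-protocol`):

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, HashMap};
use std::hash::{Hash, Hasher};

/// Hypothetical subset of a server config.
pub struct Cfg {
    pub command: Vec<String>,
    pub env: HashMap<String, String>,
}

/// Hashes `cfg` with map entries visited in key order, so insertion order and
/// HashMap seeding cannot change the result.
pub fn canonical_hash(cfg: &Cfg) -> u64 {
    let mut h = DefaultHasher::new();
    cfg.command.hash(&mut h);
    let sorted: BTreeMap<&String, &String> = cfg.env.iter().collect();
    sorted.hash(&mut h);
    h.finish()
}
```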
### Task 26: RPC handlers — list/status/start/stop/restart
**Files:**
- Modify: `crates/xy/src/daemon/handlers.rs`
- [ ] **Step 1: Replace stub with full dispatch**
```rust
use crate::daemon::registry::Registry;
use crate::paths::Paths;
use std::sync::Arc;
use tokio::sync::oneshot;
use xy_ipc::envelope::{err_response, ok_response, Incoming, Request, Response};
use xy_ipc::Connection;
use xy_protocol::rpc::{
methods, NameOrAll, RestartResult, ServerSummary, StartResult, StatusDetail, StopResult,
};
use xy_protocol::RpcErrorCode;
use xy_supervisor::supervisor::{StartAck, StopAck, SupervisorCmd};
pub async fn serve(conn: Arc<Connection>, reg: Registry, _paths: Paths) -> std::io::Result<()> {
loop {
let Some(incoming) = conn.read_incoming().await? else { return Ok(()); };
match incoming {
Incoming::Request(req) => {
let resp = handle_request(req, &reg).await;
conn.write_response(&resp).await?;
}
_ => continue,
}
}
}
struct ApiError { code: i32, message: String }
impl ApiError {
fn rpc(c: RpcErrorCode, msg: impl Into<String>) -> Self {
Self { code: c.as_i32(), message: msg.into() }
}
}
async fn handle_request(req: Request, reg: &Registry) -> Response {
let id = req.id.clone();
let method = req.method.as_str();
let params = req.params.unwrap_or(serde_json::Value::Null);
match method {
methods::LIST => match list(reg).await {
Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
Err(e) => err_response(id, e.code, e.message),
},
methods::STATUS => {
let p: xy_protocol::rpc::StatusParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => return err_response(id, -32602, format!("invalid params: {e}")),
};
match status(reg, &p.name).await {
Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
Err(e) => err_response(id, e.code, e.message),
}
}
methods::START => dispatch_lifecycle(id, params, reg, Op::Start).await,
methods::STOP => dispatch_lifecycle(id, params, reg, Op::Stop).await,
methods::RESTART => dispatch_lifecycle(id, params, reg, Op::Restart).await,
methods::RELOAD => err_response(id, -32601, "reload not yet implemented".into()),
methods::LOGS => err_response(id, -32601, "logs not yet implemented".into()),
methods::LOGS_CANCEL => err_response(id, -32601, "logs_cancel not yet implemented".into()),
other => err_response(id, -32601, format!("unknown method `{other}`")),
}
}
async fn list(reg: &Registry) -> Result<Vec<ServerSummary>, ApiError> {
let mut out = Vec::new();
for (name, entry) in reg.snapshot().await {
out.push(ServerSummary {
name, state: *entry.handle.state.borrow(),
pid: None, port: 0, uptime_secs: None,
restart_count: 0, last_exit: None,
});
}
Ok(out)
}
async fn status(reg: &Registry, name: &str) -> Result<StatusDetail, ApiError> {
let Some(entry) = reg.get(name).await else {
return Err(ApiError::rpc(RpcErrorCode::ServerNotFound, format!("no such server `{name}`")));
};
Ok(StatusDetail {
summary: ServerSummary {
name: entry.handle.name.clone(),
state: *entry.handle.state.borrow(),
pid: None, port: 0, uptime_secs: None,
restart_count: 0, last_exit: None,
},
recent_transitions: Vec::new(),
})
}
enum Op { Start, Stop, Restart }
async fn dispatch_lifecycle(
id: serde_json::Value, params: serde_json::Value, reg: &Registry, op: Op,
) -> Response {
let p: NameOrAll = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => return err_response(id, -32602, format!("invalid params: {e}")),
};
let targets: Vec<String> = match p {
NameOrAll::All { all } if all => reg.names().await,
NameOrAll::Name { name } => vec![name],
NameOrAll::All { .. } => return err_response(id, -32602, "must set all=true".into()),
};
match op {
Op::Start => {
let mut started = Vec::new();
let mut already = Vec::new();
for name in targets {
let Some(entry) = reg.get(&name).await else {
return err_response(id, RpcErrorCode::ServerNotFound.as_i32(),
format!("no such server `{name}`"));
};
let (tx, rx) = oneshot::channel();
let _ = entry.handle.tx.send(SupervisorCmd::Start { ack: tx }).await;
match rx.await {
Ok(StartAck::Started) => started.push(name),
Ok(StartAck::AlreadyRunning) => already.push(name),
Err(_) => return err_response(id, RpcErrorCode::SpawnFailed.as_i32(),
format!("supervisor for `{name}` dropped")),
}
}
ok_response(id, serde_json::to_value(StartResult { started, already_running: already }).unwrap())
}
Op::Stop => {
let mut stopped = Vec::new();
let mut not_running = Vec::new();
for name in targets {
let Some(entry) = reg.get(&name).await else {
return err_response(id, RpcErrorCode::ServerNotFound.as_i32(),
format!("no such server `{name}`"));
};
let (tx, rx) = oneshot::channel();
let _ = entry.handle.tx.send(SupervisorCmd::Stop { ack: tx }).await;
match rx.await {
Ok(StopAck::Stopped) => stopped.push(name),
Ok(StopAck::NotRunning) => not_running.push(name),
Err(_) => return err_response(id, RpcErrorCode::SpawnFailed.as_i32(),
format!("supervisor for `{name}` dropped")),
}
}
ok_response(id, serde_json::to_value(StopResult { stopped, not_running }).unwrap())
}
Op::Restart => {
let mut restarted = Vec::new();
for name in targets {
let Some(entry) = reg.get(&name).await else {
return err_response(id, RpcErrorCode::ServerNotFound.as_i32(),
format!("no such server `{name}`"));
};
let (tx, rx) = oneshot::channel();
let _ = entry.handle.tx.send(SupervisorCmd::Restart { ack: tx }).await;
let _ = rx.await;
restarted.push(name);
}
ok_response(id, serde_json::to_value(RestartResult { restarted }).unwrap())
}
}
}
```
- [ ] **Step 2: Build**
Run: `cargo build -p xy`
Expected: compiles.
- [ ] **Step 3: Commit**
```bash
git add crates/xy/
git commit -m "feat(xy): RPC handlers for list/status/start/stop/restart"
```
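The handlers use two raw codes from the JSON-RPC 2.0 specification: -32602 (invalid params) and -32601 (method not found). The spec reserves -32768 to -32000 for predefined errors, with -32000 to -32099 set aside for implementation-defined server errors. Named constants make the handlers easier to audit. The sketch below is hypothetical; how `RpcErrorCode` values fit into these ranges is defined in `xy-protocol`, not here.

```rust
/// Predefined error codes from the JSON-RPC 2.0 specification.
pub const PARSE_ERROR: i32 = -32700;
pub const INVALID_REQUEST: i32 = -32600;
pub const METHOD_NOT_FOUND: i32 = -32601;
pub const INVALID_PARAMS: i32 = -32602;
pub const INTERNAL_ERROR: i32 = -32603;

/// -32000..=-32099 is reserved for implementation-defined server errors.
pub fn is_server_error(code: i32) -> bool {
    (-32099..=-32000).contains(&code)
}

/// Short label for a code, e.g. for CLI error output.
pub fn describe(code: i32) -> &'static str {
    match code {
        PARSE_ERROR => "parse error",
        INVALID_REQUEST => "invalid request",
        METHOD_NOT_FOUND => "method not found",
        INVALID_PARAMS => "invalid params",
        INTERNAL_ERROR => "internal error",
        c if is_server_error(c) => "server error",
        c if (-32768..=-32000).contains(&c) => "reserved",
        _ => "application error",
    }
}
```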
### Task 27: RPC handler — reload
**Files:**
- Modify: `crates/xy/src/daemon/handlers.rs`
- [ ] **Step 1: Replace RELOAD arm and add `reload`**
In the `match method` block, replace the placeholder `methods::RELOAD` arm with:
```rust
methods::RELOAD => match reload(reg).await {
Ok(v) => ok_response(id, serde_json::to_value(v).unwrap()),
Err(e) => err_response(id, e.code, e.message),
},
```
Append at end of file:
```rust
use xy_protocol::rpc::ReloadResult;
async fn reload(reg: &Registry) -> Result<ReloadResult, ApiError> {
let paths = crate::daemon::PATHS.get()
.ok_or_else(|| ApiError::rpc(RpcErrorCode::ConfigInvalid, "daemon paths not initialized"))?;
let new_configs = xy_protocol::kdl_parse::load_all_configs(&paths.config_dir)
.map_err(|e| ApiError::rpc(RpcErrorCode::ConfigInvalid, e.to_string()))?;
use std::collections::HashMap;
let new_by_name: HashMap<String, xy_protocol::ServerConfig> =
new_configs.into_iter().map(|c| (c.name.clone(), c)).collect();
let existing_names: Vec<String> = reg.names().await;
let mut added = Vec::new();
let mut removed = Vec::new();
let mut changed = Vec::new();
let mut unchanged = Vec::new();
for name in &existing_names {
if !new_by_name.contains_key(name) {
if let Some(entry) = reg.remove(name).await {
let (tx, rx) = oneshot::channel();
let _ = entry.handle.tx.send(SupervisorCmd::Shutdown { ack: tx }).await;
let _ = rx.await;
removed.push(name.clone());
}
}
}
for (name, cfg) in new_by_name {
let new_hash = crate::daemon::config_hash(&cfg);
match reg.get(&name).await {
None => {
let handle = crate::daemon::spawn_supervisor(paths, cfg)
.map_err(|e| ApiError::rpc(RpcErrorCode::SpawnFailed, e.to_string()))?;
reg.insert(name.clone(), crate::daemon::registry::Entry {
handle: handle.clone(), config_hash: new_hash,
}).await;
let (tx, rx) = oneshot::channel();
let _ = handle.tx.send(SupervisorCmd::Start { ack: tx }).await;
let _ = rx.await;
added.push(name);
}
Some(entry) if entry.config_hash != new_hash => {
let (tx, rx) = oneshot::channel();
let _ = entry.handle.tx.send(SupervisorCmd::Shutdown { ack: tx }).await;
let _ = rx.await;
reg.remove(&name).await;
let handle = crate::daemon::spawn_supervisor(paths, cfg)
.map_err(|e| ApiError::rpc(RpcErrorCode::SpawnFailed, e.to_string()))?;
reg.insert(name.clone(), crate::daemon::registry::Entry {
handle: handle.clone(), config_hash: new_hash,
}).await;
let (tx, rx) = oneshot::channel();
let _ = handle.tx.send(SupervisorCmd::Start { ack: tx }).await;
let _ = rx.await;
changed.push(name);
}
Some(_) => unchanged.push(name),
}
}
Ok(ReloadResult { added, removed, changed, unchanged })
}
```
- [ ] **Step 2: Build**
Run: `cargo build -p xy`
Expected: compiles.
- [ ] **Step 3: Commit**
```bash
git add crates/xy/
git commit -m "feat(xy): reload handler with diff"
```
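`reload` is a diff between the running set (name to `config_hash`) and the freshly loaded configs. Because it iterates `new_by_name`, a `HashMap`, the `added`, `changed`, and `unchanged` lists come back in arbitrary order, while `removed` follows the sorted `reg.names()`. The classification can be sketched as a pure function that sorts every list; `Diff` and `diff` are hypothetical names.

```rust
use std::collections::HashMap;

/// Result of comparing running servers with freshly loaded configs.
#[derive(Debug, Default, PartialEq)]
pub struct Diff {
    pub added: Vec<String>,
    pub removed: Vec<String>,
    pub changed: Vec<String>,
    pub unchanged: Vec<String>,
}

/// `running` and `loaded` map server name to config hash. Each output list is
/// sorted so results do not depend on HashMap iteration order.
pub fn diff(running: &HashMap<String, u64>, loaded: &HashMap<String, u64>) -> Diff {
    let mut d = Diff::default();
    for name in running.keys() {
        if !loaded.contains_key(name) {
            d.removed.push(name.clone());
        }
    }
    for (name, new_hash) in loaded {
        match running.get(name) {
            None => d.added.push(name.clone()),
            Some(old) if old != new_hash => d.changed.push(name.clone()),
            Some(_) => d.unchanged.push(name.clone()),
        }
    }
    for v in [&mut d.added, &mut d.removed, &mut d.changed, &mut d.unchanged] {
        v.sort();
    }
    d
}
```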
### Task 28: RPC handlers — logs + logs_cancel
**Files:**
- Modify: `crates/xy/src/daemon/handlers.rs`
- [ ] **Step 1: Add `ConnState` and rewire `serve`**
At the top of the imports add:
```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use xy_protocol::rpc::{notifications, LogEnd, LogLine, LogsCancelParams, LogsParams, LogsSubscribed};
```
Add `ConnState` and update `serve`:
```rust
pub struct ConnState {
pub subs: Mutex<HashMap<u64, JoinHandle<()>>>,
pub next: AtomicU64,
}
impl ConnState {
pub fn new() -> Self { Self { subs: Mutex::new(HashMap::new()), next: AtomicU64::new(1) } }
}
pub async fn serve(conn: Arc<Connection>, reg: Registry, _paths: Paths) -> std::io::Result<()> {
let state = Arc::new(ConnState::new());
loop {
let Some(incoming) = conn.read_incoming().await? else {
let mut subs = state.subs.lock().await;
for (_, h) in subs.drain() { h.abort(); }
return Ok(());
};
if let Incoming::Request(req) = incoming {
let resp = handle_request(req, &reg, &conn, &state).await;
conn.write_response(&resp).await?;
}
}
}
```
Update `handle_request` signature to:
```rust
async fn handle_request(
req: Request, reg: &Registry, conn: &Arc<Connection>, state: &Arc<ConnState>,
) -> Response { /* same body, plus the two new arms below */ }
```
Replace the LOGS / LOGS_CANCEL arms with:
```rust
methods::LOGS => {
let p: LogsParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => return err_response(id, -32602, format!("invalid params: {e}")),
};
match start_log_stream(reg, conn.clone(), state.clone(), p).await {
Ok(sub_id) => ok_response(id, serde_json::to_value(LogsSubscribed { subscription_id: sub_id }).unwrap()),
Err(e) => err_response(id, e.code, e.message),
}
}
methods::LOGS_CANCEL => {
let p: LogsCancelParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => return err_response(id, -32602, format!("invalid params: {e}")),
};
let mut subs = state.subs.lock().await;
if let Some(h) = subs.remove(&p.subscription_id) { h.abort(); }
ok_response(id, serde_json::json!({}))
}
```
- [ ] **Step 2: Implement `start_log_stream`**
Append:
```rust
async fn start_log_stream(
reg: &Registry, conn: Arc<Connection>, state: Arc<ConnState>, p: LogsParams,
) -> Result<u64, ApiError> {
let Some(entry) = reg.get(&p.name).await else {
return Err(ApiError::rpc(RpcErrorCode::ServerNotFound, format!("no such server `{}`", p.name)));
};
let sub_id = state.next.fetch_add(1, Ordering::Relaxed);
let sink = entry.handle.log_sink.clone();
let conn2 = conn.clone();
let state2 = state.clone();
let follow = p.follow;
let tail = p.tail;
let name = p.name.clone();
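    // Hold the subscription-map lock across spawn + insert. In non-follow mode the
    // task may finish at once; its own `remove` then waits for the insert instead
    // of running first and leaving a finished handle in the map.
    let mut subs = state.subs.lock().await;
    let task = tokio::spawn(async move {
        // In follow mode, subscribe before replaying the ring buffer so a line written
        // between the snapshot and the subscription is not lost (it may arrive twice).
        let live = follow.then(|| sink.broadcast.subscribe());
        for line in sink.ring.snapshot_tail(tail) {
            let n = xy_ipc::envelope::notification(notifications::LOG,
                Some(serde_json::to_value(LogLine {
                    subscription_id: sub_id,
                    name: name.clone(),
                    stream: line.stream, line: line.line,
                    ts_unix_ms: line.ts_unix_ms,
                }).unwrap()));
            if conn2.write_notification(&n).await.is_err() { return; }
        }
        let Some(mut rx) = live else {
            let end = xy_ipc::envelope::notification(notifications::LOG_END,
                Some(serde_json::to_value(LogEnd { subscription_id: sub_id }).unwrap()));
            let _ = conn2.write_notification(&end).await;
            state2.subs.lock().await.remove(&sub_id);
            return;
        };
        loop {
            match rx.recv().await {
                Ok(mut line) => {
                    line.subscription_id = sub_id;
                    let n = xy_ipc::envelope::notification(notifications::LOG,
                        Some(serde_json::to_value(&line).unwrap()));
                    if conn2.write_notification(&n).await.is_err() { break; }
                }
                // A slow client fell behind the broadcast buffer: skip the dropped
                // lines and keep streaming instead of ending the subscription.
                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
            }
        }
    });
    subs.insert(sub_id, task);
    drop(subs);
    Ok(sub_id)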
}
```
- [ ] **Step 3: Build**
Run: `cargo build -p xy`
Expected: compiles. The new `serve` above already passes `&conn` and `&state`; if the compiler flags any remaining call to the old two-argument `handle_request`, update it to the new signature.
- [ ] **Step 4: Commit**
```bash
git add crates/xy/
git commit -m "feat(xy): logs streaming via subscription notifications"
```
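Each `logs` subscription first replays `sink.ring.snapshot_tail(tail)`, whose real implementation lives in `xy-supervisor`. Assuming `None` means every buffered line and `Some(n)` means the newest `n` lines in chronological order, the selection can be sketched over a `VecDeque` standing in for the ring buffer:

```rust
use std::collections::VecDeque;

/// Returns the newest `tail` entries in chronological order, or all of them
/// when `tail` is `None`. Requests larger than the buffer return everything.
pub fn snapshot_tail<T: Clone>(ring: &VecDeque<T>, tail: Option<u32>) -> Vec<T> {
    let n = tail.map_or(ring.len(), |t| (t as usize).min(ring.len()));
    ring.iter().skip(ring.len() - n).cloned().collect()
}
```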
### Task 29: CLI client implementations
**Files:**
- Modify: `crates/xy/src/cli/mod.rs`
- Create: `crates/xy/src/cli/format.rs`
- [ ] **Step 1: Implement `format.rs`**
```rust
use std::fmt::Write as _;
use xy_protocol::rpc::ServerSummary;
pub fn list_table(rows: &[ServerSummary]) -> String {
    let mut out = String::new();
    // Header and rows share one column layout so they stay aligned.
    // Writing to a String cannot fail, so the fmt::Result is ignored.
    let _ = writeln!(out, "{:<20}{:<12}{:<8}{:<8}{:<10}{}",
        "NAME", "STATE", "PID", "PORT", "UPTIME", "RESTARTS");
    for r in rows {
        let pid = r.pid.map(|p| p.to_string()).unwrap_or_else(|| "-".into());
        let up = r.uptime_secs.map(|s| format!("{s}s")).unwrap_or_else(|| "-".into());
        let state = format!("{:?}", r.state).to_lowercase();
        let _ = writeln!(out, "{:<20}{:<12}{:<8}{:<8}{:<10}{}",
            r.name, state, pid, r.port, up, r.restart_count);
    }
    out
}
```
- [ ] **Step 2: Replace `cli/mod.rs`**
```rust
use crate::paths::Paths;
use anyhow::Result;
use serde_json::json;
use xy_ipc::{Client, ClientError};
use xy_protocol::rpc::{
methods, notifications, LogLine, LogsParams, LogsSubscribed, ReloadResult, RestartResult,
ServerSummary, StartResult, StatusDetail, StopResult,
};
mod format;
async fn connect(paths: &Paths) -> Result<Client> {
match Client::connect(&paths.socket).await {
Ok(c) => Ok(c),
Err(e) => {
eprintln!("xy: cannot reach daemon at {}: {e}", paths.socket.display());
std::process::exit(2);
}
}
}
fn rpc_to_exit(e: &ClientError) -> i32 {
match e {
ClientError::Unreachable(_) => 2,
ClientError::Rpc { .. } => 1,
_ => 1,
}
}
pub async fn list(paths: Paths) -> Result<i32> {
let mut c = connect(&paths).await?;
let rows: Vec<ServerSummary> = match c.call_no_params(methods::LIST).await {
Ok(v) => v,
Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
};
print!("{}", format::list_table(&rows));
Ok(0)
}
pub async fn status(paths: Paths, name: String) -> Result<i32> {
let mut c = connect(&paths).await?;
let d: StatusDetail = match c.call(methods::STATUS, &json!({"name": name})).await {
Ok(v) => v,
Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
};
println!("{:#?}", d);
Ok(0)
}
fn name_or_all(all: bool, name: Option<String>) -> serde_json::Value {
    if all {
        json!({"all": true})
    } else {
        json!({"name": name.expect("clap requires a name unless --all is given")})
    }
}
pub async fn start(paths: Paths, all: bool, name: Option<String>) -> Result<i32> {
let mut c = connect(&paths).await?;
let r: StartResult = match c.call(methods::START, &name_or_all(all, name)).await {
Ok(v) => v, Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
};
if !r.started.is_empty() { println!("started: {}", r.started.join(", ")); }
if !r.already_running.is_empty() { println!("already running: {}", r.already_running.join(", ")); }
Ok(0)
}
pub async fn stop(paths: Paths, all: bool, name: Option<String>) -> Result<i32> {
let mut c = connect(&paths).await?;
let r: StopResult = match c.call(methods::STOP, &name_or_all(all, name)).await {
Ok(v) => v, Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
};
if !r.stopped.is_empty() { println!("stopped: {}", r.stopped.join(", ")); }
if !r.not_running.is_empty() { println!("not running: {}", r.not_running.join(", ")); }
Ok(0)
}
pub async fn restart(paths: Paths, all: bool, name: Option<String>) -> Result<i32> {
let mut c = connect(&paths).await?;
let r: RestartResult = match c.call(methods::RESTART, &name_or_all(all, name)).await {
Ok(v) => v, Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
};
println!("restarted: {}", r.restarted.join(", "));
Ok(0)
}
pub async fn reload(paths: Paths) -> Result<i32> {
let mut c = connect(&paths).await?;
let r: ReloadResult = match c.call_no_params(methods::RELOAD).await {
Ok(v) => v, Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
};
println!("added: {}", r.added.join(", "));
println!("removed: {}", r.removed.join(", "));
println!("changed: {}", r.changed.join(", "));
println!("unchanged: {}", r.unchanged.join(", "));
Ok(0)
}
pub async fn logs(paths: Paths, name: String, tail: Option<u32>, follow: bool) -> Result<i32> {
let mut c = connect(&paths).await?;
let p = LogsParams { name: name.clone(), tail, follow };
let _sub: LogsSubscribed = match c.call(methods::LOGS, &p).await {
Ok(v) => v, Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
};
loop {
match c.read_notification().await {
Ok(None) => return Ok(0),
Ok(Some(n)) => match n.method.as_str() {
notifications::LOG => {
if let Some(params) = n.params {
if let Ok(line) = serde_json::from_value::<LogLine>(params) {
let tag = match line.stream {
xy_protocol::rpc::LogStream::Stdout => "out",
xy_protocol::rpc::LogStream::Stderr => "err",
};
println!("[{tag}] {}", line.line);
}
}
}
notifications::LOG_END => return Ok(0),
_ => {}
},
Err(e) => { eprintln!("xy: {e}"); return Ok(rpc_to_exit(&e)); }
}
}
}
```
- [ ] **Step 3: Build**
Run: `cargo build --workspace`
Expected: full workspace compiles.
- [ ] **Step 4: Commit**
```bash
git add crates/xy/
git commit -m "feat(xy): CLI client commands"
```
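The README (Task 37) lists four exit codes, while `rpc_to_exit` above only ever yields 1 or 2 (and `connect` exits with 2 directly). A minimal sketch of how the full contract could be named in one place; `CliExit` and its variants are hypothetical and not part of the plan's types:

```rust
use std::process::ExitCode;

/// Hypothetical names for the exit codes documented in the README.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CliExit {
    Success,
    OperationalError,
    DaemonUnreachable,
    ConfigInvalid,
}

impl CliExit {
    /// Numeric exit code, matching the README table.
    pub fn code(self) -> i32 {
        match self {
            CliExit::Success => 0,
            CliExit::OperationalError => 1,
            CliExit::DaemonUnreachable => 2,
            CliExit::ConfigInvalid => 3,
        }
    }
}

/// Lets a `main` that returns `ExitCode` hand the outcome straight to the OS.
impl From<CliExit> for ExitCode {
    fn from(e: CliExit) -> Self {
        ExitCode::from(e.code() as u8)
    }
}
```

Keeping the numbers behind named variants means the README table and the CLI cannot silently drift apart.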
---
## Phase 6 — Integration tests
### Task 30: Test helper binaries
**Files:**
- Create: `crates/xy/src/bin/xy_test_sleep_server.rs`
- Create: `crates/xy/src/bin/xy_test_exit_failure.rs`
- Modify: `crates/xy/Cargo.toml`
- [ ] **Step 1: `xy_test_sleep_server.rs`**
```rust
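use std::io::Write;
use std::time::Duration;

fn main() {
    let pid = std::process::id();
    eprintln!("sleep_server start pid={pid}");
    println!("ready");
    std::io::stdout().flush().ok();
    // Emit a line periodically so `logs --follow` always has fresh output to
    // stream. If the pipe's read end closes (e.g. the daemon is SIGKILLed),
    // `println!` panics on the write error (Rust ignores SIGPIPE by default),
    // so an orphaned helper exits instead of lingering.
    loop {
        std::thread::sleep(Duration::from_millis(500));
        println!("tick");
        std::io::stdout().flush().ok();
    }
}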
```
- [ ] **Step 2: `xy_test_exit_failure.rs`**
```rust
fn main() {
    // Fail immediately with a non-zero status so the restart policy keeps retrying.
    eprintln!("exit_failure dying immediately");
    std::process::exit(7);
}
```
- [ ] **Step 3: Declare in `crates/xy/Cargo.toml`**
```toml
[[bin]]
name = "xy-test-sleep-server"
path = "src/bin/xy_test_sleep_server.rs"
[[bin]]
name = "xy-test-exit-failure"
path = "src/bin/xy_test_exit_failure.rs"
```
- [ ] **Step 4: Build**
Run: `cargo build -p xy --bins`
Expected: three binaries produced (`xy`, `xy-test-sleep-server`, `xy-test-exit-failure`).
- [ ] **Step 5: Commit**
```bash
git add crates/xy/
git commit -m "test(xy): helper binaries for integration tests"
```
### Task 31: Integration test scaffolding
**Files:**
- Create: `crates/xy/tests/common/mod.rs`
- [ ] **Step 1: Write the harness**
```rust
// Each integration test compiles this module separately and uses only part of it;
// without this, `cargo clippy --all-targets -- -D warnings` fails on dead code.
#![allow(dead_code)]

use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::time::{Duration, Instant};
use tempfile::TempDir;
use tokio::process::{Child, Command};

/// An isolated xy installation rooted in a temp dir.
pub struct Harness {
    pub tmp: TempDir,
    pub config_dir: PathBuf,
    pub state_dir: PathBuf,
    pub socket: PathBuf,
    pub daemon: Option<Child>,
}

impl Harness {
    pub fn new() -> Self {
        let tmp = tempfile::tempdir().expect("tempdir");
        let config_dir = tmp.path().join("config/xy/servers");
        let state_dir = tmp.path().join("state/xy");
        // `create_dir_all` also creates the `config/` and `state/` parents.
        std::fs::create_dir_all(&config_dir).unwrap();
        std::fs::create_dir_all(&state_dir).unwrap();
        std::fs::create_dir_all(tmp.path().join("run")).unwrap();
        let socket = tmp.path().join("run/xy.sock");
        Self { tmp, config_dir, state_dir, socket, daemon: None }
    }

    /// Write `<name>.kdl` with short backoff and grace values so tests run fast.
    pub fn write_server(&self, name: &str, command: &str, port: u16, restart_policy: &str) {
        let body = format!(
            "command \"{command}\"\n\
             port {port}\n\
             restart {{ policy \"{restart_policy}\" backoff-initial \"10ms\" backoff-max \"50ms\" max-retries-per-minute 3 }}\n\
             stop {{ grace \"500ms\" }}\n"
        );
        std::fs::write(self.config_dir.join(format!("{name}.kdl")), body).unwrap();
    }

    /// Spawn `xy daemon` against the temp XDG dirs and wait up to 5 s for its socket.
    pub async fn start_daemon(&mut self, xy_bin: &Path) {
        let child = Command::new(xy_bin)
            .arg("daemon")
            .env("XDG_CONFIG_HOME", self.tmp.path().join("config"))
            .env("XDG_STATE_HOME", self.tmp.path().join("state"))
            .env("XDG_RUNTIME_DIR", self.tmp.path().join("run"))
            .stdout(Stdio::null())
            .stderr(Stdio::inherit())
            // Kill the daemon when the harness is dropped.
            .kill_on_drop(true)
            .spawn()
            .expect("spawn daemon");
        self.daemon = Some(child);
        let deadline = Instant::now() + Duration::from_secs(5);
        while !self.socket.exists() {
            if Instant::now() > deadline {
                panic!("daemon socket never appeared");
            }
            tokio::time::sleep(Duration::from_millis(25)).await;
        }
    }

    /// Run one CLI invocation and return `(exit code, stdout, stderr)`.
    pub async fn run_cli(&self, xy_bin: &Path, args: &[&str]) -> (i32, String, String) {
        let out = Command::new(xy_bin)
            .args(args)
            .env("XDG_CONFIG_HOME", self.tmp.path().join("config"))
            .env("XDG_STATE_HOME", self.tmp.path().join("state"))
            .env("XDG_RUNTIME_DIR", self.tmp.path().join("run"))
            .output()
            .await
            .expect("run cli");
        // `code()` is `None` when the process was terminated by a signal.
        let code = out.status.code().unwrap_or(-1);
        let stdout = String::from_utf8_lossy(&out.stdout).into_owned();
        let stderr = String::from_utf8_lossy(&out.stderr).into_owned();
        (code, stdout, stderr)
    }
}

// Cargo sets `CARGO_BIN_EXE_<name>` when compiling integration tests of the
// package that defines those binaries, so no target-dir path guessing is needed.
pub fn xy_bin() -> PathBuf {
    PathBuf::from(env!("CARGO_BIN_EXE_xy"))
}

pub fn sleep_server_bin() -> PathBuf {
    PathBuf::from(env!("CARGO_BIN_EXE_xy-test-sleep-server"))
}

pub fn exit_failure_bin() -> PathBuf {
    PathBuf::from(env!("CARGO_BIN_EXE_xy-test-exit-failure"))
}
```
- [ ] **Step 2: Build**
Run: `cargo build --workspace --tests`
Expected: compiles.
- [ ] **Step 3: Commit**
```bash
git add crates/xy/
git commit -m "test(xy): integration test harness"
```
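`start_daemon` waits on a deadline, and the tests in Tasks 32–35 repeat the same "probe, sleep, retry" loop around `xy list`. A std-only sketch of that pattern as one helper; `poll_until` is a hypothetical name and is not part of the harness above:

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Call `probe` every `interval` until it returns `Some`, or give up
/// (returning `None`) once `timeout` has elapsed.
pub fn poll_until<T>(
    timeout: Duration,
    interval: Duration,
    mut probe: impl FnMut() -> Option<T>,
) -> Option<T> {
    let deadline = Instant::now() + timeout;
    loop {
        if let Some(v) = probe() {
            return Some(v);
        }
        if Instant::now() >= deadline {
            return None;
        }
        thread::sleep(interval);
    }
}
```

In the `#[tokio::test]` functions an async variant would await `tokio::time::sleep` instead: the default test runtime is single-threaded, so a blocking `thread::sleep` would also stall the awaited child processes' I/O.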
### Task 32: Lifecycle integration test
**Files:**
- Create: `crates/xy/tests/lifecycle.rs`
- [ ] **Step 1: Write the test**
```rust
mod common;

use common::*;
use std::time::Duration;

#[tokio::test]
async fn auto_starts_on_boot_then_stop_and_start() {
    let xy = xy_bin();
    let sleeper = sleep_server_bin();
    let mut h = Harness::new();
    h.write_server("alpha", sleeper.to_str().unwrap(), 19_001, "always");
    h.start_daemon(&xy).await;

    // The daemon should auto-start `alpha`; poll `xy list` for up to ~4 s.
    let mut last_stdout = String::new();
    for _ in 0..40 {
        let (_c, out, _e) = h.run_cli(&xy, &["list"]).await;
        last_stdout = out;
        if last_stdout.contains("alpha") && last_stdout.contains("running") {
            break;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    assert!(last_stdout.contains("alpha"), "stdout: {last_stdout}");
    assert!(last_stdout.contains("running"), "stdout: {last_stdout}");

    let (code, out, _e) = h.run_cli(&xy, &["stop", "alpha"]).await;
    assert_eq!(code, 0);
    assert!(out.contains("stopped: alpha"), "stdout: {out}");

    let (code, out, _e) = h.run_cli(&xy, &["start", "alpha"]).await;
    assert_eq!(code, 0);
    assert!(out.contains("started: alpha"), "stdout: {out}");
}
```
- [ ] **Step 2: Run**
Run: `cargo test -p xy --test lifecycle -- --nocapture`
Expected: passes.
- [ ] **Step 3: Commit**
```bash
git add crates/xy/
git commit -m "test(xy): auto-start + stop/start lifecycle"
```
### Task 33: Restart cap integration test
**Files:**
- Create: `crates/xy/tests/restart_policy.rs`
- [ ] **Step 1: Write the test**
```rust
mod common;

use common::*;
use std::time::Duration;

#[tokio::test]
async fn restart_cap_marks_failed() {
    let xy = xy_bin();
    let bad = exit_failure_bin();
    let mut h = Harness::new();
    h.write_server("flaky", bad.to_str().unwrap(), 19_010, "always");
    h.start_daemon(&xy).await;

    // The helper exits immediately; once the max-retries-per-minute cap set by
    // `write_server` is exhausted, the supervisor should report `failed`.
    let mut saw_failed = false;
    for _ in 0..60 {
        let (_c, out, _e) = h.run_cli(&xy, &["list"]).await;
        if out.contains("flaky") && out.contains("failed") {
            saw_failed = true;
            break;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    assert!(saw_failed, "flaky never reached failed state");
}
```
- [ ] **Step 2: Run**
Run: `cargo test -p xy --test restart_policy -- --nocapture`
Expected: passes.
- [ ] **Step 3: Commit**
```bash
git add crates/xy/
git commit -m "test(xy): restart cap escalates to failed"
```
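Task 33 relies on the daemon giving up once restarts exceed `max-retries-per-minute`. The real logic lives in Tasks 9–11 and 16; the std-only sketch below only illustrates one plausible reading of the harness config (`backoff-initial "10ms"`, `backoff-max "50ms"`, `max-retries-per-minute 3`): exponential backoff plus a sliding 60-second window. `RestartLimiter` and `next_delay` are hypothetical names, and counting restart *attempts* (rather than failures) is an assumption:

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical limiter: exponential backoff capped at `max`, plus a cap on
/// restart attempts within a sliding one-minute window.
pub struct RestartLimiter {
    initial: Duration,
    max: Duration,
    max_per_window: usize,
    window: Duration,
    attempts: VecDeque<Instant>,
}

impl RestartLimiter {
    pub fn new(initial: Duration, max: Duration, max_per_window: usize) -> Self {
        Self {
            initial,
            max,
            max_per_window,
            window: Duration::from_secs(60),
            attempts: VecDeque::new(),
        }
    }

    /// Delay before the next restart, or `None` when the cap is exhausted
    /// and the server should be marked failed.
    pub fn next_delay(&mut self, now: Instant) -> Option<Duration> {
        // Forget attempts that have slid out of the window.
        while let Some(&t) = self.attempts.front() {
            if now.duration_since(t) >= self.window {
                self.attempts.pop_front();
            } else {
                break;
            }
        }
        if self.attempts.len() >= self.max_per_window {
            return None;
        }
        // initial, 2x, 4x, ... for each attempt still in the window, capped at `max`.
        let exp = self.attempts.len().min(16) as u32;
        let delay = self.initial.saturating_mul(1u32 << exp).min(self.max);
        self.attempts.push_back(now);
        Some(delay)
    }
}
```

Under this reading the three allowed restarts wait 10, 20 and 40 ms, so the cap trips after roughly 70 ms of backoff plus process spawn time, well inside the test's 6-second polling budget.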
### Task 34: Reload integration test
**Files:**
- Create: `crates/xy/tests/reload.rs`
- [ ] **Step 1: Write the test**
```rust
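mod common;

use common::*;
use std::time::Duration;

/// Names on the `<label>` line of `xy reload` output, e.g. `changed: a, b`.
/// Matching the line prefix keeps `changed:` from also matching `unchanged:`.
fn names_on(out: &str, label: &str) -> Vec<String> {
    out.lines()
        .find_map(|l| l.strip_prefix(label))
        .map(|rest| {
            rest.split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect()
        })
        .unwrap_or_default()
}

#[tokio::test]
async fn reload_adds_removes_and_changes() {
    let xy = xy_bin();
    let sleeper = sleep_server_bin();
    let mut h = Harness::new();
    h.write_server("a", sleeper.to_str().unwrap(), 19_020, "always");
    h.start_daemon(&xy).await;

    // Wait for server `a` itself to be running; a bare `contains("a")` matches almost any output.
    for _ in 0..40 {
        let (_c, out, _e) = h.run_cli(&xy, &["list"]).await;
        if out.lines().any(|l| l.starts_with("a ") && l.contains("running")) {
            break;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    // Change a's port and add b.
    h.write_server("a", sleeper.to_str().unwrap(), 19_021, "always");
    h.write_server("b", sleeper.to_str().unwrap(), 19_022, "always");
    let (code, out, _e) = h.run_cli(&xy, &["reload"]).await;
    assert_eq!(code, 0);
    assert_eq!(names_on(&out, "added:"), ["b"], "stdout: {out}");
    assert_eq!(names_on(&out, "changed:"), ["a"], "stdout: {out}");

    // Remove a.
    std::fs::remove_file(h.config_dir.join("a.kdl")).unwrap();
    let (code, out, _e) = h.run_cli(&xy, &["reload"]).await;
    assert_eq!(code, 0);
    assert_eq!(names_on(&out, "removed:"), ["a"], "stdout: {out}");
}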
```
- [ ] **Step 2: Run**
Run: `cargo test -p xy --test reload -- --nocapture`
Expected: passes.
- [ ] **Step 3: Commit**
```bash
git add crates/xy/
git commit -m "test(xy): reload diff"
```
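`ReloadResult` carries four lists, and the self-review notes that the registry keeps a `config_hash` per entry. A std-only sketch of the diff `reload` plausibly computes, keyed by server name and compared by hash; `ReloadDiff`, `diff`, and the `u64` hash type are assumptions, not the plan's actual types:

```rust
use std::collections::BTreeMap;

/// Hypothetical mirror of the four lists `xy reload` prints.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct ReloadDiff {
    pub added: Vec<String>,
    pub removed: Vec<String>,
    pub changed: Vec<String>,
    pub unchanged: Vec<String>,
}

/// Compare the running set (`old`) with freshly loaded configs (`new`),
/// both mapping server name -> config hash.
pub fn diff(old: &BTreeMap<String, u64>, new: &BTreeMap<String, u64>) -> ReloadDiff {
    let mut d = ReloadDiff::default();
    for (name, new_hash) in new {
        match old.get(name) {
            None => d.added.push(name.clone()),
            Some(old_hash) if old_hash != new_hash => d.changed.push(name.clone()),
            Some(_) => d.unchanged.push(name.clone()),
        }
    }
    // Names only present in the old set were deleted from disk.
    for name in old.keys() {
        if !new.contains_key(name) {
            d.removed.push(name.clone());
        }
    }
    d
}
```

This is the scenario Task 34 exercises: `a` gets a new port (so a new hash), `b` appears, and then `a.kdl` is deleted.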
### Task 35: Logs integration test
**Files:**
- Create: `crates/xy/tests/logs.rs`
- [ ] **Step 1: Write the test**
```rust
mod common;

use common::*;
use std::process::Stdio;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;

#[tokio::test]
async fn logs_tail_prints_existing_lines() {
    let xy = xy_bin();
    let sleeper = sleep_server_bin();
    let mut h = Harness::new();
    h.write_server("svc", sleeper.to_str().unwrap(), 19_030, "always");
    h.start_daemon(&xy).await;

    // Poll instead of a fixed sleep: the server may not have printed `ready` yet.
    let mut last = (-1, String::new());
    for _ in 0..40 {
        let (code, out, _e) = h.run_cli(&xy, &["logs", "svc", "--tail", "10"]).await;
        last = (code, out);
        if last.1.contains("ready") {
            break;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    let (code, out) = last;
    assert_eq!(code, 0, "stdout: {out}");
    assert!(out.contains("ready"), "stdout: {out}");
}

#[tokio::test]
async fn logs_follow_streams_new_lines() {
    let xy = xy_bin();
    let sleeper = sleep_server_bin();
    let mut h = Harness::new();
    h.write_server("svc", sleeper.to_str().unwrap(), 19_031, "always");
    h.start_daemon(&xy).await;

    let mut child = Command::new(&xy)
        .args(["logs", "svc", "--follow"])
        .env("XDG_CONFIG_HOME", h.tmp.path().join("config"))
        .env("XDG_STATE_HOME", h.tmp.path().join("state"))
        .env("XDG_RUNTIME_DIR", h.tmp.path().join("run"))
        .stdout(Stdio::piped())
        .stderr(Stdio::null())
        .kill_on_drop(true)
        .spawn()
        .expect("spawn xy logs --follow");
    let stdout = child.stdout.take().expect("piped stdout");
    let mut lines = BufReader::new(stdout).lines();

    // Generous timeout to absorb daemon and server start-up on slow machines.
    let first = tokio::time::timeout(Duration::from_secs(5), lines.next_line())
        .await
        .expect("timeout waiting for first log line")
        .expect("read from xy logs");
    assert!(first.is_some(), "xy logs --follow exited without output");
    let _ = child.kill().await;
}
```
- [ ] **Step 2: Run**
Run: `cargo test -p xy --test logs -- --nocapture`
Expected: passes.
- [ ] **Step 3: Commit**
```bash
git add crates/xy/
git commit -m "test(xy): logs --tail and --follow"
```
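`logs --tail N` is presumably answered from the in-memory ring buffer built in Tasks 12–14, which is how Task 35's tail test can see `ready` after the fact. A std-only sketch of that tail behaviour; `RingBuffer` is a hypothetical type, and the real buffer's capacity and line type are not shown in this section:

```rust
use std::collections::VecDeque;

/// Hypothetical bounded buffer keeping the most recent `cap` log lines.
pub struct RingBuffer {
    cap: usize,
    lines: VecDeque<String>,
}

impl RingBuffer {
    pub fn new(cap: usize) -> Self {
        Self { cap, lines: VecDeque::with_capacity(cap) }
    }

    /// Append a line, evicting the oldest one once the buffer is full.
    pub fn push(&mut self, line: impl Into<String>) {
        if self.cap == 0 {
            return;
        }
        if self.lines.len() == self.cap {
            self.lines.pop_front();
        }
        self.lines.push_back(line.into());
    }

    /// The last `n` lines in chronological order (fewer if fewer are buffered).
    pub fn tail(&self, n: usize) -> Vec<&str> {
        let skip = self.lines.len().saturating_sub(n);
        self.lines.iter().skip(skip).map(String::as_str).collect()
    }
}
```

A `VecDeque` gives O(1) eviction at the front, so a chatty server never makes the buffer grow beyond `cap`.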
---
## Phase 7 — Polish
### Task 36: CI-clean
**Files:** none (verification only).
- [ ] **Step 1: Format**
Run: `cargo +nightly fmt --all`
- [ ] **Step 2: Lint**
Run: `cargo clippy --workspace --all-targets -- -D warnings`
Fix any warnings inline.
- [ ] **Step 3: Full test pass**
Run: `cargo test --workspace`
Expected: every test green.
- [ ] **Step 4: Commit cleanups**
```bash
git add -A
git commit -m "chore: cargo fmt + clippy clean"
```
### Task 37: README + example KDL
**Files:**
- Create: `README.md`
- Create: `examples/insikt.kdl`
- [ ] **Step 1: Write `README.md`**
````markdown
# xy — HTTP MCP server supervisor

Daemon + CLI that launches and supervises HTTP-based MCP servers.

## Build

```sh
cargo build --release
```

## Run

```sh
target/release/xy daemon   # runs in the foreground
```

Drop a server definition into `$XDG_CONFIG_HOME/xy/servers/<name>.kdl`
(see `examples/insikt.kdl`), then run `xy reload`.

Commands:

```sh
xy list
xy status <name>
xy start <name|--all>
xy stop <name|--all>
xy restart <name|--all>
xy reload
xy logs <name> [--tail N] [--follow]
```

Exit codes: 0 success, 1 operational error, 2 daemon unreachable, 3 config invalid.
````
- [ ] **Step 2: Write `examples/insikt.kdl`**
```kdl
command "/Users/you/.cargo/bin/insikt-mcp"
args "--http" "--port" "8421"
port 8421
env {
    RUST_LOG "info"
}
restart {
    policy "on-failure"
}
stop {
    grace "10s"
}
```
- [ ] **Step 3: Commit**
```bash
git add README.md examples/
git commit -m "docs: README and example KDL config"
```
---
## Self-Review (run before handoff)
**Spec coverage:**
- Single `xy` binary, Cargo workspace with 4 crates — Tasks 1, 8–16, 17–20, 21+.
- XDG paths (macOS too via etcetera) — Task 21.
- Per-server KDL configs, duplicate-port detection at load — Tasks 5, 6.
- JSON-RPC over newline-delimited Unix socket, 0600 perms — Tasks 17–20.
- All RPC methods including subscription-based `logs` + `logs_cancel` — Tasks 26, 27, 28.
- Restart policy + backoff + retry window → failed — Tasks 9–11, 16, 33.
- Stop with SIGTERM, grace timer, SIGKILL — Tasks 16 and 25 (daemon shutdown).
- Log rotation + ring buffer + broadcast — Tasks 12–14.
- Process-group ownership of child — Task 15.
- Tests at unit + integration level — Tasks 8–16 (units), 32–35 (integration).
- CLI exit code mapping — Task 29 (`rpc_to_exit`).
**Placeholders:** none — every step contains its code or commands.
**Type consistency:**
- `SupervisorHandle.tx` is `mpsc::Sender<SupervisorCmd>` everywhere.
- `LogLine.subscription_id` is `u64` everywhere.
- `Registry` uses `Entry { handle, config_hash }` consistently in Tasks 24 onward.
**Order:** later tasks depend only on earlier ones. xy-protocol (Phase 2) feeds everything; xy-supervisor (Phase 3) uses xy-protocol; xy-ipc (Phase 4) is independent of supervisor; xy binary (Phase 5) ties them all together; integration tests (Phase 6) drive end-to-end; polish (Phase 7) closes out.