feat(xy): daemon boot + accept loop + graceful shutdown
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,8 @@
|
||||
use crate::daemon::registry::Registry;
|
||||
use crate::paths::Paths;
|
||||
use std::sync::Arc;
|
||||
use xy_ipc::Connection;
|
||||
|
||||
pub async fn serve(_conn: Arc<Connection>, _reg: Registry, _paths: Paths) -> std::io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
+134
-2
@@ -1,6 +1,138 @@
|
||||
use crate::paths::Paths;
|
||||
use anyhow::{bail, Result};
|
||||
use crate::pidfile::PidFile;
|
||||
use anyhow::{Context, Result};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use tokio::sync::{mpsc, oneshot, watch};
|
||||
use tracing::{error, info};
|
||||
use xy_ipc::{Connection, bind};
|
||||
use xy_protocol::{ServerConfig, ServerState, kdl_parse::load_all_configs};
|
||||
use xy_supervisor::{
|
||||
logs::{LogSink, RotatingLogWriter},
|
||||
supervisor::{RealSpawner, SupervisorCmd, SupervisorHandle, SupervisorTask},
|
||||
};
|
||||
|
||||
pub mod handlers;
|
||||
pub mod registry;
|
||||
pub mod shutdown;
|
||||
|
||||
pub async fn run(_paths: Paths) -> Result<()> { bail!("not implemented") }
|
||||
const LOG_FILE_MAX_BYTES: u64 = 10 * 1024 * 1024;
|
||||
const LOG_FILE_KEEP: usize = 5;
|
||||
const RING_BUFFER_BYTES: usize = 1024 * 1024;
|
||||
|
||||
pub static PATHS: OnceLock<Paths> = OnceLock::new();
|
||||
|
||||
pub fn config_hash(cfg: &ServerConfig) -> u64 {
|
||||
use std::hash::{Hash, Hasher};
|
||||
|
||||
let mut h = std::collections::hash_map::DefaultHasher::new();
|
||||
|
||||
serde_json::to_string(cfg).unwrap().hash(&mut h);
|
||||
|
||||
h.finish()
|
||||
}
|
||||
|
||||
pub fn spawn_supervisor(paths: &Paths, cfg: ServerConfig) -> Result<SupervisorHandle> {
|
||||
let log_path = paths.log_dir.join(format!("{}.log", cfg.name));
|
||||
|
||||
let writer = RotatingLogWriter::open(&log_path, LOG_FILE_MAX_BYTES, LOG_FILE_KEEP)
|
||||
.with_context(|| format!("open log file {}", log_path.display()))?;
|
||||
|
||||
let sink = LogSink::new(cfg.name.clone(), writer, RING_BUFFER_BYTES);
|
||||
|
||||
let (state_tx, state_rx) = watch::channel(ServerState::Stopped);
|
||||
let (cmd_tx, cmd_rx) = mpsc::channel(16);
|
||||
|
||||
let name = cfg.name.clone();
|
||||
|
||||
let task = SupervisorTask::new(cfg, sink.clone(), RealSpawner, state_tx, cmd_rx);
|
||||
|
||||
tokio::spawn(task.run());
|
||||
|
||||
Ok(SupervisorHandle {
|
||||
name,
|
||||
tx: cmd_tx,
|
||||
state: state_rx,
|
||||
log_sink: sink,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn run(paths: Paths) -> Result<()> {
|
||||
paths.ensure_dirs().context("create state dirs")?;
|
||||
|
||||
let _pid =
|
||||
PidFile::acquire(&paths.pidfile).context("another xy daemon appears to be running")?;
|
||||
|
||||
let listener = bind(&paths.socket).context("bind unix socket")?;
|
||||
|
||||
let _ = PATHS.set(paths.clone());
|
||||
|
||||
info!(socket = %paths.socket.display(), "daemon listening");
|
||||
|
||||
let configs = load_all_configs(&paths.config_dir).context("load configs")?;
|
||||
|
||||
let registry = registry::Registry::new();
|
||||
|
||||
for cfg in configs {
|
||||
let hash = config_hash(&cfg);
|
||||
|
||||
let handle = spawn_supervisor(&paths, cfg)?;
|
||||
|
||||
let name = handle.name.clone();
|
||||
|
||||
registry
|
||||
.insert(
|
||||
name.clone(),
|
||||
registry::Entry {
|
||||
handle: handle.clone(),
|
||||
config_hash: hash,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let (ack_tx, ack_rx) = oneshot::channel();
|
||||
|
||||
if handle
|
||||
.tx
|
||||
.send(SupervisorCmd::Start { ack: ack_tx })
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
let _ = ack_rx.await;
|
||||
}
|
||||
}
|
||||
|
||||
let registry_for_shutdown = registry.clone();
|
||||
|
||||
let shutdown_signal = shutdown::install();
|
||||
|
||||
let accept = async {
|
||||
loop {
|
||||
let (stream, _addr) = match listener.accept().await {
|
||||
Ok(p) => p,
|
||||
Err(err) => {
|
||||
error!(error = %err, "accept failed");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let conn = Arc::new(Connection::new(stream));
|
||||
let reg = registry.clone();
|
||||
let paths_clone = paths.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = handlers::serve(conn, reg, paths_clone).await {
|
||||
error!(error = %err, "connection ended with error");
|
||||
}
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
_ = accept => {}
|
||||
_ = shutdown_signal => { info!("shutdown signal received"); }
|
||||
}
|
||||
|
||||
shutdown::shutdown_all(registry_for_shutdown).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
use crate::daemon::registry::Registry;
|
||||
use tokio::signal::unix::{SignalKind, signal};
|
||||
use tokio::sync::oneshot;
|
||||
use xy_supervisor::supervisor::SupervisorCmd;
|
||||
|
||||
pub fn install() -> impl std::future::Future<Output = ()> {
|
||||
let mut term = signal(SignalKind::terminate()).expect("install SIGTERM handler");
|
||||
let mut int = signal(SignalKind::interrupt()).expect("install SIGINT handler");
|
||||
|
||||
async move {
|
||||
tokio::select! {
|
||||
_ = term.recv() => {}
|
||||
_ = int.recv() => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn shutdown_all(reg: Registry) {
|
||||
let snapshot = reg.snapshot().await;
|
||||
|
||||
let mut acks = Vec::new();
|
||||
|
||||
for (_name, entry) in &snapshot {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
if entry
|
||||
.handle
|
||||
.tx
|
||||
.send(SupervisorCmd::Shutdown { ack: tx })
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
acks.push(rx);
|
||||
}
|
||||
}
|
||||
|
||||
let deadline = tokio::time::Duration::from_secs(30);
|
||||
|
||||
let _ = tokio::time::timeout(deadline, async {
|
||||
for rx in acks {
|
||||
let _ = rx.await;
|
||||
}
|
||||
})
|
||||
.await;
|
||||
}
|
||||
Reference in New Issue
Block a user