From 800498f53001ac85b0915079be7ee16fbe2536cd Mon Sep 17 00:00:00 2001 From: rob thijssen Date: Tue, 26 May 2026 15:18:04 +0300 Subject: [PATCH] feat(neuron): bind listener before pre-warm, surface activation in /health MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two coupled changes addressing the 2026-05-26 validate-neuron failure where a fresh deploy of beast had /health unreachable for ~5 minutes while Qwen3.6-27B q5k materialised, even though systemd reported the unit as active. 1. main.rs no longer awaits load_default_models before binding axum. The listener binds first; pre-warm runs in a spawned background task that holds a read lock on the harness registry for the duration of its sequential load loop. Concurrent on-demand /models/load and /v1/chat/completions traffic still flow. 2. /health gains an `activation` field carrying: state pre_warming | ready pending model ids queued but not started in_progress model id currently loading (Option) completed model ids loaded successfully this activation failed [{model_id, error}] for failed entries The field is `#[serde(default)]` so a pre-change cortex polling a new neuron — or vice versa — keeps working. `ActivationTracker` (new module `neuron::activation`) owns the RwLock-wrapped state; load_default_models takes a tracker reference and updates it per-model. NeuronState holds an Arc clone for the /health handler. Tests updated to construct trackers and assert state transitions (empty noop, two failures → ready with both in `failed`). Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/cortex-core/src/discovery.rs | 64 ++++++++++++++++++++ crates/neuron/src/activation.rs | 93 +++++++++++++++++++++++++++++ crates/neuron/src/api.rs | 12 +++- crates/neuron/src/health.rs | 6 ++ crates/neuron/src/lib.rs | 1 + crates/neuron/src/main.rs | 39 +++++++++--- crates/neuron/src/startup.rs | 46 +++++++++----- crates/neuron/tests/activation.rs | 25 +++++++- crates/neuron/tests/api.rs | 6 ++ 9 files changed, 267 insertions(+), 25 deletions(-) create mode 100644 crates/neuron/src/activation.rs diff --git a/crates/cortex-core/src/discovery.rs b/crates/cortex-core/src/discovery.rs index 9c8d1b0..ccd91bd 100644 --- a/crates/cortex-core/src/discovery.rs +++ b/crates/cortex-core/src/discovery.rs @@ -36,8 +36,72 @@ pub struct DeviceHealth { /// Runtime health response from a neuron endpoint. /// Returned by `GET /health`. +/// +/// `activation` was added in 2026-05-26 to distinguish "process is up +/// and reachable" from "process is ready to serve traffic". A `Type=simple` +/// systemd unit reports `active` the moment the binary starts — but a +/// neuron whose `default_models` list takes minutes to materialise +/// won't bind its listener (or, in the new flow, won't have any models +/// loaded) until pre-warm completes. The new field is `#[serde(default)]` +/// so a pre-2026-05-26 gateway polling a new neuron — or vice versa — +/// keeps working. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct HealthResponse { pub uptime_secs: u64, pub devices: Vec, + #[serde(default)] + pub activation: ActivationStatus, +} + +/// High-level activation state of the neuron daemon. The HTTP listener +/// is bound during both states; what differs is whether the configured +/// `default_models` have finished loading. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum ActivationState { + /// At least one `default_models` entry is still loading. The + /// neuron's other endpoints work, but inference against + /// not-yet-loaded models will 404. + PreWarming, + /// Every `default_models` entry has either loaded or failed; the + /// neuron is steady-state. Subsequent on-demand loads via + /// `/models/load` don't flip back to PreWarming — that field + /// reflects the activation-time set only. + #[default] + Ready, +} + +/// Per-model failure record surfaced in [`ActivationStatus::failed`]. +/// The error string is the rendered anyhow chain at the time of the +/// failure; operators read it from `/health` to decide whether to +/// retry, edit the spec, or unload+reload. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PreWarmFailure { + pub model_id: String, + pub error: String, +} + +/// Activation-time progress snapshot. All four lists are populated by +/// the neuron's pre-warm task and read by the `/health` handler. The +/// snapshot is consistent: a model id appears in exactly one of +/// `pending`, `in_progress` (as `Option`), `completed`, or +/// `failed` at any point in time. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct ActivationStatus { + pub state: ActivationState, + /// Model ids queued but not yet started. Empty in `Ready` state. + #[serde(default)] + pub pending: Vec, + /// Model id currently materialising. None when between models or + /// in `Ready` state. + #[serde(default)] + pub in_progress: Option, + /// Model ids that finished loading successfully during this + /// activation. Cleared on process restart. + #[serde(default)] + pub completed: Vec, + /// Model ids that failed during this activation, with the rendered + /// error chain. Cleared on process restart. + #[serde(default)] + pub failed: Vec, } diff --git a/crates/neuron/src/activation.rs b/crates/neuron/src/activation.rs new file mode 100644 index 0000000..ed3af50 --- /dev/null +++ b/crates/neuron/src/activation.rs @@ -0,0 +1,93 @@ +//! Activation-time pre-warm progress tracking. +//! +//! Wraps the [`ActivationStatus`] snapshot in an async RwLock so the +//! background pre-warm task can update it per-model while the +//! `/health` handler reads coherent snapshots. The tracker exists +//! because `default_models` loading moved from synchronous-before-bind +//! to background-after-bind on 2026-05-26: the listener is up +//! immediately, but `/health` now needs to tell callers which of the +//! configured defaults are still warming. + +use cortex_core::discovery::{ActivationState, ActivationStatus, PreWarmFailure}; +use cortex_core::harness::ModelSpec; +use tokio::sync::RwLock; + +/// Shared, async-safe handle to the daemon's activation progress. +/// +/// Construct once in `main` with the configured `default_models` so +/// the initial `pending` list matches the spec; clone the `Arc` into +/// the `NeuronState` for HTTP handlers and into the spawned pre-warm +/// task for updates. +pub struct ActivationTracker { + inner: RwLock, +} + +impl ActivationTracker { + /// Build a tracker primed with one entry per spec. An empty spec + /// list yields a `Ready` tracker — no point reporting PreWarming + /// when there's nothing queued. + pub fn new(default_models: &[ModelSpec]) -> Self { + let pending: Vec = default_models.iter().map(|s| s.model_id.clone()).collect(); + let state = if pending.is_empty() { + ActivationState::Ready + } else { + ActivationState::PreWarming + }; + Self { + inner: RwLock::new(ActivationStatus { + state, + pending, + in_progress: None, + completed: vec![], + failed: vec![], + }), + } + } + + /// Mark a model as in-progress: remove it from `pending`, set as + /// `in_progress`. Called immediately before `registry.load_model`. + pub async fn start_loading(&self, model_id: &str) { + let mut s = self.inner.write().await; + s.pending.retain(|m| m != model_id); + s.in_progress = Some(model_id.to_string()); + } + + /// Mark a model as completed: clear `in_progress` (if it matches), + /// append to `completed`. + pub async fn complete_loading(&self, model_id: &str) { + let mut s = self.inner.write().await; + if s.in_progress.as_deref() == Some(model_id) { + s.in_progress = None; + } + s.completed.push(model_id.to_string()); + } + + /// Mark a model as failed: clear `in_progress` (if it matches), + /// append a `PreWarmFailure` carrying the rendered error chain. + pub async fn fail_loading(&self, model_id: &str, error: &str) { + let mut s = self.inner.write().await; + if s.in_progress.as_deref() == Some(model_id) { + s.in_progress = None; + } + s.failed.push(PreWarmFailure { + model_id: model_id.to_string(), + error: error.to_string(), + }); + } + + /// Flip the high-level `state` to `Ready` once the pre-warm task + /// is done iterating. Pending should be empty by this point; if a + /// caller bails early it's a stuck activation and the operator + /// will see entries in `pending` even with `state=ready` — that's + /// a useful diagnostic, not an inconsistency to scrub. + pub async fn mark_ready(&self) { + let mut s = self.inner.write().await; + s.state = ActivationState::Ready; + s.in_progress = None; + } + + /// Cheap clone of the current state for the `/health` handler. + pub async fn snapshot(&self) -> ActivationStatus { + self.inner.read().await.clone() + } +} diff --git a/crates/neuron/src/api.rs b/crates/neuron/src/api.rs index ba95831..bd72836 100644 --- a/crates/neuron/src/api.rs +++ b/crates/neuron/src/api.rs @@ -1,5 +1,6 @@ //! HTTP API handlers for the neuron daemon. +use crate::activation::ActivationTracker; use crate::harness::HarnessRegistry; use crate::harness::candle::{CandleHarness, InferenceError}; use crate::health::HealthCache; @@ -28,6 +29,9 @@ pub struct NeuronState { /// startup so `/v1/chat/completions` doesn't have to hold the registry /// read lock or perform dyn-Trait dispatch per request. pub candle: Option>, + /// Activation-time pre-warm progress. Updated by the background + /// `load_default_models` task, read by the `/health` handler. + pub activation: Arc, } /// Build the neuron API router. @@ -47,7 +51,13 @@ async fn discovery_handler(State(state): State>) -> Json>) -> Json { - Json(state.health_cache.snapshot().await) + // HealthCache owns the uptime + per-device readings; the activation + // tracker owns the pre-warm progress. We compose the response here + // so the cache stays a thin runtime-state cache and doesn't need to + // know about activation lifecycle. + let mut snapshot = state.health_cache.snapshot().await; + snapshot.activation = state.activation.snapshot().await; + Json(snapshot) } async fn list_models(State(state): State>) -> impl IntoResponse { diff --git a/crates/neuron/src/health.rs b/crates/neuron/src/health.rs index 89f7c6f..570286f 100644 --- a/crates/neuron/src/health.rs +++ b/crates/neuron/src/health.rs @@ -24,6 +24,12 @@ impl HealthCache { inner: RwLock::new(HealthResponse { uptime_secs: 0, devices: vec![], + // The cache only owns the device-state half of /health; + // the api handler overlays activation from the tracker. + // Initialise with the default (Ready, empty lists) so a + // direct read from the cache stays a well-typed + // HealthResponse on the wire. + activation: Default::default(), }), has_gpus: RwLock::new(false), } diff --git a/crates/neuron/src/lib.rs b/crates/neuron/src/lib.rs index ac659ea..de600ca 100644 --- a/crates/neuron/src/lib.rs +++ b/crates/neuron/src/lib.rs @@ -1,3 +1,4 @@ +pub mod activation; pub mod api; pub mod config; pub mod cuda; diff --git a/crates/neuron/src/main.rs b/crates/neuron/src/main.rs index a0612be..25f9e09 100644 --- a/crates/neuron/src/main.rs +++ b/crates/neuron/src/main.rs @@ -1,7 +1,7 @@ use anyhow::{Context, Result}; use clap::Parser; use neuron::{ - api, + activation, api, config::NeuronConfig, discovery, harness::{HarnessRegistry, tp}, @@ -173,12 +173,6 @@ async fn daemon(args: Args) -> Result<()> { discovery_result.harnesses = registry.names(); let candle = registry.candle(); - // Activation: load default models before binding the listener. - // Each load may take tens of seconds to several minutes depending - // on model size and HF cache state — keep TimeoutStartSec in the - // systemd unit generous enough to cover the slowest entry. - startup::load_default_models(®istry, &cfg.default_models).await; - let health_cache = Arc::new(health::HealthCache::new()); health_cache .set_has_gpus(!discovery_result.devices.is_empty()) @@ -189,17 +183,46 @@ async fn daemon(args: Args) -> Result<()> { poller_cache.poll_loop(start_time).await; }); + // Track pre-warm progress so `/health` can tell callers whether + // configured default_models are still loading. Primed with the + // pending list now; the spawned task below flips entries through + // in_progress → completed/failed and finally toggles state=ready. + let activation = Arc::new(activation::ActivationTracker::new(&cfg.default_models)); + let state = Arc::new(api::NeuronState { discovery: discovery_result, health_cache, registry: RwLock::new(registry), candle, + activation: Arc::clone(&activation), }); + // Bind the HTTP listener BEFORE kicking off default_models loading. + // Previously load_default_models ran synchronously on this task, + // which delayed the bind by minutes for big TP models and made the + // host look down to anything probing `/health` during pre-warm. + // The pre-warm task runs in the background instead — `/health` + // surfaces its progress via the activation field. let app = api::neuron_routes().with_state(Arc::clone(&state)); let addr: std::net::SocketAddr = format!("0.0.0.0:{port}").parse()?; - tracing::info!("neuron listening on {addr}"); let listener = tokio::net::TcpListener::bind(addr).await?; + tracing::info!("neuron listening on {addr}"); + + if !cfg.default_models.is_empty() { + let state_for_prewarm = Arc::clone(&state); + let default_models = cfg.default_models.clone(); + tokio::spawn(async move { + // Read lock held for the whole pre-warm run. The unload + // path takes the same read lock per call (no writers) and + // serialises through the candle harness's own internal + // mutex, so concurrent on-demand loads and pre-warm loads + // do not race on the same model. + let registry = state_for_prewarm.registry.read().await; + startup::load_default_models(®istry, &default_models, &state_for_prewarm.activation) + .await; + }); + } + axum::serve(listener, app) .with_graceful_shutdown(startup::shutdown_signal()) .await?; diff --git a/crates/neuron/src/startup.rs b/crates/neuron/src/startup.rs index 012ae8f..35b68f9 100644 --- a/crates/neuron/src/startup.rs +++ b/crates/neuron/src/startup.rs @@ -5,6 +5,7 @@ //! graceful-shutdown future. Kept in its own module so the logic is //! unit-testable without spinning up a full neuron process. +use crate::activation::ActivationTracker; use crate::harness::HarnessRegistry; use cortex_core::harness::ModelSpec; use std::time::{Duration, Instant}; @@ -22,29 +23,46 @@ const UNLOAD_TIMEOUT: Duration = Duration::from_secs(20); /// individual failures as warnings rather than fatal errors. /// /// VRAM contention makes parallel loads risky; the sequential path is -/// boring but correct. The function logs elapsed time per load so an -/// operator can see which model is hogging activation. -pub async fn load_default_models(registry: &HarnessRegistry, specs: &[ModelSpec]) { +/// boring but correct. The function logs elapsed time per load and +/// updates `activation` so the `/health` endpoint can tell callers +/// which models are still pre-warming. Caller is expected to run this +/// in a background `tokio::spawn` task — the HTTP listener binds +/// independently so the host is reachable during the pre-warm window. +pub async fn load_default_models( + registry: &HarnessRegistry, + specs: &[ModelSpec], + activation: &ActivationTracker, +) { if specs.is_empty() { + activation.mark_ready().await; return; } tracing::info!(count = specs.len(), "loading default models"); for spec in specs { let start = Instant::now(); + activation.start_loading(&spec.model_id).await; match registry.load_model(spec).await { - Ok(()) => tracing::info!( - model = %spec.model_id, - elapsed_ms = start.elapsed().as_millis() as u64, - "loaded default model" - ), - Err(e) => tracing::warn!( - model = %spec.model_id, - error = %e, - elapsed_ms = start.elapsed().as_millis() as u64, - "failed to load default model, continuing" - ), + Ok(()) => { + activation.complete_loading(&spec.model_id).await; + tracing::info!( + model = %spec.model_id, + elapsed_ms = start.elapsed().as_millis() as u64, + "loaded default model" + ); + } + Err(e) => { + let rendered = format!("{e:#}"); + activation.fail_loading(&spec.model_id, &rendered).await; + tracing::warn!( + model = %spec.model_id, + error = %rendered, + elapsed_ms = start.elapsed().as_millis() as u64, + "failed to load default model, continuing" + ); + } } } + activation.mark_ready().await; } /// Future that resolves on SIGINT (Ctrl-C) or SIGTERM (systemd stop). diff --git a/crates/neuron/tests/activation.rs b/crates/neuron/tests/activation.rs index 8daa972..dd4398c 100644 --- a/crates/neuron/tests/activation.rs +++ b/crates/neuron/tests/activation.rs @@ -2,7 +2,9 @@ //! individual failures so a single broken catalogue entry doesn't //! prevent the rest of the fleet from starting. +use cortex_core::discovery::ActivationState; use cortex_core::harness::{HarnessConfig, ModelSpec}; +use neuron::activation::ActivationTracker; use neuron::config::HarnessSettings; use neuron::harness::HarnessRegistry; use neuron::startup; @@ -37,7 +39,8 @@ async fn test_load_default_models_skips_unknown_harness() { }, ]; - startup::load_default_models(®istry, &specs).await; + let activation = ActivationTracker::new(&specs); + startup::load_default_models(®istry, &specs, &activation).await; let listed = registry .list_all_models() @@ -47,10 +50,28 @@ async fn test_load_default_models_skips_unknown_harness() { listed.is_empty(), "no models should be loaded after failed entries" ); + + // Both specs should land in `failed`; tracker should flip to ready. + let snapshot = activation.snapshot().await; + assert_eq!(snapshot.state, ActivationState::Ready); + assert!(snapshot.pending.is_empty()); + assert!(snapshot.in_progress.is_none()); + assert!(snapshot.completed.is_empty()); + assert_eq!(snapshot.failed.len(), 2); + let failed_ids: Vec<&str> = snapshot + .failed + .iter() + .map(|f| f.model_id.as_str()) + .collect(); + assert!(failed_ids.contains(&"model-a")); + assert!(failed_ids.contains(&"model-b")); } #[tokio::test] async fn test_load_default_models_empty_is_noop() { let registry = HarnessRegistry::new(); - startup::load_default_models(®istry, &[]).await; + let activation = ActivationTracker::new(&[]); + startup::load_default_models(®istry, &[], &activation).await; + let snapshot = activation.snapshot().await; + assert_eq!(snapshot.state, ActivationState::Ready); } diff --git a/crates/neuron/tests/api.rs b/crates/neuron/tests/api.rs index 61a9a03..86b20af 100644 --- a/crates/neuron/tests/api.rs +++ b/crates/neuron/tests/api.rs @@ -1,4 +1,5 @@ use cortex_core::discovery::{DeviceInfo, DiscoveryResponse}; +use neuron::activation::ActivationTracker; use neuron::api::{self, NeuronState}; use neuron::harness::HarnessRegistry; use neuron::health::HealthCache; @@ -15,6 +16,7 @@ async fn spawn_neuron(discovery: DiscoveryResponse) -> String { health_cache, registry: RwLock::new(registry), candle: None, + activation: Arc::new(ActivationTracker::new(&[])), }); let app = api::neuron_routes().with_state(state); @@ -160,6 +162,7 @@ async fn test_candle_harness_registers_and_rejects_bogus_model() { health_cache, registry: RwLock::new(registry), candle, + activation: Arc::new(ActivationTracker::new(&[])), }); let app = api::neuron_routes().with_state(state); @@ -211,6 +214,7 @@ async fn test_chat_completions_no_candle_harness() { health_cache, registry: RwLock::new(registry), candle: None, + activation: Arc::new(ActivationTracker::new(&[])), }); let app = api::neuron_routes().with_state(state); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -252,6 +256,7 @@ async fn test_chat_completions_model_not_loaded() { health_cache, registry: RwLock::new(registry), candle, + activation: Arc::new(ActivationTracker::new(&[])), }); let app = api::neuron_routes().with_state(state); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -295,6 +300,7 @@ async fn test_chat_completions_streaming_model_not_loaded() { health_cache, registry: RwLock::new(registry), candle, + activation: Arc::new(ActivationTracker::new(&[])), }); let app = api::neuron_routes().with_state(state); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();