feat(neuron): bind listener before pre-warm, surface activation in /health
Some checks failed
build-prerelease / Resolve version stamps (push) Successful in 33s
CI / Format (push) Successful in 41s
CI / Clippy (push) Successful in 2m26s
build-prerelease / Build neuron-blackwell (push) Successful in 3m34s
CI / Test (push) Successful in 4m44s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Build cortex binary (push) Successful in 4m29s
build-prerelease / Package cortex RPM (push) Successful in 1m23s
build-prerelease / Build neuron-ada (push) Has been cancelled
build-prerelease / Package helexa-neuron-ada RPM (push) Has been cancelled
build-prerelease / Package helexa-neuron-ampere RPM (push) Has been cancelled
build-prerelease / Package helexa-neuron-blackwell RPM (push) Has been cancelled
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Has been cancelled
build-prerelease / Build neuron-ampere (push) Has been cancelled

Two coupled changes addressing the 2026-05-26 validate-neuron failure
where a fresh deploy of beast had /health unreachable for ~5 minutes
while Qwen3.6-27B q5k materialised, even though systemd reported the
unit as active.

1. main.rs no longer awaits load_default_models before binding axum.
   The listener binds first; pre-warm runs in a spawned background
   task that holds a read lock on the harness registry for the
   duration of its sequential load loop. Concurrent on-demand
   /models/load and /v1/chat/completions traffic still flow.

2. /health gains an `activation` field carrying:
     state         pre_warming | ready
     pending       model ids queued but not started
     in_progress   model id currently loading (Option)
     completed     model ids loaded successfully this activation
     failed        [{model_id, error}] for failed entries
   The field is `#[serde(default)]` so a pre-change cortex polling a
   new neuron — or vice versa — keeps working.

`ActivationTracker` (new module `neuron::activation`) owns the
RwLock-wrapped state; load_default_models takes a tracker reference
and updates it per-model. NeuronState holds an Arc clone for the
/health handler.

Tests updated to construct trackers and assert state transitions
(empty noop, two failures → ready with both in `failed`).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-26 15:18:04 +03:00
parent d3f2d50749
commit 800498f530
9 changed files with 267 additions and 25 deletions

View File

@@ -0,0 +1,93 @@
//! Activation-time pre-warm progress tracking.
//!
//! Wraps the [`ActivationStatus`] snapshot in an async RwLock so the
//! background pre-warm task can update it per-model while the
//! `/health` handler reads coherent snapshots. The tracker exists
//! because `default_models` loading moved from synchronous-before-bind
//! to background-after-bind on 2026-05-26: the listener is up
//! immediately, but `/health` now needs to tell callers which of the
//! configured defaults are still warming.
use cortex_core::discovery::{ActivationState, ActivationStatus, PreWarmFailure};
use cortex_core::harness::ModelSpec;
use tokio::sync::RwLock;
/// Shared, async-safe handle to the daemon's activation progress.
///
/// Construct once in `main` with the configured `default_models` so
/// the initial `pending` list matches the spec; clone the `Arc` into
/// the `NeuronState` for HTTP handlers and into the spawned pre-warm
/// task for updates.
pub struct ActivationTracker {
inner: RwLock<ActivationStatus>,
}
impl ActivationTracker {
/// Build a tracker primed with one entry per spec. An empty spec
/// list yields a `Ready` tracker — no point reporting PreWarming
/// when there's nothing queued.
pub fn new(default_models: &[ModelSpec]) -> Self {
let pending: Vec<String> = default_models.iter().map(|s| s.model_id.clone()).collect();
let state = if pending.is_empty() {
ActivationState::Ready
} else {
ActivationState::PreWarming
};
Self {
inner: RwLock::new(ActivationStatus {
state,
pending,
in_progress: None,
completed: vec![],
failed: vec![],
}),
}
}
/// Mark a model as in-progress: remove it from `pending`, set as
/// `in_progress`. Called immediately before `registry.load_model`.
pub async fn start_loading(&self, model_id: &str) {
let mut s = self.inner.write().await;
s.pending.retain(|m| m != model_id);
s.in_progress = Some(model_id.to_string());
}
/// Mark a model as completed: clear `in_progress` (if it matches),
/// append to `completed`.
pub async fn complete_loading(&self, model_id: &str) {
let mut s = self.inner.write().await;
if s.in_progress.as_deref() == Some(model_id) {
s.in_progress = None;
}
s.completed.push(model_id.to_string());
}
/// Mark a model as failed: clear `in_progress` (if it matches),
/// append a `PreWarmFailure` carrying the rendered error chain.
pub async fn fail_loading(&self, model_id: &str, error: &str) {
let mut s = self.inner.write().await;
if s.in_progress.as_deref() == Some(model_id) {
s.in_progress = None;
}
s.failed.push(PreWarmFailure {
model_id: model_id.to_string(),
error: error.to_string(),
});
}
/// Flip the high-level `state` to `Ready` once the pre-warm task
/// is done iterating. Pending should be empty by this point; if a
/// caller bails early it's a stuck activation and the operator
/// will see entries in `pending` even with `state=ready` — that's
/// a useful diagnostic, not an inconsistency to scrub.
pub async fn mark_ready(&self) {
let mut s = self.inner.write().await;
s.state = ActivationState::Ready;
s.in_progress = None;
}
/// Cheap clone of the current state for the `/health` handler.
pub async fn snapshot(&self) -> ActivationStatus {
self.inner.read().await.clone()
}
}

View File

@@ -1,5 +1,6 @@
//! HTTP API handlers for the neuron daemon.
use crate::activation::ActivationTracker;
use crate::harness::HarnessRegistry;
use crate::harness::candle::{CandleHarness, InferenceError};
use crate::health::HealthCache;
@@ -28,6 +29,9 @@ pub struct NeuronState {
/// startup so `/v1/chat/completions` doesn't have to hold the registry
/// read lock or perform dyn-Trait dispatch per request.
pub candle: Option<Arc<CandleHarness>>,
/// Activation-time pre-warm progress. Updated by the background
/// `load_default_models` task, read by the `/health` handler.
pub activation: Arc<ActivationTracker>,
}
/// Build the neuron API router.
@@ -47,7 +51,13 @@ async fn discovery_handler(State(state): State<Arc<NeuronState>>) -> Json<Discov
}
async fn health_handler(State(state): State<Arc<NeuronState>>) -> Json<HealthResponse> {
Json(state.health_cache.snapshot().await)
// HealthCache owns the uptime + per-device readings; the activation
// tracker owns the pre-warm progress. We compose the response here
// so the cache stays a thin runtime-state cache and doesn't need to
// know about activation lifecycle.
let mut snapshot = state.health_cache.snapshot().await;
snapshot.activation = state.activation.snapshot().await;
Json(snapshot)
}
async fn list_models(State(state): State<Arc<NeuronState>>) -> impl IntoResponse {

View File

@@ -24,6 +24,12 @@ impl HealthCache {
inner: RwLock::new(HealthResponse {
uptime_secs: 0,
devices: vec![],
// The cache only owns the device-state half of /health;
// the api handler overlays activation from the tracker.
// Initialise with the default (Ready, empty lists) so a
// direct read from the cache stays a well-typed
// HealthResponse on the wire.
activation: Default::default(),
}),
has_gpus: RwLock::new(false),
}

View File

@@ -1,3 +1,4 @@
pub mod activation;
pub mod api;
pub mod config;
pub mod cuda;

View File

@@ -1,7 +1,7 @@
use anyhow::{Context, Result};
use clap::Parser;
use neuron::{
api,
activation, api,
config::NeuronConfig,
discovery,
harness::{HarnessRegistry, tp},
@@ -173,12 +173,6 @@ async fn daemon(args: Args) -> Result<()> {
discovery_result.harnesses = registry.names();
let candle = registry.candle();
// Activation: load default models before binding the listener.
// Each load may take tens of seconds to several minutes depending
// on model size and HF cache state — keep TimeoutStartSec in the
// systemd unit generous enough to cover the slowest entry.
startup::load_default_models(&registry, &cfg.default_models).await;
let health_cache = Arc::new(health::HealthCache::new());
health_cache
.set_has_gpus(!discovery_result.devices.is_empty())
@@ -189,17 +183,46 @@ async fn daemon(args: Args) -> Result<()> {
poller_cache.poll_loop(start_time).await;
});
// Track pre-warm progress so `/health` can tell callers whether
// configured default_models are still loading. Primed with the
// pending list now; the spawned task below flips entries through
// in_progress → completed/failed and finally toggles state=ready.
let activation = Arc::new(activation::ActivationTracker::new(&cfg.default_models));
let state = Arc::new(api::NeuronState {
discovery: discovery_result,
health_cache,
registry: RwLock::new(registry),
candle,
activation: Arc::clone(&activation),
});
// Bind the HTTP listener BEFORE kicking off default_models loading.
// Previously load_default_models ran synchronously on this task,
// which delayed the bind by minutes for big TP models and made the
// host look down to anything probing `/health` during pre-warm.
// The pre-warm task runs in the background instead — `/health`
// surfaces its progress via the activation field.
let app = api::neuron_routes().with_state(Arc::clone(&state));
let addr: std::net::SocketAddr = format!("0.0.0.0:{port}").parse()?;
tracing::info!("neuron listening on {addr}");
let listener = tokio::net::TcpListener::bind(addr).await?;
tracing::info!("neuron listening on {addr}");
if !cfg.default_models.is_empty() {
let state_for_prewarm = Arc::clone(&state);
let default_models = cfg.default_models.clone();
tokio::spawn(async move {
// Read lock held for the whole pre-warm run. The unload
// path takes the same read lock per call (no writers) and
// serialises through the candle harness's own internal
// mutex, so concurrent on-demand loads and pre-warm loads
// do not race on the same model.
let registry = state_for_prewarm.registry.read().await;
startup::load_default_models(&registry, &default_models, &state_for_prewarm.activation)
.await;
});
}
axum::serve(listener, app)
.with_graceful_shutdown(startup::shutdown_signal())
.await?;

View File

@@ -5,6 +5,7 @@
//! graceful-shutdown future. Kept in its own module so the logic is
//! unit-testable without spinning up a full neuron process.
use crate::activation::ActivationTracker;
use crate::harness::HarnessRegistry;
use cortex_core::harness::ModelSpec;
use std::time::{Duration, Instant};
@@ -22,29 +23,46 @@ const UNLOAD_TIMEOUT: Duration = Duration::from_secs(20);
/// individual failures as warnings rather than fatal errors.
///
/// VRAM contention makes parallel loads risky; the sequential path is
/// boring but correct. The function logs elapsed time per load so an
/// operator can see which model is hogging activation.
pub async fn load_default_models(registry: &HarnessRegistry, specs: &[ModelSpec]) {
/// boring but correct. The function logs elapsed time per load and
/// updates `activation` so the `/health` endpoint can tell callers
/// which models are still pre-warming. Caller is expected to run this
/// in a background `tokio::spawn` task — the HTTP listener binds
/// independently so the host is reachable during the pre-warm window.
pub async fn load_default_models(
registry: &HarnessRegistry,
specs: &[ModelSpec],
activation: &ActivationTracker,
) {
if specs.is_empty() {
activation.mark_ready().await;
return;
}
tracing::info!(count = specs.len(), "loading default models");
for spec in specs {
let start = Instant::now();
activation.start_loading(&spec.model_id).await;
match registry.load_model(spec).await {
Ok(()) => tracing::info!(
model = %spec.model_id,
elapsed_ms = start.elapsed().as_millis() as u64,
"loaded default model"
),
Err(e) => tracing::warn!(
model = %spec.model_id,
error = %e,
elapsed_ms = start.elapsed().as_millis() as u64,
"failed to load default model, continuing"
),
Ok(()) => {
activation.complete_loading(&spec.model_id).await;
tracing::info!(
model = %spec.model_id,
elapsed_ms = start.elapsed().as_millis() as u64,
"loaded default model"
);
}
Err(e) => {
let rendered = format!("{e:#}");
activation.fail_loading(&spec.model_id, &rendered).await;
tracing::warn!(
model = %spec.model_id,
error = %rendered,
elapsed_ms = start.elapsed().as_millis() as u64,
"failed to load default model, continuing"
);
}
}
}
activation.mark_ready().await;
}
/// Future that resolves on SIGINT (Ctrl-C) or SIGTERM (systemd stop).

View File

@@ -2,7 +2,9 @@
//! individual failures so a single broken catalogue entry doesn't
//! prevent the rest of the fleet from starting.
use cortex_core::discovery::ActivationState;
use cortex_core::harness::{HarnessConfig, ModelSpec};
use neuron::activation::ActivationTracker;
use neuron::config::HarnessSettings;
use neuron::harness::HarnessRegistry;
use neuron::startup;
@@ -37,7 +39,8 @@ async fn test_load_default_models_skips_unknown_harness() {
},
];
startup::load_default_models(&registry, &specs).await;
let activation = ActivationTracker::new(&specs);
startup::load_default_models(&registry, &specs, &activation).await;
let listed = registry
.list_all_models()
@@ -47,10 +50,28 @@ async fn test_load_default_models_skips_unknown_harness() {
listed.is_empty(),
"no models should be loaded after failed entries"
);
// Both specs should land in `failed`; tracker should flip to ready.
let snapshot = activation.snapshot().await;
assert_eq!(snapshot.state, ActivationState::Ready);
assert!(snapshot.pending.is_empty());
assert!(snapshot.in_progress.is_none());
assert!(snapshot.completed.is_empty());
assert_eq!(snapshot.failed.len(), 2);
let failed_ids: Vec<&str> = snapshot
.failed
.iter()
.map(|f| f.model_id.as_str())
.collect();
assert!(failed_ids.contains(&"model-a"));
assert!(failed_ids.contains(&"model-b"));
}
#[tokio::test]
async fn test_load_default_models_empty_is_noop() {
let registry = HarnessRegistry::new();
startup::load_default_models(&registry, &[]).await;
let activation = ActivationTracker::new(&[]);
startup::load_default_models(&registry, &[], &activation).await;
let snapshot = activation.snapshot().await;
assert_eq!(snapshot.state, ActivationState::Ready);
}

View File

@@ -1,4 +1,5 @@
use cortex_core::discovery::{DeviceInfo, DiscoveryResponse};
use neuron::activation::ActivationTracker;
use neuron::api::{self, NeuronState};
use neuron::harness::HarnessRegistry;
use neuron::health::HealthCache;
@@ -15,6 +16,7 @@ async fn spawn_neuron(discovery: DiscoveryResponse) -> String {
health_cache,
registry: RwLock::new(registry),
candle: None,
activation: Arc::new(ActivationTracker::new(&[])),
});
let app = api::neuron_routes().with_state(state);
@@ -160,6 +162,7 @@ async fn test_candle_harness_registers_and_rejects_bogus_model() {
health_cache,
registry: RwLock::new(registry),
candle,
activation: Arc::new(ActivationTracker::new(&[])),
});
let app = api::neuron_routes().with_state(state);
@@ -211,6 +214,7 @@ async fn test_chat_completions_no_candle_harness() {
health_cache,
registry: RwLock::new(registry),
candle: None,
activation: Arc::new(ActivationTracker::new(&[])),
});
let app = api::neuron_routes().with_state(state);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
@@ -252,6 +256,7 @@ async fn test_chat_completions_model_not_loaded() {
health_cache,
registry: RwLock::new(registry),
candle,
activation: Arc::new(ActivationTracker::new(&[])),
});
let app = api::neuron_routes().with_state(state);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
@@ -295,6 +300,7 @@ async fn test_chat_completions_streaming_model_not_loaded() {
health_cache,
registry: RwLock::new(registry),
candle,
activation: Arc::new(ActivationTracker::new(&[])),
});
let app = api::neuron_routes().with_state(state);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();