From b9e7a76a7af899abd0df8c8ccca52b7ea47b9d10 Mon Sep 17 00:00:00 2001 From: rob thijssen Date: Tue, 26 May 2026 15:26:12 +0300 Subject: [PATCH] feat(gateway): surface mid-prewarm models as Loading on /v1/models MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The poller now fetches /health alongside /models on each neuron and stashes the activation snapshot on NodeState. The /v1/models handler gains a Pass 3 that synthesises Loading locations from each neuron's activation.in_progress and activation.pending lists, so a catalogued model that's mid-prewarm surfaces as `status: "loading"` rather than appearing absent (loaded=false, locations=[]). Without this, a client polling /v1/models during a beast restart sees Qwen3.6-27B disappear for the ~5 minutes the q5k load takes, then reappear. Now it stays visible the whole time with a clear status. Adds ModelStatus::Loading to cortex-core. The router's per-node priority loop gets an explicit (no-op) arm: Loading models aren't routable yet, and falling through to the catalogue cold-load path is the existing race — no worse than before, but tagged as a known follow-up needing neuron-side in-flight tracking on /models/load. New test_poller_captures_activation_from_health exercises the full round-trip: mock neuron with empty /models but a pre_warming /health → poller writes node.activation. Common test helpers gain spawn_mock_neuron_with_models_and_health and default_health_response. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/cortex-core/src/node.rs | 17 ++++++- crates/cortex-gateway/src/handlers.rs | 49 ++++++++++++++++++++ crates/cortex-gateway/src/poller.rs | 48 +++++++++++++++++++- crates/cortex-gateway/src/router.rs | 9 ++++ crates/cortex-gateway/src/state.rs | 1 + crates/cortex-gateway/tests/common/mod.rs | 34 ++++++++++++++ crates/cortex-gateway/tests/poller.rs | 55 +++++++++++++++++++++++ 7 files changed, 211 insertions(+), 2 deletions(-) diff --git a/crates/cortex-core/src/node.rs b/crates/cortex-core/src/node.rs index e67ab89..b008577 100644 --- a/crates/cortex-core/src/node.rs +++ b/crates/cortex-core/src/node.rs @@ -1,4 +1,4 @@ -use crate::discovery::DiscoveryResponse; +use crate::discovery::{ActivationStatus, DiscoveryResponse}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -20,6 +20,12 @@ pub struct NodeState { /// successful poll. Used by the router and `/v1/models` to do /// catalogue × topology feasibility checks. pub discovery: Option<DiscoveryResponse>, + /// Last-seen pre-warm progress from this neuron's `/health` + /// endpoint. `None` until the first /health poll succeeds. The + /// `/v1/models` handler reads `in_progress` + `pending` from here + /// to synthesize `Loading` locations so clients see a catalogued + /// model that's mid-prewarm as "loading", not "missing". + pub activation: Option<ActivationStatus>, } /// A model registered on a node, with its runtime status. @@ -34,12 +40,21 @@ pub struct ModelEntry { } /// Model lifecycle status. +/// +/// `Loading` is a gateway-side synthetic status: neurons never emit it +/// on `/models` (that endpoint only knows about already-loaded handles). +/// The gateway populates it from a neuron's `/health` activation +/// snapshot so the unified `/v1/models` can distinguish "model is +/// catalogued but no one has it" from "model is materialising on +/// neuron N right now". Other status values are reported verbatim by +/// neurons.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum ModelStatus { Loaded, Unloaded, Reloading, + Loading, } /// Unified model entry as exposed by the gateway's `/v1/models` endpoint. diff --git a/crates/cortex-gateway/src/handlers.rs b/crates/cortex-gateway/src/handlers.rs index 9a6effe..daf799f 100644 --- a/crates/cortex-gateway/src/handlers.rs +++ b/crates/cortex-gateway/src/handlers.rs @@ -385,6 +385,55 @@ async fn list_models(State(fleet): State>) -> Json { } } + // Pass 3: surface pre-warming models. Each neuron's `/health` + // activation snapshot (polled separately from /models) reports + // `in_progress` (the model currently materialising) and `pending` + // (queued behind it). Neither appears on the neuron's `/models` + // yet — that endpoint only knows about fully-loaded handles — so + // without this pass a client polling `/v1/models` during pre-warm + // sees Qwen3.6-27B with no location and concludes "not there". + // Synthesising a Loading location instead tells clients the model + // is on its way. Idempotent against Pass 2: if a Loading location + // for this node already exists (shouldn't, but be safe) we skip. 
+ for node in nodes.values() { + let Some(activation) = node.activation.as_ref() else { + continue; + }; + let mut loading_ids: Vec<&str> = Vec::new(); + if let Some(id) = activation.in_progress.as_deref() { + loading_ids.push(id); + } + for id in &activation.pending { + loading_ids.push(id.as_str()); + } + for model_id in loading_ids { + let location = ModelLocation { + node: node.name.clone(), + status: cortex_core::node::ModelStatus::Loading, + vram_estimate_mb: None, + }; + entries + .entry(model_id.to_string()) + .and_modify(|e| { + let already = e.locations.iter().any(|l| { + l.node == node.name && l.status == cortex_core::node::ModelStatus::Loading + }); + if !already { + e.locations.push(location.clone()); + } + }) + .or_insert_with(|| CortexModelEntry { + id: model_id.to_string(), + object: "model".into(), + created: now, + owned_by: "helexa".into(), + loaded: false, + feasible_on: Vec::new(), + locations: vec![location], + }); + } + } + let data: Vec = entries.values().map(|e| json!(e)).collect(); Json(json!({ "object": "list", diff --git a/crates/cortex-gateway/src/poller.rs b/crates/cortex-gateway/src/poller.rs index 2dbb308..367c814 100644 --- a/crates/cortex-gateway/src/poller.rs +++ b/crates/cortex-gateway/src/poller.rs @@ -3,7 +3,7 @@ use crate::state::CortexState; use chrono::Utc; -use cortex_core::discovery::DiscoveryResponse; +use cortex_core::discovery::{DiscoveryResponse, HealthResponse}; use cortex_core::harness::ModelInfo; use cortex_core::node::{ModelEntry, ModelStatus}; use std::sync::Arc; @@ -142,6 +142,51 @@ async fn poll_neuron(fleet: &CortexState, name: &str, endpoint: &str) { node.healthy = false; } } + + // Release the write lock before the next HTTP call. + drop(nodes); + + // Poll /health for the activation snapshot. 
We don't want this to + flip the node to unhealthy on its own — a neuron that's serving + /models fine is still operational even if /health is briefly + unavailable — so failures are debug-level and leave the existing + activation reading in place. + poll_health(fleet, name, endpoint).await; +} + +/// Fetch `/health` and stash the activation snapshot on NodeState. +/// Decoupled from the /models poll so a /health glitch doesn't mark +/// the neuron unhealthy or evict the model list. +async fn poll_health(fleet: &CortexState, name: &str, endpoint: &str) { + let url = format!("{endpoint}/health"); + let resp = match fleet + .http_client + .get(&url) + .timeout(Duration::from_secs(5)) + .send() + .await + { + Ok(r) if r.status().is_success() => r, + Ok(r) => { + tracing::debug!(node = name, status = %r.status(), "/health probe non-success"); + return; + } + Err(e) => { + tracing::debug!(node = name, error = %e, "/health probe failed"); + return; + } + }; + match resp.json::<HealthResponse>().await { + Ok(h) => { + let mut nodes = fleet.nodes.write().await; + if let Some(node) = nodes.get_mut(name) { + node.activation = Some(h.activation); + } + } + Err(e) => { + tracing::debug!(node = name, error = %e, "failed to parse /health response"); + } + } } fn parse_status(s: &str) -> ModelStatus { @@ -149,6 +194,7 @@ fn parse_status(s: &str) -> ModelStatus { "loaded" => ModelStatus::Loaded, "unloaded" => ModelStatus::Unloaded, "reloading" => ModelStatus::Reloading, + "loading" => ModelStatus::Loading, _ => ModelStatus::Loaded, } } diff --git a/crates/cortex-gateway/src/router.rs b/crates/cortex-gateway/src/router.rs index 45acfc0..5d4db72 100644 --- a/crates/cortex-gateway/src/router.rs +++ b/crates/cortex-gateway/src/router.rs @@ -79,6 +79,15 @@ pub async fn resolve( unloaded_route = Some((node.name.clone(), node.endpoint.clone(), true)); } } + // Loading is gateway-synthesised from neuron's + // activation snapshot; it never appears on the + // wire from neuron's `/models`.
Skip — the model + // isn't actually servable yet. The pre-existing + // race (catalogue cold_load fires a parallel + // /models/load against the in-flight load) is no + // worse than before; fixing it needs neuron-side + // in-flight tracking on /models/load itself. + ModelStatus::Loading => {} } } } diff --git a/crates/cortex-gateway/src/state.rs b/crates/cortex-gateway/src/state.rs index 6699889..9a258f3 100644 --- a/crates/cortex-gateway/src/state.rs +++ b/crates/cortex-gateway/src/state.rs @@ -27,6 +27,7 @@ impl CortexState { lifecycle_cycles: 0, last_poll: None, discovery: None, + activation: None, }, ); } diff --git a/crates/cortex-gateway/tests/common/mod.rs b/crates/cortex-gateway/tests/common/mod.rs index c080e53..a1911a4 100644 --- a/crates/cortex-gateway/tests/common/mod.rs +++ b/crates/cortex-gateway/tests/common/mod.rs @@ -164,6 +164,33 @@ pub async fn spawn_streaming_mock_neuron(chunk_count: usize, chunk_delay: Durati /// Spawns a mock neuron with a custom models list. pub async fn spawn_mock_neuron_with_models(models_response: Value) -> String { + spawn_mock_neuron_with_models_and_health(models_response, default_health_response()).await +} + +/// Default `/health` response used by mocks that don't care about the +/// activation field — empty devices, no in-flight pre-warm, state=ready. +pub fn default_health_response() -> Value { + json!({ + "uptime_secs": 0, + "devices": [], + "activation": { + "state": "ready", + "pending": [], + "in_progress": null, + "completed": [], + "failed": [] + } + }) +} + +/// Variant of `spawn_mock_neuron_with_models` that also serves a +/// `/health` body. Used by tests that drive the gateway's activation +/// surface (poller reading /health, /v1/models synthesising Loading +/// locations from in_progress / pending). 
+pub async fn spawn_mock_neuron_with_models_and_health( + models_response: Value, + health_response: Value, +) -> String { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); let base_url = format!("http://{addr}"); @@ -177,6 +204,13 @@ pub async fn spawn_mock_neuron_with_models(models_response: Value) -> String { async move { Json(resp) } }), ) + .route( + "/health", + get(move || { + let resp = health_response.clone(); + async move { Json(resp) } + }), + ) .route( "/models/{model_id}/endpoint", get(move |Path(_model_id): Path| { diff --git a/crates/cortex-gateway/tests/poller.rs b/crates/cortex-gateway/tests/poller.rs index c3f9d23..fe76ada 100644 --- a/crates/cortex-gateway/tests/poller.rs +++ b/crates/cortex-gateway/tests/poller.rs @@ -237,3 +237,58 @@ async fn test_poller_removes_stale_models() { assert!(node.models.contains_key("keep-me")); assert!(!node.models.contains_key("drop-me")); } + +#[tokio::test] +async fn test_poller_captures_activation_from_health() { + // Mock neuron is mid-prewarm: /models reports nothing (the loading + // model hasn't been inserted into the harness map yet), but + // /health's activation says model-x is in_progress and model-y is + // queued behind it. 
+ let mock_url = common::spawn_mock_neuron_with_models_and_health( + json!([]), + json!({ + "uptime_secs": 30, + "devices": [], + "activation": { + "state": "pre_warming", + "pending": ["Qwen/model-y"], + "in_progress": "Qwen/model-x", + "completed": [], + "failed": [] + } + }), + ) + .await; + + let config = GatewayConfig { + gateway: GatewaySettings { + listen: "127.0.0.1:0".into(), + metrics_listen: "127.0.0.1:0".into(), + }, + eviction: EvictionSettings { + strategy: EvictionStrategy::Lru, + defrag_after_cycles: 0, + }, + neurons: vec![NeuronEndpoint { + name: "prewarm-node".into(), + endpoint: mock_url, + }], + models_config: "/dev/null".into(), + }; + + let fleet = Arc::new(CortexState::from_config(&config)); + cortex_gateway::poller::poll_once(&fleet).await; + + let nodes = fleet.nodes.read().await; + let node = nodes.get("prewarm-node").unwrap(); + assert!(node.healthy); + // /models was empty — no entries in the per-node model map. + assert!(node.models.is_empty()); + // But /health's activation should be captured. + let activation = node + .activation + .as_ref() + .expect("activation should be populated after /health poll"); + assert_eq!(activation.in_progress.as_deref(), Some("Qwen/model-x")); + assert_eq!(activation.pending, vec!["Qwen/model-y".to_string()]); +}