feat(gateway): surface mid-prewarm models as Loading on /v1/models
The poller now fetches /health alongside /models on each neuron and stashes the activation snapshot on NodeState. The /v1/models handler gains a Pass 3 that synthesises Loading locations from each neuron's activation.in_progress and activation.pending lists, so a catalogued model that's mid-prewarm surfaces as `status: "loading"` rather than appearing absent (loaded=false, locations=[]). Without this, a client polling /v1/models during a beast restart sees Qwen3.6-27B disappear for the ~5 minutes the q5k load takes, then reappear. Now it stays visible the whole time with a clear status. Adds ModelStatus::Loading to cortex-core. The router's per-node priority loop gets an explicit (no-op) arm: Loading models aren't routable yet, and falling through to the catalogue cold-load path is the existing race — no worse than before, but tagged as a known follow-up needing neuron-side in-flight tracking on /models/load. New test_poller_captures_activation_from_health exercises the full round-trip: mock neuron with empty /models but a pre_warming /health → poller writes node.activation. Common test helpers gain spawn_mock_neuron_with_models_and_health and default_health_response. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,4 +1,4 @@
|
|||||||
use crate::discovery::DiscoveryResponse;
|
use crate::discovery::{ActivationStatus, DiscoveryResponse};
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
@@ -20,6 +20,12 @@ pub struct NodeState {
|
|||||||
/// successful poll. Used by the router and `/v1/models` to do
|
/// successful poll. Used by the router and `/v1/models` to do
|
||||||
/// catalogue × topology feasibility checks.
|
/// catalogue × topology feasibility checks.
|
||||||
pub discovery: Option<DiscoveryResponse>,
|
pub discovery: Option<DiscoveryResponse>,
|
||||||
|
/// Last-seen pre-warm progress from this neuron's `/health`
|
||||||
|
/// endpoint. `None` until the first /health poll succeeds. The
|
||||||
|
/// `/v1/models` handler reads `in_progress` + `pending` from here
|
||||||
|
/// to synthesize `Loading` locations so clients see a catalogued
|
||||||
|
/// model that's mid-prewarm as "loading", not "missing".
|
||||||
|
pub activation: Option<ActivationStatus>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A model registered on a node, with its runtime status.
|
/// A model registered on a node, with its runtime status.
|
||||||
@@ -34,12 +40,21 @@ pub struct ModelEntry {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Model lifecycle status.
///
/// `Loading` is a gateway-side synthetic status: neurons never emit it
/// on `/models` (that endpoint only knows about already-loaded handles).
/// The gateway populates it from a neuron's `/health` activation
/// snapshot so the unified `/v1/models` can distinguish "model is
/// catalogued but no one has it" from "model is materialising on
/// neuron N right now". Other status values are reported verbatim by
/// neurons.
///
/// Variants are (de)serialized as lowercase strings (`"loaded"`,
/// `"unloaded"`, `"reloading"`, `"loading"`) because of the
/// `rename_all = "lowercase"` attribute below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ModelStatus {
    /// Reported by the neuron.
    Loaded,
    /// Reported by the neuron.
    Unloaded,
    /// Reported by the neuron.
    Reloading,
    /// Synthesised by the gateway for a model listed in a neuron's
    /// activation `in_progress` / `pending` set; the model is not yet
    /// servable on that node.
    Loading,
}
|
||||||
|
|
||||||
/// Unified model entry as exposed by the gateway's `/v1/models` endpoint.
|
/// Unified model entry as exposed by the gateway's `/v1/models` endpoint.
|
||||||
|
|||||||
@@ -385,6 +385,55 @@ async fn list_models(State(fleet): State<Arc<CortexState>>) -> Json<Value> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Pass 3: surface pre-warming models. Each neuron's `/health`
|
||||||
|
// activation snapshot (polled separately from /models) reports
|
||||||
|
// `in_progress` (the model currently materialising) and `pending`
|
||||||
|
// (queued behind it). Neither appears on the neuron's `/models`
|
||||||
|
// yet — that endpoint only knows about fully-loaded handles — so
|
||||||
|
// without this pass a client polling `/v1/models` during pre-warm
|
||||||
|
// sees Qwen3.6-27B with no location and concludes "not there".
|
||||||
|
// Synthesising a Loading location instead tells clients the model
|
||||||
|
// is on its way. Idempotent against Pass 2: if a Loading location
|
||||||
|
// for this node already exists (shouldn't, but be safe) we skip.
|
||||||
|
for node in nodes.values() {
|
||||||
|
let Some(activation) = node.activation.as_ref() else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let mut loading_ids: Vec<&str> = Vec::new();
|
||||||
|
if let Some(id) = activation.in_progress.as_deref() {
|
||||||
|
loading_ids.push(id);
|
||||||
|
}
|
||||||
|
for id in &activation.pending {
|
||||||
|
loading_ids.push(id.as_str());
|
||||||
|
}
|
||||||
|
for model_id in loading_ids {
|
||||||
|
let location = ModelLocation {
|
||||||
|
node: node.name.clone(),
|
||||||
|
status: cortex_core::node::ModelStatus::Loading,
|
||||||
|
vram_estimate_mb: None,
|
||||||
|
};
|
||||||
|
entries
|
||||||
|
.entry(model_id.to_string())
|
||||||
|
.and_modify(|e| {
|
||||||
|
let already = e.locations.iter().any(|l| {
|
||||||
|
l.node == node.name && l.status == cortex_core::node::ModelStatus::Loading
|
||||||
|
});
|
||||||
|
if !already {
|
||||||
|
e.locations.push(location.clone());
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.or_insert_with(|| CortexModelEntry {
|
||||||
|
id: model_id.to_string(),
|
||||||
|
object: "model".into(),
|
||||||
|
created: now,
|
||||||
|
owned_by: "helexa".into(),
|
||||||
|
loaded: false,
|
||||||
|
feasible_on: Vec::new(),
|
||||||
|
locations: vec![location],
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let data: Vec<Value> = entries.values().map(|e| json!(e)).collect();
|
let data: Vec<Value> = entries.values().map(|e| json!(e)).collect();
|
||||||
Json(json!({
|
Json(json!({
|
||||||
"object": "list",
|
"object": "list",
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
|
|
||||||
use crate::state::CortexState;
|
use crate::state::CortexState;
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use cortex_core::discovery::DiscoveryResponse;
|
use cortex_core::discovery::{DiscoveryResponse, HealthResponse};
|
||||||
use cortex_core::harness::ModelInfo;
|
use cortex_core::harness::ModelInfo;
|
||||||
use cortex_core::node::{ModelEntry, ModelStatus};
|
use cortex_core::node::{ModelEntry, ModelStatus};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@@ -142,6 +142,51 @@ async fn poll_neuron(fleet: &CortexState, name: &str, endpoint: &str) {
|
|||||||
node.healthy = false;
|
node.healthy = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Release the write lock before the next HTTP call.
|
||||||
|
drop(nodes);
|
||||||
|
|
||||||
|
// Poll /health for the activation snapshot. We don't want this to
|
||||||
|
// flip the node to unhealthy on its own — a neuron that's serving
|
||||||
|
// /models fine is still operational even if /health is briefly
|
||||||
|
// unavailable — so failures are debug-level and leave the existing
|
||||||
|
// activation reading in place.
|
||||||
|
poll_health(fleet, name, endpoint).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetch `/health` and stash the activation snapshot on NodeState.
|
||||||
|
/// Decoupled from the /models poll so a /health glitch doesn't mark
|
||||||
|
/// the neuron unhealthy or evict the model list.
|
||||||
|
async fn poll_health(fleet: &CortexState, name: &str, endpoint: &str) {
|
||||||
|
let url = format!("{endpoint}/health");
|
||||||
|
let resp = match fleet
|
||||||
|
.http_client
|
||||||
|
.get(&url)
|
||||||
|
.timeout(Duration::from_secs(5))
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(r) if r.status().is_success() => r,
|
||||||
|
Ok(r) => {
|
||||||
|
tracing::debug!(node = name, status = %r.status(), "/health probe non-success");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::debug!(node = name, error = %e, "/health probe failed");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
match resp.json::<HealthResponse>().await {
|
||||||
|
Ok(h) => {
|
||||||
|
let mut nodes = fleet.nodes.write().await;
|
||||||
|
if let Some(node) = nodes.get_mut(name) {
|
||||||
|
node.activation = Some(h.activation);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::debug!(node = name, error = %e, "failed to parse /health response");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse_status(s: &str) -> ModelStatus {
|
fn parse_status(s: &str) -> ModelStatus {
|
||||||
@@ -149,6 +194,7 @@ fn parse_status(s: &str) -> ModelStatus {
|
|||||||
"loaded" => ModelStatus::Loaded,
|
"loaded" => ModelStatus::Loaded,
|
||||||
"unloaded" => ModelStatus::Unloaded,
|
"unloaded" => ModelStatus::Unloaded,
|
||||||
"reloading" => ModelStatus::Reloading,
|
"reloading" => ModelStatus::Reloading,
|
||||||
|
"loading" => ModelStatus::Loading,
|
||||||
_ => ModelStatus::Loaded,
|
_ => ModelStatus::Loaded,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -79,6 +79,15 @@ pub async fn resolve(
|
|||||||
unloaded_route = Some((node.name.clone(), node.endpoint.clone(), true));
|
unloaded_route = Some((node.name.clone(), node.endpoint.clone(), true));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Loading is gateway-synthesised from neuron's
|
||||||
|
// activation snapshot; it never appears on the
|
||||||
|
// wire from neuron's `/models`. Skip — the model
|
||||||
|
// isn't actually servable yet. The pre-existing
|
||||||
|
// race (catalogue cold_load fires a parallel
|
||||||
|
// /models/load against the in-flight load) is no
|
||||||
|
// worse than before; fixing it needs neuron-side
|
||||||
|
// in-flight tracking on /models/load itself.
|
||||||
|
ModelStatus::Loading => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ impl CortexState {
|
|||||||
lifecycle_cycles: 0,
|
lifecycle_cycles: 0,
|
||||||
last_poll: None,
|
last_poll: None,
|
||||||
discovery: None,
|
discovery: None,
|
||||||
|
activation: None,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -164,6 +164,33 @@ pub async fn spawn_streaming_mock_neuron(chunk_count: usize, chunk_delay: Durati
|
|||||||
|
|
||||||
/// Spawns a mock neuron with a custom models list.
|
/// Spawns a mock neuron with a custom models list.
|
||||||
pub async fn spawn_mock_neuron_with_models(models_response: Value) -> String {
|
pub async fn spawn_mock_neuron_with_models(models_response: Value) -> String {
|
||||||
|
spawn_mock_neuron_with_models_and_health(models_response, default_health_response()).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Default `/health` response used by mocks that don't care about the
|
||||||
|
/// activation field — empty devices, no in-flight pre-warm, state=ready.
|
||||||
|
pub fn default_health_response() -> Value {
|
||||||
|
json!({
|
||||||
|
"uptime_secs": 0,
|
||||||
|
"devices": [],
|
||||||
|
"activation": {
|
||||||
|
"state": "ready",
|
||||||
|
"pending": [],
|
||||||
|
"in_progress": null,
|
||||||
|
"completed": [],
|
||||||
|
"failed": []
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Variant of `spawn_mock_neuron_with_models` that also serves a
|
||||||
|
/// `/health` body. Used by tests that drive the gateway's activation
|
||||||
|
/// surface (poller reading /health, /v1/models synthesising Loading
|
||||||
|
/// locations from in_progress / pending).
|
||||||
|
pub async fn spawn_mock_neuron_with_models_and_health(
|
||||||
|
models_response: Value,
|
||||||
|
health_response: Value,
|
||||||
|
) -> String {
|
||||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
let addr = listener.local_addr().unwrap();
|
let addr = listener.local_addr().unwrap();
|
||||||
let base_url = format!("http://{addr}");
|
let base_url = format!("http://{addr}");
|
||||||
@@ -177,6 +204,13 @@ pub async fn spawn_mock_neuron_with_models(models_response: Value) -> String {
|
|||||||
async move { Json(resp) }
|
async move { Json(resp) }
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
.route(
|
||||||
|
"/health",
|
||||||
|
get(move || {
|
||||||
|
let resp = health_response.clone();
|
||||||
|
async move { Json(resp) }
|
||||||
|
}),
|
||||||
|
)
|
||||||
.route(
|
.route(
|
||||||
"/models/{model_id}/endpoint",
|
"/models/{model_id}/endpoint",
|
||||||
get(move |Path(_model_id): Path<String>| {
|
get(move |Path(_model_id): Path<String>| {
|
||||||
|
|||||||
@@ -237,3 +237,58 @@ async fn test_poller_removes_stale_models() {
|
|||||||
assert!(node.models.contains_key("keep-me"));
|
assert!(node.models.contains_key("keep-me"));
|
||||||
assert!(!node.models.contains_key("drop-me"));
|
assert!(!node.models.contains_key("drop-me"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_poller_captures_activation_from_health() {
|
||||||
|
// Mock neuron is mid-prewarm: /models reports nothing (the loading
|
||||||
|
// model hasn't been inserted into the harness map yet), but
|
||||||
|
// /health's activation says model-x is in_progress and model-y is
|
||||||
|
// queued behind it.
|
||||||
|
let mock_url = common::spawn_mock_neuron_with_models_and_health(
|
||||||
|
json!([]),
|
||||||
|
json!({
|
||||||
|
"uptime_secs": 30,
|
||||||
|
"devices": [],
|
||||||
|
"activation": {
|
||||||
|
"state": "pre_warming",
|
||||||
|
"pending": ["Qwen/model-y"],
|
||||||
|
"in_progress": "Qwen/model-x",
|
||||||
|
"completed": [],
|
||||||
|
"failed": []
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let config = GatewayConfig {
|
||||||
|
gateway: GatewaySettings {
|
||||||
|
listen: "127.0.0.1:0".into(),
|
||||||
|
metrics_listen: "127.0.0.1:0".into(),
|
||||||
|
},
|
||||||
|
eviction: EvictionSettings {
|
||||||
|
strategy: EvictionStrategy::Lru,
|
||||||
|
defrag_after_cycles: 0,
|
||||||
|
},
|
||||||
|
neurons: vec![NeuronEndpoint {
|
||||||
|
name: "prewarm-node".into(),
|
||||||
|
endpoint: mock_url,
|
||||||
|
}],
|
||||||
|
models_config: "/dev/null".into(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let fleet = Arc::new(CortexState::from_config(&config));
|
||||||
|
cortex_gateway::poller::poll_once(&fleet).await;
|
||||||
|
|
||||||
|
let nodes = fleet.nodes.read().await;
|
||||||
|
let node = nodes.get("prewarm-node").unwrap();
|
||||||
|
assert!(node.healthy);
|
||||||
|
// /models was empty — no entries in the per-node model map.
|
||||||
|
assert!(node.models.is_empty());
|
||||||
|
// But /health's activation should be captured.
|
||||||
|
let activation = node
|
||||||
|
.activation
|
||||||
|
.as_ref()
|
||||||
|
.expect("activation should be populated after /health poll");
|
||||||
|
assert_eq!(activation.in_progress.as_deref(), Some("Qwen/model-x"));
|
||||||
|
assert_eq!(activation.pending, vec!["Qwen/model-y".to_string()]);
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user