Files
helexa/crates/cortex-gateway/src/poller.rs
rob thijssen df9c490614
Some checks failed
CI / Format (push) Successful in 37s
CI / CUDA type-check (pull_request) Failing after 28s
CI / Format (pull_request) Successful in 37s
CI / Clippy (push) Successful in 2m54s
CI / Clippy (pull_request) Successful in 3m36s
CI / Test (push) Successful in 4m37s
CI / Test (pull_request) Successful in 5m20s
CI / Build cortex SRPM (pull_request) Has been skipped
CI / Build neuron SRPM (pull_request) Has been skipped
CI / Publish cortex to COPR (pull_request) Has been skipped
CI / Publish neuron to COPR (pull_request) Has been skipped
CI / Bump version in source (pull_request) Has been skipped
CI / CUDA type-check (push) Failing after 31s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
feat(neuron+gateway): keep auto-recovering models visible as recovering (#20)
During the #17 auto-recovery window (unload → reload, minutes for a
large TP model) the model's registry slot is absent, so it vanished
from neuron's /models — and cortex, routing by /models presence,
answered "model not found on any node" while a direct request to
neuron would have correctly said "recovering, retry shortly".

neuron: the recovery set becomes a map carrying a devices/capabilities
snapshot taken at trigger time (while the registry slot still exists).
list_models reports `recovering` for models in the set — both while
the poisoned slot is still present and during the reload gap, where
the snapshot keeps the model listed.

gateway: ModelStatus grows a Recovering variant (parsed from the
wire); the router holds the route — new RouteError::ModelRecovering
mapped to 503 instead of 404 — and deliberately does not fall through
to the catalogue cold-load, which would race a second placement
against the in-flight recovery. The evictor already ignores
non-Loaded entries.

Tests: neuron unit test (recovering model stays listed with snapshot),
gateway integration tests (poller parses `recovering`; request gets
503 retry-shortly and the model stays on /v1/models).

Closes #20

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 13:42:03 +03:00

204 lines
6.9 KiB
Rust

//! Background poller that periodically queries each neuron's API
//! to refresh the fleet state.
use crate::state::CortexState;
use chrono::Utc;
use cortex_core::discovery::{DiscoveryResponse, HealthResponse};
use cortex_core::harness::ModelInfo;
use cortex_core::node::{ModelEntry, ModelStatus};
use std::sync::Arc;
use std::time::Duration;
const POLL_INTERVAL: Duration = Duration::from_secs(10);
/// Runs forever, polling all neurons on a fixed interval.
pub async fn poll_loop(fleet: Arc<CortexState>) {
loop {
poll_once(&fleet).await;
tokio::time::sleep(POLL_INTERVAL).await;
}
}
/// Poll all neurons once. Used by `poll_loop` and available for testing.
pub async fn poll_once(fleet: &CortexState) {
for nc in &fleet.neuron_configs {
poll_neuron(fleet, &nc.name, &nc.endpoint).await;
}
}
/// One-shot fetch of `GET /discovery`. Cached on the NodeState forever
/// after the first success — topology is invariant for a given neuron
/// process. Skipped when the cache is already populated.
async fn maybe_poll_discovery(fleet: &CortexState, name: &str, endpoint: &str) {
{
let nodes = fleet.nodes.read().await;
match nodes.get(name) {
Some(n) if n.discovery.is_some() => return,
_ => {}
}
}
let url = format!("{endpoint}/discovery");
let resp = match fleet
.http_client
.get(&url)
.timeout(Duration::from_secs(5))
.send()
.await
{
Ok(r) if r.status().is_success() => r,
Ok(r) => {
tracing::debug!(node = name, status = %r.status(), "discovery probe non-success");
return;
}
Err(e) => {
tracing::debug!(node = name, error = %e, "discovery probe unreachable");
return;
}
};
match resp.json::<DiscoveryResponse>().await {
Ok(d) => {
let mut nodes = fleet.nodes.write().await;
if let Some(node) = nodes.get_mut(name) {
tracing::info!(
node = name,
hostname = %d.hostname,
devices = d.devices.len(),
"discovery cached"
);
node.discovery = Some(d);
}
}
Err(e) => {
tracing::warn!(node = name, error = %e, "failed to parse /discovery response");
}
}
}
async fn poll_neuron(fleet: &CortexState, name: &str, endpoint: &str) {
// Topology first — cheap once cached, and the router needs it to
// route requests against catalogue entries that aren't loaded yet.
maybe_poll_discovery(fleet, name, endpoint).await;
let url = format!("{endpoint}/models");
let result = fleet
.http_client
.get(&url)
.timeout(Duration::from_secs(5))
.send()
.await;
let mut nodes = fleet.nodes.write().await;
let Some(node) = nodes.get_mut(name) else {
return;
};
match result {
Ok(resp) if resp.status().is_success() => {
match resp.json::<Vec<ModelInfo>>().await {
Ok(models) => {
let mut seen = std::collections::HashSet::new();
for upstream in &models {
seen.insert(upstream.id.clone());
let status = parse_status(&upstream.status);
node.models
.entry(upstream.id.clone())
.and_modify(|e| {
e.status = status;
e.vram_estimate_mb = upstream.vram_used_mb;
e.capabilities = upstream.capabilities.clone();
})
.or_insert_with(|| ModelEntry {
id: upstream.id.clone(),
status,
last_accessed: None,
vram_estimate_mb: upstream.vram_used_mb,
capabilities: upstream.capabilities.clone(),
});
}
// Remove models no longer reported by the neuron.
node.models.retain(|id, _| seen.contains(id));
node.healthy = true;
node.last_poll = Some(Utc::now());
tracing::debug!(node = name, models = models.len(), "poll ok");
}
Err(e) => {
tracing::warn!(node = name, error = %e, "failed to parse /models response");
node.healthy = false;
}
}
}
Ok(resp) => {
tracing::warn!(
node = name,
status = %resp.status(),
"neuron returned non-success status"
);
node.healthy = false;
}
Err(e) => {
tracing::warn!(node = name, error = %e, "failed to reach neuron");
node.healthy = false;
}
}
// Release the write lock before the next HTTP call.
drop(nodes);
// Poll /health for the activation snapshot. We don't want this to
// flip the node to unhealthy on its own — a neuron that's serving
// /models fine is still operational even if /health is briefly
// unavailable — so failures are debug-level and leave the existing
// activation reading in place.
poll_health(fleet, name, endpoint).await;
}
/// Fetch `/health` and stash the activation snapshot on NodeState.
/// Decoupled from the /models poll so a /health glitch doesn't mark
/// the neuron unhealthy or evict the model list.
async fn poll_health(fleet: &CortexState, name: &str, endpoint: &str) {
let url = format!("{endpoint}/health");
let resp = match fleet
.http_client
.get(&url)
.timeout(Duration::from_secs(5))
.send()
.await
{
Ok(r) if r.status().is_success() => r,
Ok(r) => {
tracing::debug!(node = name, status = %r.status(), "/health probe non-success");
return;
}
Err(e) => {
tracing::debug!(node = name, error = %e, "/health probe failed");
return;
}
};
match resp.json::<HealthResponse>().await {
Ok(h) => {
let mut nodes = fleet.nodes.write().await;
if let Some(node) = nodes.get_mut(name) {
node.activation = Some(h.activation);
}
}
Err(e) => {
tracing::debug!(node = name, error = %e, "failed to parse /health response");
}
}
}
fn parse_status(s: &str) -> ModelStatus {
match s {
"loaded" => ModelStatus::Loaded,
"unloaded" => ModelStatus::Unloaded,
"reloading" => ModelStatus::Reloading,
"loading" => ModelStatus::Loading,
"recovering" => ModelStatus::Recovering,
_ => ModelStatus::Loaded,
}
}