Compare commits
2 Commits
fix/neuron
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
3b9a6e37f6
|
|||
|
526b662c5e
|
@@ -32,6 +32,12 @@ pub struct NodeState {
|
||||
/// least-busy replica when a model is loaded on more than one neuron.
|
||||
/// Empty until the first /health poll reports load.
|
||||
pub model_load: HashMap<String, ModelLoad>,
|
||||
/// Consecutive failed `/models` polls. The poller marks a node
|
||||
/// unhealthy only once this crosses a threshold, so a single transient
|
||||
/// miss (e.g. a neuron momentarily slow to answer while busy) doesn't
|
||||
/// yank the node — and all its models — out of routing. Reset to 0 on
|
||||
/// any successful poll.
|
||||
pub consecutive_poll_failures: u32,
|
||||
}
|
||||
|
||||
/// A model registered on a node, with its runtime status.
|
||||
|
||||
@@ -5,12 +5,29 @@ use crate::state::CortexState;
|
||||
use chrono::Utc;
|
||||
use cortex_core::discovery::{DiscoveryResponse, HealthResponse};
|
||||
use cortex_core::harness::ModelInfo;
|
||||
use cortex_core::node::{ModelEntry, ModelStatus};
|
||||
use cortex_core::node::{ModelEntry, ModelStatus, NodeState};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
const POLL_INTERVAL: Duration = Duration::from_secs(10);
|
||||
|
||||
/// Consecutive failed `/models` polls before a node is marked unhealthy.
|
||||
/// Debounces transient misses (a busy neuron briefly slow to answer) so a
|
||||
/// single blip can't yank a node — and its models — out of routing. At the
|
||||
/// 10s poll interval this tolerates ~20s of flapping before evicting.
|
||||
const POLL_FAILURE_THRESHOLD: u32 = 3;
|
||||
|
||||
/// Record a failed poll for `node`, marking it unhealthy only once failures
|
||||
/// reach [`POLL_FAILURE_THRESHOLD`]. Below the threshold the node keeps its
|
||||
/// last-known health, riding over transient misses. A successful poll resets
|
||||
/// the counter (see the success arm in `poll_once`).
|
||||
fn record_poll_failure(node: &mut NodeState) {
|
||||
node.consecutive_poll_failures = node.consecutive_poll_failures.saturating_add(1);
|
||||
if node.consecutive_poll_failures >= POLL_FAILURE_THRESHOLD {
|
||||
node.healthy = false;
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs forever, polling all neurons on a fixed interval.
|
||||
pub async fn poll_loop(fleet: Arc<CortexState>) {
|
||||
loop {
|
||||
@@ -138,13 +155,14 @@ async fn poll_neuron(fleet: &CortexState, name: &str, endpoint: &str) {
|
||||
// Remove models no longer reported by the neuron.
|
||||
node.models.retain(|id, _| seen.contains(id));
|
||||
|
||||
node.consecutive_poll_failures = 0;
|
||||
node.healthy = true;
|
||||
node.last_poll = Some(Utc::now());
|
||||
tracing::debug!(node = name, models = models.len(), "poll ok");
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(node = name, error = %e, "failed to parse /models response");
|
||||
node.healthy = false;
|
||||
record_poll_failure(node);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -154,11 +172,11 @@ async fn poll_neuron(fleet: &CortexState, name: &str, endpoint: &str) {
|
||||
status = %resp.status(),
|
||||
"neuron returned non-success status"
|
||||
);
|
||||
node.healthy = false;
|
||||
record_poll_failure(node);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(node = name, error = %e, "failed to reach neuron");
|
||||
node.healthy = false;
|
||||
record_poll_failure(node);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -50,6 +50,10 @@ pub enum RouteError {
|
||||
"model '{model_id}' is in the catalogue but no healthy neuron's topology satisfies its constraints"
|
||||
)]
|
||||
NoFeasibleNeuron { model_id: String },
|
||||
#[error(
|
||||
"model '{model_id}' is feasible on a neuron that is currently unhealthy — retry shortly"
|
||||
)]
|
||||
FeasibleNodeUnhealthy { model_id: String },
|
||||
#[error("cold-load of '{model_id}' on '{node}' failed: {message}")]
|
||||
ColdLoadFailed {
|
||||
model_id: String,
|
||||
@@ -68,7 +72,9 @@ impl RouteError {
|
||||
/// safe to retry the same request); everything else is 404.
|
||||
pub fn http_status(&self) -> u16 {
|
||||
match self {
|
||||
RouteError::NoHealthyNodes | RouteError::ModelRecovering { .. } => 503,
|
||||
RouteError::NoHealthyNodes
|
||||
| RouteError::ModelRecovering { .. }
|
||||
| RouteError::FeasibleNodeUnhealthy { .. } => 503,
|
||||
_ => 404,
|
||||
}
|
||||
}
|
||||
@@ -81,7 +87,8 @@ impl RouteError {
|
||||
| RouteError::EndpointResolveFailed(_, _)
|
||||
| RouteError::NoFeasibleNeuron { .. }
|
||||
| RouteError::ColdLoadFailed { .. }
|
||||
| RouteError::ModelRecovering { .. } => "api_error",
|
||||
| RouteError::ModelRecovering { .. }
|
||||
| RouteError::FeasibleNodeUnhealthy { .. } => "api_error",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,6 +101,7 @@ impl RouteError {
|
||||
RouteError::NoFeasibleNeuron { .. } => "service_unavailable",
|
||||
RouteError::ColdLoadFailed { .. } => "service_unavailable",
|
||||
RouteError::ModelRecovering { .. } => "service_unavailable",
|
||||
RouteError::FeasibleNodeUnhealthy { .. } => "service_unavailable",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -105,6 +113,7 @@ impl RouteError {
|
||||
pub fn retry_after_secs(&self) -> Option<u64> {
|
||||
match self {
|
||||
RouteError::ModelRecovering { .. } => Some(2),
|
||||
RouteError::FeasibleNodeUnhealthy { .. } => Some(3),
|
||||
RouteError::NoHealthyNodes => Some(5),
|
||||
_ => None,
|
||||
}
|
||||
@@ -252,11 +261,32 @@ async fn pick_feasible_neuron(
|
||||
b.2.cmp(&a.2) // pinned first (true > false)
|
||||
.then(a.0.cmp(&b.0))
|
||||
});
|
||||
let pick = candidates.into_iter().next();
|
||||
pick.map(|(n, e, _)| (n, e))
|
||||
.ok_or_else(|| RouteError::NoFeasibleNeuron {
|
||||
if let Some((n, e, _)) = candidates.into_iter().next() {
|
||||
return Ok((n, e));
|
||||
}
|
||||
|
||||
// No *healthy* feasible neuron. Distinguish a transient outage from a
|
||||
// permanent misconfiguration: if some neuron is topologically feasible
|
||||
// but currently unhealthy (e.g. it briefly missed polls while busy),
|
||||
// this is retryable — return 503 + Retry-After so the client backs off
|
||||
// and retries instead of treating a 404 as a hard failure. Only when no
|
||||
// neuron could *ever* satisfy the topology is it a permanent 404.
|
||||
let feasible_but_unhealthy = nodes.values().any(|node| {
|
||||
!node.healthy
|
||||
&& node
|
||||
.discovery
|
||||
.as_ref()
|
||||
.is_some_and(|disc| profile.is_feasible_on(&node.name, &disc.devices))
|
||||
});
|
||||
if feasible_but_unhealthy {
|
||||
Err(RouteError::FeasibleNodeUnhealthy {
|
||||
model_id: profile.id.clone(),
|
||||
})
|
||||
} else {
|
||||
Err(RouteError::NoFeasibleNeuron {
|
||||
model_id: profile.id.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Issue `POST {endpoint}/models/load` for this profile on this neuron,
|
||||
|
||||
@@ -38,6 +38,7 @@ impl CortexState {
|
||||
discovery: None,
|
||||
activation: None,
|
||||
model_load: HashMap::new(),
|
||||
consecutive_poll_failures: 0,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
124
crates/cortex-gateway/tests/feasibility_routing.rs
Normal file
124
crates/cortex-gateway/tests/feasibility_routing.rs
Normal file
@@ -0,0 +1,124 @@
|
||||
//! Router: a catalogued model whose only topologically-feasible neuron is
|
||||
//! currently unhealthy is a *transient* condition (retryable 503), not a
|
||||
//! permanent 404. This is the exact shape of the beast incident: benjy/
|
||||
//! quadbrat (1 GPU, healthy) can't host the 27B, and beast (2 GPU) — the
|
||||
//! sole feasible node — briefly drops out → clients must back off and retry,
|
||||
//! not hard-fail.
|
||||
|
||||
use cortex_core::config::{
|
||||
EvictionSettings, EvictionStrategy, GatewayConfig, GatewaySettings, NeuronEndpoint,
|
||||
};
|
||||
use cortex_core::discovery::{DeviceInfo, DiscoveryResponse};
|
||||
use cortex_gateway::router::{self, RouteError};
|
||||
use cortex_gateway::state::CortexState;
|
||||
use std::sync::Arc;
|
||||
|
||||
fn devices(n: usize) -> Vec<DeviceInfo> {
|
||||
(0..n)
|
||||
.map(|i| DeviceInfo {
|
||||
index: i as u32,
|
||||
name: "RTX 5090".into(),
|
||||
vram_total_mb: 32_768,
|
||||
compute_capability: "9.0".into(),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn discovery(host: &str, n_devices: usize) -> DiscoveryResponse {
|
||||
DiscoveryResponse {
|
||||
hostname: host.into(),
|
||||
os: "Linux".into(),
|
||||
kernel: "7.0".into(),
|
||||
cuda_version: Some("13.0".into()),
|
||||
driver_version: Some("999".into()),
|
||||
devices: devices(n_devices),
|
||||
harnesses: vec!["candle".into()],
|
||||
cuda_unavailable_reason: None,
|
||||
max_prompt_tokens: 49_152,
|
||||
}
|
||||
}
|
||||
|
||||
/// Catalogue with one model needing 2 devices. Returns a temp path.
|
||||
fn write_catalogue() -> std::path::PathBuf {
|
||||
let toml = r#"
|
||||
[[models]]
|
||||
id = "big-model"
|
||||
harness = "candle"
|
||||
min_devices = 2
|
||||
"#;
|
||||
let path = std::env::temp_dir().join("cortex_test_feasibility_models.toml");
|
||||
std::fs::write(&path, toml).unwrap();
|
||||
path
|
||||
}
|
||||
|
||||
async fn fleet_with(big_healthy: bool, big_devices: usize) -> Arc<CortexState> {
|
||||
let cat = write_catalogue();
|
||||
let config = GatewayConfig {
|
||||
gateway: GatewaySettings {
|
||||
listen: "127.0.0.1:0".into(),
|
||||
metrics_listen: "127.0.0.1:0".into(),
|
||||
},
|
||||
eviction: EvictionSettings {
|
||||
strategy: EvictionStrategy::Lru,
|
||||
defrag_after_cycles: 0,
|
||||
},
|
||||
neurons: vec![
|
||||
NeuronEndpoint {
|
||||
name: "small".into(),
|
||||
endpoint: "http://127.0.0.1:1".into(),
|
||||
},
|
||||
NeuronEndpoint {
|
||||
name: "big".into(),
|
||||
endpoint: "http://127.0.0.1:2".into(),
|
||||
},
|
||||
],
|
||||
models_config: cat.to_string_lossy().into_owned(),
|
||||
entitlements: Default::default(),
|
||||
};
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
{
|
||||
let mut nodes = fleet.nodes.write().await;
|
||||
// "small" is healthy but only has 1 GPU → not feasible for the model.
|
||||
let small = nodes.get_mut("small").unwrap();
|
||||
small.healthy = true;
|
||||
small.discovery = Some(discovery("small", 1));
|
||||
// "big" has enough GPUs but its health is the variable under test.
|
||||
let big = nodes.get_mut("big").unwrap();
|
||||
big.healthy = big_healthy;
|
||||
big.discovery = Some(discovery("big", big_devices));
|
||||
}
|
||||
fleet
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn feasible_node_unhealthy_is_transient_503() {
|
||||
// big (2 GPU, the only feasible node) is unhealthy; small (1 GPU) is
|
||||
// healthy but can't host the model → retryable, not a permanent 404.
|
||||
let fleet = fleet_with(false, 2).await;
|
||||
let err = router::resolve(&fleet, "big-model")
|
||||
.await
|
||||
.expect_err("model can't be served right now");
|
||||
assert!(
|
||||
matches!(err, RouteError::FeasibleNodeUnhealthy { .. }),
|
||||
"expected FeasibleNodeUnhealthy, got {err:?}"
|
||||
);
|
||||
assert_eq!(err.http_status(), 503);
|
||||
assert_eq!(err.retry_after_secs(), Some(3));
|
||||
assert_eq!(err.code(), "service_unavailable");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn no_node_can_ever_satisfy_is_permanent_404() {
|
||||
// big is healthy but only has 1 GPU now (e.g. topology genuinely can't
|
||||
// satisfy min_devices=2 anywhere) → permanent, non-retryable 404.
|
||||
let fleet = fleet_with(true, 1).await;
|
||||
let err = router::resolve(&fleet, "big-model")
|
||||
.await
|
||||
.expect_err("no feasible topology");
|
||||
assert!(
|
||||
matches!(err, RouteError::NoFeasibleNeuron { .. }),
|
||||
"expected NoFeasibleNeuron, got {err:?}"
|
||||
);
|
||||
assert_eq!(err.http_status(), 404);
|
||||
assert_eq!(err.retry_after_secs(), None);
|
||||
}
|
||||
@@ -228,10 +228,26 @@ async fn test_poller_marks_unreachable_node_unhealthy() {
|
||||
nodes.get_mut("dead-node").unwrap().healthy = true;
|
||||
}
|
||||
|
||||
// Debounce (#53 follow-up): a single missed poll must NOT evict a
|
||||
// previously-healthy node — a busy neuron briefly slow to answer
|
||||
// shouldn't yank its models out of routing.
|
||||
cortex_gateway::poller::poll_once(&fleet).await;
|
||||
assert!(
|
||||
fleet.nodes.read().await.get("dead-node").unwrap().healthy,
|
||||
"one failed poll should not mark a healthy node unhealthy"
|
||||
);
|
||||
|
||||
let nodes = fleet.nodes.read().await;
|
||||
assert!(!nodes.get("dead-node").unwrap().healthy);
|
||||
// It flips unhealthy only after POLL_FAILURE_THRESHOLD (3) consecutive
|
||||
// failures.
|
||||
cortex_gateway::poller::poll_once(&fleet).await;
|
||||
cortex_gateway::poller::poll_once(&fleet).await;
|
||||
assert!(
|
||||
!fleet.nodes.read().await.get("dead-node").unwrap().healthy,
|
||||
"three consecutive failed polls should mark the node unhealthy"
|
||||
);
|
||||
|
||||
// A subsequent successful poll would reset the counter and restore
|
||||
// health; covered implicitly by the discovery tests above.
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
Reference in New Issue
Block a user