Compare commits

...

1 Commits

Author SHA1 Message Date
dd31c3cd49 feat(#47 #55 phase 2d): cortex load-aware routing across replicas
All checks were successful
CI / Format (push) Successful in 39s
CI / CUDA type-check (push) Successful in 1m50s
CI / Clippy (push) Successful in 2m24s
CI / Test (push) Successful in 4m51s
CI / Build cortex SRPM (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
Stage 2 completes: when a model is loaded on more than one healthy neuron,
the router picks the least-busy replica instead of always taking the first,
and neuron backpressure propagates to the client intact.

- NodeState.model_load: per-model admission load (in_flight + queue_depth),
  stashed by the poller from neuron's /health (#53/#2b).
- router::resolve collects all loaded replicas and picks the one with the
  lowest in_flight+queue_depth (ties break by node name for determinism),
  replacing the previous first-match-wins.
- Backpressure passthrough: the existing streaming proxy already forwards
  the upstream status + all headers verbatim, so a neuron 503/429 +
  Retry-After + #60 envelope reaches the client unmodified — now covered by
  a regression test so a future change can't silently unwrap it.

Tests (tests/load_routing.rs): routes to the idle replica and follows the
lighter load when it flips; ties break by name; a saturated neuron's 503 +
Retry-After + envelope propagates through the gateway intact. All
cortex-side (no CUDA); local fmt/clippy/test green.

Retry-route-to-another-replica-on-backpressure (the issue's stretch goal)
is deferred — least-busy spread + honest passthrough is the substantive win.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 20:45:50 +03:00
5 changed files with 217 additions and 4 deletions

View File

@@ -1,4 +1,4 @@
use crate::discovery::{ActivationStatus, DiscoveryResponse};
use crate::discovery::{ActivationStatus, DiscoveryResponse, ModelLoad};
use crate::harness::{ModelCost, ModelLimit};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
@@ -27,6 +27,11 @@ pub struct NodeState {
/// to synthesize `Loading` locations so clients see a catalogued
/// model that's mid-prewarm as "loading", not "missing".
pub activation: Option<ActivationStatus>,
/// Last-seen per-model admission load from this neuron's `/health`
/// (#53), keyed by model id. The router (#55) reads it to pick the
/// least-busy replica when a model is loaded on more than one neuron.
/// Empty until the first /health poll reports load.
pub model_load: HashMap<String, ModelLoad>,
}
/// A model registered on a node, with its runtime status.

View File

@@ -200,6 +200,9 @@ async fn poll_health(fleet: &CortexState, name: &str, endpoint: &str) {
let mut nodes = fleet.nodes.write().await;
if let Some(node) = nodes.get_mut(name) {
node.activation = Some(h.activation);
// Per-model admission load (#53) → keyed by id for the
// load-aware router (#55).
node.model_load = h.models.into_iter().map(|m| (m.id.clone(), m)).collect();
}
}
Err(e) => {

View File

@@ -132,7 +132,9 @@ pub async fn resolve(
// Snapshot loaded / unloaded / recovering state from the poller cache.
let (loaded_route, unloaded_route, recovering_node, any_healthy) = {
let nodes = fleet.nodes.read().await;
let mut loaded_route = None;
// All healthy nodes with the model loaded, each with its current
// admission load (#53) so we can pick the least-busy replica (#55).
let mut loaded_candidates: Vec<(String, String, usize)> = Vec::new();
let mut unloaded_route = None;
let mut recovering_node = None;
let mut any_healthy = false;
@@ -144,8 +146,15 @@ pub async fn resolve(
if let Some(entry) = node.models.get(model_id) {
match entry.status {
ModelStatus::Loaded | ModelStatus::Reloading => {
loaded_route = Some((node.name.clone(), node.endpoint.clone(), false));
break;
// Least-busy score: in-flight + queued from the
// neuron's last /health (#53). Unknown load (no poll
// yet) scores 0 so the replica stays eligible.
let score = node
.model_load
.get(model_id)
.map(|l| l.in_flight + l.queue_depth)
.unwrap_or(0);
loaded_candidates.push((node.name.clone(), node.endpoint.clone(), score));
}
ModelStatus::Unloaded => {
if unloaded_route.is_none() {
@@ -175,6 +184,12 @@ pub async fn resolve(
}
}
}
// Pick the least-busy loaded replica; ties break by node name for
// deterministic routing. `false` = not a cold start.
let loaded_route = loaded_candidates
.into_iter()
.min_by(|a, b| a.2.cmp(&b.2).then_with(|| a.0.cmp(&b.0)))
.map(|(name, endpoint, _score)| (name, endpoint, false));
(loaded_route, unloaded_route, recovering_node, any_healthy)
};

View File

@@ -37,6 +37,7 @@ impl CortexState {
last_poll: None,
discovery: None,
activation: None,
model_load: HashMap::new(),
},
);
}

View File

@@ -0,0 +1,189 @@
//! Load-aware routing across replicas (#55).
//!
//! When a model is loaded on more than one healthy neuron, the router picks
//! the least-busy replica using the per-model admission load each neuron
//! reports on `GET /health` (#53), rather than always taking the first.
mod common;
use axum::Json;
use axum::extract::Path;
use axum::http::{StatusCode, header};
use axum::response::IntoResponse;
use axum::routing::{get, post};
use cortex_core::config::{
EvictionSettings, EvictionStrategy, GatewayConfig, GatewaySettings, NeuronEndpoint,
};
use cortex_core::discovery::ModelLoad;
use cortex_core::node::{ModelEntry, ModelStatus};
use cortex_gateway::state::CortexState;
use serde_json::{Value, json};
use std::sync::Arc;
use tokio::net::TcpListener;
/// Seed a node as healthy with `test-model` loaded and a given admission load.
async fn seed_loaded(fleet: &CortexState, node: &str, in_flight: usize, queue_depth: usize) {
let mut nodes = fleet.nodes.write().await;
let n = nodes.get_mut(node).expect("node exists");
n.healthy = true;
n.models.insert(
"test-model".into(),
ModelEntry {
id: "test-model".into(),
status: ModelStatus::Loaded,
last_accessed: None,
vram_estimate_mb: Some(8000),
capabilities: Vec::new(),
tool_call: false,
reasoning: false,
limit: None,
},
);
n.model_load.insert(
"test-model".into(),
ModelLoad {
id: "test-model".into(),
in_flight,
queue_depth,
},
);
}
/// Build a gateway state over two mock neurons (no poller; we seed state).
async fn two_neuron_fleet(endpoint_a: &str, endpoint_b: &str) -> Arc<CortexState> {
let config = GatewayConfig {
gateway: GatewaySettings {
listen: "127.0.0.1:0".into(),
metrics_listen: "127.0.0.1:0".into(),
},
eviction: EvictionSettings {
strategy: EvictionStrategy::Lru,
defrag_after_cycles: 0,
},
neurons: vec![
NeuronEndpoint {
name: "node-a".into(),
endpoint: endpoint_a.to_string(),
},
NeuronEndpoint {
name: "node-b".into(),
endpoint: endpoint_b.to_string(),
},
],
models_config: "/dev/null".into(),
entitlements: Default::default(),
};
Arc::new(CortexState::from_config(&config))
}
#[tokio::test]
async fn routes_to_least_busy_replica() {
let neuron_a = common::spawn_mock_neuron().await;
let neuron_b = common::spawn_mock_neuron().await;
let fleet = two_neuron_fleet(&neuron_a, &neuron_b).await;
// A is busy (1 running + 3 queued), B is idle.
seed_loaded(&fleet, "node-a", 1, 3).await;
seed_loaded(&fleet, "node-b", 0, 0).await;
let route = cortex_gateway::router::resolve(&fleet, "test-model")
.await
.expect("model is loaded on both nodes");
assert_eq!(route.node_name, "node-b", "should pick the idle replica");
// Flip the load: now B is the busy one.
seed_loaded(&fleet, "node-a", 0, 0).await;
seed_loaded(&fleet, "node-b", 1, 5).await;
let route = cortex_gateway::router::resolve(&fleet, "test-model")
.await
.expect("still loaded");
assert_eq!(route.node_name, "node-a", "should follow the lighter load");
}
/// Mock neuron whose inference endpoint always returns a #63 backpressure
/// envelope (503 + Retry-After) — simulating a saturated neuron.
async fn spawn_busy_neuron() -> String {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let base_url = format!("http://{addr}");
let inference_url = base_url.clone();
let app = axum::Router::new()
.route(
"/models/{model_id}/endpoint",
get(move |Path(_): Path<String>| {
let url = inference_url.clone();
async move { Json(json!({ "url": url })) }
}),
)
.route(
"/v1/chat/completions",
post(|| async {
let body = json!({"error": {
"message": "model is busy (admission queue full); retry shortly",
"type": "rate_limit_error",
"code": "rate_limit_exceeded",
"param": null
}});
(
StatusCode::SERVICE_UNAVAILABLE,
[(header::RETRY_AFTER, "6")],
Json(body),
)
.into_response()
}),
);
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
base_url
}
#[tokio::test]
async fn neuron_backpressure_is_propagated_intact() {
// A saturated neuron's 503 + Retry-After + envelope must reach the client
// verbatim — not unwrapped, remapped, or stripped (#55 / #63).
let neuron = spawn_busy_neuron().await;
let fleet = two_neuron_fleet(&neuron, &neuron).await;
seed_loaded(&fleet, "node-a", 1, 8).await;
let app = cortex_gateway::build_app(Arc::clone(&fleet));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
let resp = reqwest::Client::new()
.post(format!("http://{addr}/v1/chat/completions"))
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": "hi"}]}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::SERVICE_UNAVAILABLE);
assert_eq!(
resp.headers()
.get(reqwest::header::RETRY_AFTER)
.and_then(|v| v.to_str().ok()),
Some("6"),
"Retry-After must survive the proxy"
);
let body: Value = resp.json().await.unwrap();
assert_eq!(body["error"]["code"], "rate_limit_exceeded");
}
#[tokio::test]
async fn ties_break_deterministically_by_name() {
let neuron_a = common::spawn_mock_neuron().await;
let neuron_b = common::spawn_mock_neuron().await;
let fleet = two_neuron_fleet(&neuron_a, &neuron_b).await;
// Equal load on both → stable pick (lowest node name).
seed_loaded(&fleet, "node-a", 0, 0).await;
seed_loaded(&fleet, "node-b", 0, 0).await;
let route = cortex_gateway::router::resolve(&fleet, "test-model")
.await
.expect("loaded");
assert_eq!(route.node_name, "node-a", "ties break by name");
}