Compare commits

..

2 Commits

Author SHA1 Message Date
057bc71e80 feat(#47 #56 phase 3): fail-fast prompt pre-validation + advisory hints
All checks were successful
CI / Format (push) Successful in 29s
CI / CUDA type-check (push) Successful in 1m37s
CI / Clippy (push) Successful in 2m35s
CI / Test (push) Successful in 5m4s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
Stage 3 (DX): A0 burned an hour then failed deep in litellm with
prompt_too_long (35544 > 32768). cortex knows each model's real context
window (#62/#67) and can pre-empt that at the edge.

- Pre-validate the prompt against the model's advertised limit.context
  before dispatch (in proxy_with_metrics, covering chat/completions/
  responses). Over → 400 context_length_exceeded in the #60 envelope — the
  same shape neuron emits on overflow, just earlier and without burning a
  cold-load/queue slot. cortex has no tokenizer, so estimate_prompt_tokens
  under-counts (~4 chars/token over message text); neuron stays the exact
  wall and we only catch gross overages. Skipped when no limit is known.
- Advisory X-Helexa-Advice header: fingerprints User-Agent
  (litellm / Agent-Zero / Zed) and attaches client-specific guidance.
  Strictly advisory — header only, never in the error envelope, behaviour
  never depends on it; unknown clients get nothing.

3 integration tests: over-long prompt → 400 context_length_exceeded with
the advice header, refused before neuron is hit; within-context passes
through; unknown client gets a clean 400 with no advice header. cortex-side
(no CUDA); local fmt/clippy/test green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 20:50:38 +03:00
dd31c3cd49 feat(#47 #55 phase 2d): cortex load-aware routing across replicas
All checks were successful
CI / Format (push) Successful in 39s
CI / CUDA type-check (push) Successful in 1m50s
CI / Clippy (push) Successful in 2m24s
CI / Test (push) Successful in 4m51s
CI / Build cortex SRPM (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
Stage 2 completes: when a model is loaded on more than one healthy neuron,
the router picks the least-busy replica instead of always taking the first,
and neuron backpressure propagates to the client intact.

- NodeState.model_load: per-model admission load (in_flight + queue_depth),
  stashed by the poller from neuron's /health (#53/#2b).
- router::resolve collects all loaded replicas and picks the one with the
  lowest in_flight+queue_depth (ties break by node name for determinism),
  replacing the previous first-match-wins.
- Backpressure passthrough: the existing streaming proxy already forwards
  the upstream status + all headers verbatim, so a neuron 503/429 +
  Retry-After + #60 envelope reaches the client unmodified — now covered by
  a regression test so a future change can't silently unwrap it.

Tests (tests/load_routing.rs): routes to the idle replica and follows the
lighter load when it flips; ties break by name; a saturated neuron's 503 +
Retry-After + envelope propagates through the gateway intact. All
cortex-side (no CUDA); local fmt/clippy/test green.

Retry-route-to-another-replica-on-backpressure (the issue's stretch goal)
is deferred — least-busy spread + honest passthrough is the substantive win.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 20:45:50 +03:00
7 changed files with 496 additions and 4 deletions

View File

@@ -1,4 +1,4 @@
use crate::discovery::{ActivationStatus, DiscoveryResponse};
use crate::discovery::{ActivationStatus, DiscoveryResponse, ModelLoad};
use crate::harness::{ModelCost, ModelLimit};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
@@ -27,6 +27,11 @@ pub struct NodeState {
/// to synthesize `Loading` locations so clients see a catalogued
/// model that's mid-prewarm as "loading", not "missing".
pub activation: Option<ActivationStatus>,
/// Last-seen per-model admission load from this neuron's `/health`
/// (#53), keyed by model id. The router (#55) reads it to pick the
/// least-busy replica when a model is loaded on more than one neuron.
/// Empty until the first /health poll reports load.
pub model_load: HashMap<String, ModelLoad>,
}
/// A model registered on a node, with its runtime status.

View File

@@ -761,6 +761,19 @@ async fn proxy_with_metrics(
body: Bytes,
model_id: &str,
) -> Response {
// Fail-fast prompt pre-validation (#56): refuse a prompt that already
// exceeds the model's advertised context window *before* dispatching to
// neuron — the same `400 context_length_exceeded` neuron would emit on
// overflow, just earlier and without burning a cold-load/queue slot.
// cortex has no tokenizer, so the estimate under-counts and neuron stays
// the exact wall; we only catch gross overages (the A0 failure mode).
if let Some(context) = advertised_context(fleet, &route.node_name, model_id).await {
let est = estimate_prompt_tokens(&body);
if est > context {
return context_length_exceeded_response(context, est, &headers);
}
}
let labels = [
("model", model_id.to_string()),
("node", route.node_name.clone()),
@@ -844,6 +857,98 @@ async fn advertised_output_limit(
.map(|l| l.output as u64)
}
/// The model's advertised hard context window (`limit.context`, #62/#67) on a
/// node, used for fail-fast prompt pre-validation (#56). `None` when no limit
/// is known — pre-validation is then skipped and neuron remains the wall.
async fn advertised_context(fleet: &CortexState, node_name: &str, model_id: &str) -> Option<u64> {
let nodes = fleet.nodes.read().await;
nodes
.get(node_name)?
.models
.get(model_id)?
.limit
.as_ref()
.map(|l| l.context as u64)
}
/// Conservative prompt-token estimate (~4 chars/token over message text).
/// cortex has no tokenizer; under-counting is the safe direction — we only
/// pre-reject gross overages (#56), and neuron enforces the exact wall.
fn estimate_prompt_tokens(body: &[u8]) -> u64 {
let Ok(v) = serde_json::from_slice::<Value>(body) else {
return (body.len() as u64 / 4).max(1);
};
let mut chars = 0usize;
if let Some(messages) = v.get("messages").and_then(Value::as_array) {
for m in messages {
match m.get("content") {
Some(Value::String(s)) => chars += s.len(),
Some(Value::Array(parts)) => {
for p in parts {
if let Some(t) = p.get("text").and_then(Value::as_str) {
chars += t.len();
}
}
}
_ => {}
}
chars += 8; // rough per-message role/formatting overhead
}
} else if let Some(prompt) = v.get("prompt").and_then(Value::as_str) {
chars += prompt.len(); // legacy /v1/completions
} else {
return (body.len() as u64 / 4).max(1);
}
(chars as u64 / 4).max(1)
}
/// Client-specific, advisory guidance for an over-long prompt (#56),
/// fingerprinted from `User-Agent`. Strictly advisory: it rides the
/// `X-Helexa-Advice` header only, never the error envelope, and behaviour
/// never depends on it. Unknown clients get nothing.
fn client_advice(headers: &HeaderMap) -> Option<&'static str> {
let ua = headers
.get(axum::http::header::USER_AGENT)?
.to_str()
.ok()?
.to_ascii_lowercase();
if ua.contains("litellm") {
Some(
"litellm forwards the full context; lower the configured context window or enable client-side compaction",
)
} else if ua.contains("agent-zero") || ua.contains("agent zero") {
Some("reduce the conversation/context size or summarize earlier turns before resending")
} else if ua.contains("zed") {
Some("reduce the assistant context window in Zed's settings")
} else {
None
}
}
/// `400 context_length_exceeded` for an over-long prompt caught at the edge
/// (#56), in the #60 envelope — the same shape neuron emits on overflow, so
/// clients (opencode auto-compacts) handle it identically. Attaches the
/// advisory `X-Helexa-Advice` header for fingerprinted clients.
fn context_length_exceeded_response(
context: u64,
prompt_est: u64,
headers: &HeaderMap,
) -> Response {
let env = OpenAiError::context_length_exceeded(format!(
"This model's maximum context length is {context} tokens. Your request is \
estimated at ~{prompt_est} tokens. Please reduce the length of the messages."
))
.with_extra("max", json!(context))
.with_extra("estimated_prompt_tokens", json!(prompt_est));
let mut response = crate::error::envelope_response(env);
if let Some(advice) = client_advice(headers)
&& let Ok(value) = axum::http::HeaderValue::from_str(advice)
{
response.headers_mut().insert("x-helexa-advice", value);
}
response
}
/// Update `last_accessed` timestamp for a model on a node (drives LRU eviction).
async fn touch_model(fleet: &CortexState, node_name: &str, model_id: &str) {
let mut nodes = fleet.nodes.write().await;

View File

@@ -200,6 +200,9 @@ async fn poll_health(fleet: &CortexState, name: &str, endpoint: &str) {
let mut nodes = fleet.nodes.write().await;
if let Some(node) = nodes.get_mut(name) {
node.activation = Some(h.activation);
// Per-model admission load (#53) → keyed by id for the
// load-aware router (#55).
node.model_load = h.models.into_iter().map(|m| (m.id.clone(), m)).collect();
}
}
Err(e) => {

View File

@@ -132,7 +132,9 @@ pub async fn resolve(
// Snapshot loaded / unloaded / recovering state from the poller cache.
let (loaded_route, unloaded_route, recovering_node, any_healthy) = {
let nodes = fleet.nodes.read().await;
let mut loaded_route = None;
// All healthy nodes with the model loaded, each with its current
// admission load (#53) so we can pick the least-busy replica (#55).
let mut loaded_candidates: Vec<(String, String, usize)> = Vec::new();
let mut unloaded_route = None;
let mut recovering_node = None;
let mut any_healthy = false;
@@ -144,8 +146,15 @@ pub async fn resolve(
if let Some(entry) = node.models.get(model_id) {
match entry.status {
ModelStatus::Loaded | ModelStatus::Reloading => {
loaded_route = Some((node.name.clone(), node.endpoint.clone(), false));
break;
// Least-busy score: in-flight + queued from the
// neuron's last /health (#53). Unknown load (no poll
// yet) scores 0 so the replica stays eligible.
let score = node
.model_load
.get(model_id)
.map(|l| l.in_flight + l.queue_depth)
.unwrap_or(0);
loaded_candidates.push((node.name.clone(), node.endpoint.clone(), score));
}
ModelStatus::Unloaded => {
if unloaded_route.is_none() {
@@ -175,6 +184,12 @@ pub async fn resolve(
}
}
}
// Pick the least-busy loaded replica; ties break by node name for
// deterministic routing. `false` = not a cold start.
let loaded_route = loaded_candidates
.into_iter()
.min_by(|a, b| a.2.cmp(&b.2).then_with(|| a.0.cmp(&b.0)))
.map(|(name, endpoint, _score)| (name, endpoint, false));
(loaded_route, unloaded_route, recovering_node, any_healthy)
};

View File

@@ -37,6 +37,7 @@ impl CortexState {
last_poll: None,
discovery: None,
activation: None,
model_load: HashMap::new(),
},
);
}

View File

@@ -0,0 +1,189 @@
//! Load-aware routing across replicas (#55).
//!
//! When a model is loaded on more than one healthy neuron, the router picks
//! the least-busy replica using the per-model admission load each neuron
//! reports on `GET /health` (#53), rather than always taking the first.
mod common;
use axum::Json;
use axum::extract::Path;
use axum::http::{StatusCode, header};
use axum::response::IntoResponse;
use axum::routing::{get, post};
use cortex_core::config::{
EvictionSettings, EvictionStrategy, GatewayConfig, GatewaySettings, NeuronEndpoint,
};
use cortex_core::discovery::ModelLoad;
use cortex_core::node::{ModelEntry, ModelStatus};
use cortex_gateway::state::CortexState;
use serde_json::{Value, json};
use std::sync::Arc;
use tokio::net::TcpListener;
/// Seed a node as healthy with `test-model` loaded and a given admission load.
async fn seed_loaded(fleet: &CortexState, node: &str, in_flight: usize, queue_depth: usize) {
let mut nodes = fleet.nodes.write().await;
let n = nodes.get_mut(node).expect("node exists");
n.healthy = true;
n.models.insert(
"test-model".into(),
ModelEntry {
id: "test-model".into(),
status: ModelStatus::Loaded,
last_accessed: None,
vram_estimate_mb: Some(8000),
capabilities: Vec::new(),
tool_call: false,
reasoning: false,
limit: None,
},
);
n.model_load.insert(
"test-model".into(),
ModelLoad {
id: "test-model".into(),
in_flight,
queue_depth,
},
);
}
/// Build a gateway state over two mock neurons (no poller; we seed state).
async fn two_neuron_fleet(endpoint_a: &str, endpoint_b: &str) -> Arc<CortexState> {
let config = GatewayConfig {
gateway: GatewaySettings {
listen: "127.0.0.1:0".into(),
metrics_listen: "127.0.0.1:0".into(),
},
eviction: EvictionSettings {
strategy: EvictionStrategy::Lru,
defrag_after_cycles: 0,
},
neurons: vec![
NeuronEndpoint {
name: "node-a".into(),
endpoint: endpoint_a.to_string(),
},
NeuronEndpoint {
name: "node-b".into(),
endpoint: endpoint_b.to_string(),
},
],
models_config: "/dev/null".into(),
entitlements: Default::default(),
};
Arc::new(CortexState::from_config(&config))
}
#[tokio::test]
async fn routes_to_least_busy_replica() {
let neuron_a = common::spawn_mock_neuron().await;
let neuron_b = common::spawn_mock_neuron().await;
let fleet = two_neuron_fleet(&neuron_a, &neuron_b).await;
// A is busy (1 running + 3 queued), B is idle.
seed_loaded(&fleet, "node-a", 1, 3).await;
seed_loaded(&fleet, "node-b", 0, 0).await;
let route = cortex_gateway::router::resolve(&fleet, "test-model")
.await
.expect("model is loaded on both nodes");
assert_eq!(route.node_name, "node-b", "should pick the idle replica");
// Flip the load: now B is the busy one.
seed_loaded(&fleet, "node-a", 0, 0).await;
seed_loaded(&fleet, "node-b", 1, 5).await;
let route = cortex_gateway::router::resolve(&fleet, "test-model")
.await
.expect("still loaded");
assert_eq!(route.node_name, "node-a", "should follow the lighter load");
}
/// Mock neuron whose inference endpoint always returns a #63 backpressure
/// envelope (503 + Retry-After) — simulating a saturated neuron.
async fn spawn_busy_neuron() -> String {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let base_url = format!("http://{addr}");
let inference_url = base_url.clone();
let app = axum::Router::new()
.route(
"/models/{model_id}/endpoint",
get(move |Path(_): Path<String>| {
let url = inference_url.clone();
async move { Json(json!({ "url": url })) }
}),
)
.route(
"/v1/chat/completions",
post(|| async {
let body = json!({"error": {
"message": "model is busy (admission queue full); retry shortly",
"type": "rate_limit_error",
"code": "rate_limit_exceeded",
"param": null
}});
(
StatusCode::SERVICE_UNAVAILABLE,
[(header::RETRY_AFTER, "6")],
Json(body),
)
.into_response()
}),
);
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
base_url
}
#[tokio::test]
async fn neuron_backpressure_is_propagated_intact() {
// A saturated neuron's 503 + Retry-After + envelope must reach the client
// verbatim — not unwrapped, remapped, or stripped (#55 / #63).
let neuron = spawn_busy_neuron().await;
let fleet = two_neuron_fleet(&neuron, &neuron).await;
seed_loaded(&fleet, "node-a", 1, 8).await;
let app = cortex_gateway::build_app(Arc::clone(&fleet));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
let resp = reqwest::Client::new()
.post(format!("http://{addr}/v1/chat/completions"))
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": "hi"}]}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::SERVICE_UNAVAILABLE);
assert_eq!(
resp.headers()
.get(reqwest::header::RETRY_AFTER)
.and_then(|v| v.to_str().ok()),
Some("6"),
"Retry-After must survive the proxy"
);
let body: Value = resp.json().await.unwrap();
assert_eq!(body["error"]["code"], "rate_limit_exceeded");
}
#[tokio::test]
async fn ties_break_deterministically_by_name() {
let neuron_a = common::spawn_mock_neuron().await;
let neuron_b = common::spawn_mock_neuron().await;
let fleet = two_neuron_fleet(&neuron_a, &neuron_b).await;
// Equal load on both → stable pick (lowest node name).
seed_loaded(&fleet, "node-a", 0, 0).await;
seed_loaded(&fleet, "node-b", 0, 0).await;
let route = cortex_gateway::router::resolve(&fleet, "test-model")
.await
.expect("loaded");
assert_eq!(route.node_name, "node-a", "ties break by name");
}

View File

@@ -0,0 +1,174 @@
//! Fail-fast prompt pre-validation + advisory client hints (#56).
//!
//! cortex refuses a prompt that already exceeds the model's advertised
//! context window before dispatching to neuron — the same #60
//! `context_length_exceeded` envelope neuron would emit, just earlier — and
//! attaches an advisory `X-Helexa-Advice` header for fingerprinted clients.
use axum::Json;
use axum::extract::Path;
use axum::routing::{get, post};
use cortex_core::config::{
EvictionSettings, EvictionStrategy, GatewayConfig, GatewaySettings, NeuronEndpoint,
};
use cortex_core::harness::ModelLimit;
use cortex_core::node::{ModelEntry, ModelStatus};
use cortex_gateway::state::CortexState;
use serde_json::{Value, json};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::net::TcpListener;
/// Mock neuron with a hit counter, so a test can prove a request was (or
/// wasn't) dispatched past the gateway's pre-validation.
async fn spawn_counting_neuron() -> (String, Arc<AtomicU64>) {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let base_url = format!("http://{addr}");
let inference_url = base_url.clone();
let hits = Arc::new(AtomicU64::new(0));
let sink = Arc::clone(&hits);
let app = axum::Router::new()
.route(
"/models/{model_id}/endpoint",
get(move |Path(_): Path<String>| {
let url = inference_url.clone();
async move { Json(json!({ "url": url })) }
}),
)
.route(
"/v1/chat/completions",
post(move || {
let sink = Arc::clone(&sink);
async move {
sink.fetch_add(1, Ordering::SeqCst);
Json(json!({
"id": "c", "object": "chat.completion", "created": 1_700_000_000_u64,
"model": "test-model",
"choices": [{"index": 0, "message": {"role": "assistant", "content": "ok"}, "finish_reason": "stop"}],
"usage": {"prompt_tokens": 3, "completion_tokens": 1, "total_tokens": 4}
}))
}
}),
);
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
(base_url, hits)
}
/// Gateway over one neuron with `test-model` loaded and a tiny advertised
/// context window (so a modest prompt overflows it).
async fn spawn_gateway(neuron: &str, context: usize) -> String {
let config = GatewayConfig {
gateway: GatewaySettings {
listen: "127.0.0.1:0".into(),
metrics_listen: "127.0.0.1:0".into(),
},
eviction: EvictionSettings {
strategy: EvictionStrategy::Lru,
defrag_after_cycles: 0,
},
neurons: vec![NeuronEndpoint {
name: "mock-node".into(),
endpoint: neuron.to_string(),
}],
models_config: "/dev/null".into(),
entitlements: Default::default(),
};
let fleet = Arc::new(CortexState::from_config(&config));
{
let mut nodes = fleet.nodes.write().await;
let n = nodes.get_mut("mock-node").unwrap();
n.healthy = true;
n.models.insert(
"test-model".into(),
ModelEntry {
id: "test-model".into(),
status: ModelStatus::Loaded,
last_accessed: None,
vram_estimate_mb: Some(8000),
capabilities: Vec::new(),
tool_call: false,
reasoning: false,
limit: Some(ModelLimit {
context,
input: None,
output: 16,
}),
},
);
}
let app = cortex_gateway::build_app(Arc::clone(&fleet));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
format!("http://{addr}")
}
#[tokio::test]
async fn over_long_prompt_is_rejected_before_dispatch() {
let (neuron, hits) = spawn_counting_neuron().await;
let gateway = spawn_gateway(&neuron, 50).await; // tiny 50-token window
// ~1200 chars → ~300 est tokens, well over 50.
let big = "word ".repeat(240);
let resp = reqwest::Client::new()
.post(format!("{gateway}/v1/chat/completions"))
.header("user-agent", "litellm/1.0")
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": big}]}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
// Advisory hint for the fingerprinted client (header only, never body).
assert!(
resp.headers().get("x-helexa-advice").is_some(),
"litellm should get advice"
);
let body: Value = resp.json().await.unwrap();
assert_eq!(body["error"]["code"], "context_length_exceeded");
assert_eq!(body["error"]["max"], 50);
// Refused at the edge — neuron never saw it.
assert_eq!(hits.load(Ordering::SeqCst), 0);
}
#[tokio::test]
async fn within_context_passes_through() {
let (neuron, hits) = spawn_counting_neuron().await;
let gateway = spawn_gateway(&neuron, 4096).await;
let resp = reqwest::Client::new()
.post(format!("{gateway}/v1/chat/completions"))
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": "hi"}]}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
let _ = resp.bytes().await.unwrap();
assert_eq!(hits.load(Ordering::SeqCst), 1, "served by neuron");
}
#[tokio::test]
async fn unknown_client_gets_no_advice_header() {
let (neuron, _hits) = spawn_counting_neuron().await;
let gateway = spawn_gateway(&neuron, 50).await;
let big = "word ".repeat(240);
let resp = reqwest::Client::new()
.post(format!("{gateway}/v1/chat/completions"))
// no/unknown User-Agent → no advice, but still a clean 400
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": big}]}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
assert!(resp.headers().get("x-helexa-advice").is_none());
let body: Value = resp.json().await.unwrap();
assert_eq!(body["error"]["code"], "context_length_exceeded");
}