Compare commits
8 Commits
feat/47-ph
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
3b9a6e37f6
|
|||
|
526b662c5e
|
|||
|
db7e373b90
|
|||
|
5c1623a817
|
|||
|
3b60dd7a31
|
|||
|
4feaaf1cfb
|
|||
|
057bc71e80
|
|||
|
c83f1eb98c
|
@@ -32,6 +32,12 @@ pub struct NodeState {
|
||||
/// least-busy replica when a model is loaded on more than one neuron.
|
||||
/// Empty until the first /health poll reports load.
|
||||
pub model_load: HashMap<String, ModelLoad>,
|
||||
/// Consecutive failed `/models` polls. The poller marks a node
|
||||
/// unhealthy only once this crosses a threshold, so a single transient
|
||||
/// miss (e.g. a neuron momentarily slow to answer while busy) doesn't
|
||||
/// yank the node — and all its models — out of routing. Reset to 0 on
|
||||
/// any successful poll.
|
||||
pub consecutive_poll_failures: u32,
|
||||
}
|
||||
|
||||
/// A model registered on a node, with its runtime status.
|
||||
|
||||
@@ -83,9 +83,23 @@ pub async fn require_principal(
|
||||
req.extensions_mut().insert(principal);
|
||||
next.run(req).await
|
||||
}
|
||||
// A present-but-invalid credential is always an error, even when
|
||||
// anonymous access is otherwise allowed.
|
||||
Err(_) => unauthorized("invalid API key"),
|
||||
// An unrecognized key only hard-fails when auth is *required*.
|
||||
// In allow-anonymous mode (the default) we must IGNORE it and
|
||||
// serve the request unauthenticated — otherwise the placeholder
|
||||
// keys that OpenAI-compatible clients send by default (opencode,
|
||||
// Open WebUI, Agent Zero, litellm) would all break, even though
|
||||
// the operator never opted into auth. Pre-#49 the bearer was
|
||||
// never inspected at all; this preserves that for require_auth=false.
|
||||
Err(_) => {
|
||||
if fleet.require_auth {
|
||||
unauthorized("invalid API key")
|
||||
} else {
|
||||
tracing::debug!(
|
||||
"ignoring unrecognized bearer token (require_auth=false): serving anonymously"
|
||||
);
|
||||
next.run(req).await
|
||||
}
|
||||
}
|
||||
},
|
||||
None => {
|
||||
if fleet.require_auth {
|
||||
|
||||
@@ -761,6 +761,19 @@ async fn proxy_with_metrics(
|
||||
body: Bytes,
|
||||
model_id: &str,
|
||||
) -> Response {
|
||||
// Fail-fast prompt pre-validation (#56): refuse a prompt that already
|
||||
// exceeds the model's advertised context window *before* dispatching to
|
||||
// neuron — the same `400 context_length_exceeded` neuron would emit on
|
||||
// overflow, just earlier and without burning a cold-load/queue slot.
|
||||
// cortex has no tokenizer, so the estimate under-counts and neuron stays
|
||||
// the exact wall; we only catch gross overages (the A0 failure mode).
|
||||
if let Some(context) = advertised_context(fleet, &route.node_name, model_id).await {
|
||||
let est = estimate_prompt_tokens(&body);
|
||||
if est > context {
|
||||
return context_length_exceeded_response(context, est, &headers);
|
||||
}
|
||||
}
|
||||
|
||||
let labels = [
|
||||
("model", model_id.to_string()),
|
||||
("node", route.node_name.clone()),
|
||||
@@ -844,6 +857,98 @@ async fn advertised_output_limit(
|
||||
.map(|l| l.output as u64)
|
||||
}
|
||||
|
||||
/// The model's advertised hard context window (`limit.context`, #62/#67) on a
|
||||
/// node, used for fail-fast prompt pre-validation (#56). `None` when no limit
|
||||
/// is known — pre-validation is then skipped and neuron remains the wall.
|
||||
async fn advertised_context(fleet: &CortexState, node_name: &str, model_id: &str) -> Option<u64> {
|
||||
let nodes = fleet.nodes.read().await;
|
||||
nodes
|
||||
.get(node_name)?
|
||||
.models
|
||||
.get(model_id)?
|
||||
.limit
|
||||
.as_ref()
|
||||
.map(|l| l.context as u64)
|
||||
}
|
||||
|
||||
/// Conservative prompt-token estimate (~4 chars/token over message text).
|
||||
/// cortex has no tokenizer; under-counting is the safe direction — we only
|
||||
/// pre-reject gross overages (#56), and neuron enforces the exact wall.
|
||||
fn estimate_prompt_tokens(body: &[u8]) -> u64 {
|
||||
let Ok(v) = serde_json::from_slice::<Value>(body) else {
|
||||
return (body.len() as u64 / 4).max(1);
|
||||
};
|
||||
let mut chars = 0usize;
|
||||
if let Some(messages) = v.get("messages").and_then(Value::as_array) {
|
||||
for m in messages {
|
||||
match m.get("content") {
|
||||
Some(Value::String(s)) => chars += s.len(),
|
||||
Some(Value::Array(parts)) => {
|
||||
for p in parts {
|
||||
if let Some(t) = p.get("text").and_then(Value::as_str) {
|
||||
chars += t.len();
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
chars += 8; // rough per-message role/formatting overhead
|
||||
}
|
||||
} else if let Some(prompt) = v.get("prompt").and_then(Value::as_str) {
|
||||
chars += prompt.len(); // legacy /v1/completions
|
||||
} else {
|
||||
return (body.len() as u64 / 4).max(1);
|
||||
}
|
||||
(chars as u64 / 4).max(1)
|
||||
}
|
||||
|
||||
/// Client-specific, advisory guidance for an over-long prompt (#56),
|
||||
/// fingerprinted from `User-Agent`. Strictly advisory: it rides the
|
||||
/// `X-Helexa-Advice` header only, never the error envelope, and behaviour
|
||||
/// never depends on it. Unknown clients get nothing.
|
||||
fn client_advice(headers: &HeaderMap) -> Option<&'static str> {
|
||||
let ua = headers
|
||||
.get(axum::http::header::USER_AGENT)?
|
||||
.to_str()
|
||||
.ok()?
|
||||
.to_ascii_lowercase();
|
||||
if ua.contains("litellm") {
|
||||
Some(
|
||||
"litellm forwards the full context; lower the configured context window or enable client-side compaction",
|
||||
)
|
||||
} else if ua.contains("agent-zero") || ua.contains("agent zero") {
|
||||
Some("reduce the conversation/context size or summarize earlier turns before resending")
|
||||
} else if ua.contains("zed") {
|
||||
Some("reduce the assistant context window in Zed's settings")
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// `400 context_length_exceeded` for an over-long prompt caught at the edge
|
||||
/// (#56), in the #60 envelope — the same shape neuron emits on overflow, so
|
||||
/// clients (opencode auto-compacts) handle it identically. Attaches the
|
||||
/// advisory `X-Helexa-Advice` header for fingerprinted clients.
|
||||
fn context_length_exceeded_response(
|
||||
context: u64,
|
||||
prompt_est: u64,
|
||||
headers: &HeaderMap,
|
||||
) -> Response {
|
||||
let env = OpenAiError::context_length_exceeded(format!(
|
||||
"This model's maximum context length is {context} tokens. Your request is \
|
||||
estimated at ~{prompt_est} tokens. Please reduce the length of the messages."
|
||||
))
|
||||
.with_extra("max", json!(context))
|
||||
.with_extra("estimated_prompt_tokens", json!(prompt_est));
|
||||
let mut response = crate::error::envelope_response(env);
|
||||
if let Some(advice) = client_advice(headers)
|
||||
&& let Ok(value) = axum::http::HeaderValue::from_str(advice)
|
||||
{
|
||||
response.headers_mut().insert("x-helexa-advice", value);
|
||||
}
|
||||
response
|
||||
}
|
||||
|
||||
/// Update `last_accessed` timestamp for a model on a node (drives LRU eviction).
|
||||
async fn touch_model(fleet: &CortexState, node_name: &str, model_id: &str) {
|
||||
let mut nodes = fleet.nodes.write().await;
|
||||
|
||||
@@ -5,12 +5,29 @@ use crate::state::CortexState;
|
||||
use chrono::Utc;
|
||||
use cortex_core::discovery::{DiscoveryResponse, HealthResponse};
|
||||
use cortex_core::harness::ModelInfo;
|
||||
use cortex_core::node::{ModelEntry, ModelStatus};
|
||||
use cortex_core::node::{ModelEntry, ModelStatus, NodeState};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
const POLL_INTERVAL: Duration = Duration::from_secs(10);
|
||||
|
||||
/// Consecutive failed `/models` polls before a node is marked unhealthy.
|
||||
/// Debounces transient misses (a busy neuron briefly slow to answer) so a
|
||||
/// single blip can't yank a node — and its models — out of routing. At the
|
||||
/// 10s poll interval this tolerates ~20s of flapping before evicting.
|
||||
const POLL_FAILURE_THRESHOLD: u32 = 3;
|
||||
|
||||
/// Record a failed poll for `node`, marking it unhealthy only once failures
|
||||
/// reach [`POLL_FAILURE_THRESHOLD`]. Below the threshold the node keeps its
|
||||
/// last-known health, riding over transient misses. A successful poll resets
|
||||
/// the counter (see the success arm in `poll_once`).
|
||||
fn record_poll_failure(node: &mut NodeState) {
|
||||
node.consecutive_poll_failures = node.consecutive_poll_failures.saturating_add(1);
|
||||
if node.consecutive_poll_failures >= POLL_FAILURE_THRESHOLD {
|
||||
node.healthy = false;
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs forever, polling all neurons on a fixed interval.
|
||||
pub async fn poll_loop(fleet: Arc<CortexState>) {
|
||||
loop {
|
||||
@@ -138,13 +155,14 @@ async fn poll_neuron(fleet: &CortexState, name: &str, endpoint: &str) {
|
||||
// Remove models no longer reported by the neuron.
|
||||
node.models.retain(|id, _| seen.contains(id));
|
||||
|
||||
node.consecutive_poll_failures = 0;
|
||||
node.healthy = true;
|
||||
node.last_poll = Some(Utc::now());
|
||||
tracing::debug!(node = name, models = models.len(), "poll ok");
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(node = name, error = %e, "failed to parse /models response");
|
||||
node.healthy = false;
|
||||
record_poll_failure(node);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -154,11 +172,11 @@ async fn poll_neuron(fleet: &CortexState, name: &str, endpoint: &str) {
|
||||
status = %resp.status(),
|
||||
"neuron returned non-success status"
|
||||
);
|
||||
node.healthy = false;
|
||||
record_poll_failure(node);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(node = name, error = %e, "failed to reach neuron");
|
||||
node.healthy = false;
|
||||
record_poll_failure(node);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -50,6 +50,10 @@ pub enum RouteError {
|
||||
"model '{model_id}' is in the catalogue but no healthy neuron's topology satisfies its constraints"
|
||||
)]
|
||||
NoFeasibleNeuron { model_id: String },
|
||||
#[error(
|
||||
"model '{model_id}' is feasible on a neuron that is currently unhealthy — retry shortly"
|
||||
)]
|
||||
FeasibleNodeUnhealthy { model_id: String },
|
||||
#[error("cold-load of '{model_id}' on '{node}' failed: {message}")]
|
||||
ColdLoadFailed {
|
||||
model_id: String,
|
||||
@@ -68,7 +72,9 @@ impl RouteError {
|
||||
/// safe to retry the same request); everything else is 404.
|
||||
pub fn http_status(&self) -> u16 {
|
||||
match self {
|
||||
RouteError::NoHealthyNodes | RouteError::ModelRecovering { .. } => 503,
|
||||
RouteError::NoHealthyNodes
|
||||
| RouteError::ModelRecovering { .. }
|
||||
| RouteError::FeasibleNodeUnhealthy { .. } => 503,
|
||||
_ => 404,
|
||||
}
|
||||
}
|
||||
@@ -81,7 +87,8 @@ impl RouteError {
|
||||
| RouteError::EndpointResolveFailed(_, _)
|
||||
| RouteError::NoFeasibleNeuron { .. }
|
||||
| RouteError::ColdLoadFailed { .. }
|
||||
| RouteError::ModelRecovering { .. } => "api_error",
|
||||
| RouteError::ModelRecovering { .. }
|
||||
| RouteError::FeasibleNodeUnhealthy { .. } => "api_error",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,6 +101,7 @@ impl RouteError {
|
||||
RouteError::NoFeasibleNeuron { .. } => "service_unavailable",
|
||||
RouteError::ColdLoadFailed { .. } => "service_unavailable",
|
||||
RouteError::ModelRecovering { .. } => "service_unavailable",
|
||||
RouteError::FeasibleNodeUnhealthy { .. } => "service_unavailable",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -105,6 +113,7 @@ impl RouteError {
|
||||
pub fn retry_after_secs(&self) -> Option<u64> {
|
||||
match self {
|
||||
RouteError::ModelRecovering { .. } => Some(2),
|
||||
RouteError::FeasibleNodeUnhealthy { .. } => Some(3),
|
||||
RouteError::NoHealthyNodes => Some(5),
|
||||
_ => None,
|
||||
}
|
||||
@@ -252,11 +261,32 @@ async fn pick_feasible_neuron(
|
||||
b.2.cmp(&a.2) // pinned first (true > false)
|
||||
.then(a.0.cmp(&b.0))
|
||||
});
|
||||
let pick = candidates.into_iter().next();
|
||||
pick.map(|(n, e, _)| (n, e))
|
||||
.ok_or_else(|| RouteError::NoFeasibleNeuron {
|
||||
if let Some((n, e, _)) = candidates.into_iter().next() {
|
||||
return Ok((n, e));
|
||||
}
|
||||
|
||||
// No *healthy* feasible neuron. Distinguish a transient outage from a
|
||||
// permanent misconfiguration: if some neuron is topologically feasible
|
||||
// but currently unhealthy (e.g. it briefly missed polls while busy),
|
||||
// this is retryable — return 503 + Retry-After so the client backs off
|
||||
// and retries instead of treating a 404 as a hard failure. Only when no
|
||||
// neuron could *ever* satisfy the topology is it a permanent 404.
|
||||
let feasible_but_unhealthy = nodes.values().any(|node| {
|
||||
!node.healthy
|
||||
&& node
|
||||
.discovery
|
||||
.as_ref()
|
||||
.is_some_and(|disc| profile.is_feasible_on(&node.name, &disc.devices))
|
||||
});
|
||||
if feasible_but_unhealthy {
|
||||
Err(RouteError::FeasibleNodeUnhealthy {
|
||||
model_id: profile.id.clone(),
|
||||
})
|
||||
} else {
|
||||
Err(RouteError::NoFeasibleNeuron {
|
||||
model_id: profile.id.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Issue `POST {endpoint}/models/load` for this profile on this neuron,
|
||||
|
||||
@@ -38,6 +38,7 @@ impl CortexState {
|
||||
discovery: None,
|
||||
activation: None,
|
||||
model_load: HashMap::new(),
|
||||
consecutive_poll_failures: 0,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
@@ -175,11 +175,33 @@ async fn missing_key_when_required_is_401_invalid_api_key() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn invalid_key_is_401_even_when_auth_not_required() {
|
||||
async fn unrecognized_key_is_ignored_when_auth_not_required() {
|
||||
let (neuron, seen) = spawn_capturing_neuron().await;
|
||||
// A present-but-wrong credential is always an error.
|
||||
// allow-anonymous mode: a placeholder/unknown bearer (as opencode,
|
||||
// Open WebUI, Agent Zero, litellm all send by default) must NOT be
|
||||
// rejected — it's ignored and the request is served anonymously.
|
||||
let gateway = spawn_gateway(&neuron, one_key_config(false)).await;
|
||||
|
||||
let resp = reqwest::Client::new()
|
||||
.post(format!("{gateway}/v1/chat/completions"))
|
||||
.bearer_auth("sk-dummy-placeholder")
|
||||
.json(&chat_body())
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(resp.status(), reqwest::StatusCode::OK);
|
||||
let _ = resp.bytes().await.unwrap();
|
||||
// Served, but anonymous — no principal stamped from the bogus key.
|
||||
assert!(seen.lock().unwrap().account_id.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn invalid_key_is_401_when_auth_required() {
|
||||
let (neuron, seen) = spawn_capturing_neuron().await;
|
||||
// With auth required, a present-but-wrong credential is rejected.
|
||||
let gateway = spawn_gateway(&neuron, one_key_config(true)).await;
|
||||
|
||||
let resp = reqwest::Client::new()
|
||||
.post(format!("{gateway}/v1/chat/completions"))
|
||||
.bearer_auth("sk-wrong")
|
||||
|
||||
124
crates/cortex-gateway/tests/feasibility_routing.rs
Normal file
124
crates/cortex-gateway/tests/feasibility_routing.rs
Normal file
@@ -0,0 +1,124 @@
|
||||
//! Router: a catalogued model whose only topologically-feasible neuron is
|
||||
//! currently unhealthy is a *transient* condition (retryable 503), not a
|
||||
//! permanent 404. This is the exact shape of the beast incident: benjy/
|
||||
//! quadbrat (1 GPU, healthy) can't host the 27B, and beast (2 GPU) — the
|
||||
//! sole feasible node — briefly drops out → clients must back off and retry,
|
||||
//! not hard-fail.
|
||||
|
||||
use cortex_core::config::{
|
||||
EvictionSettings, EvictionStrategy, GatewayConfig, GatewaySettings, NeuronEndpoint,
|
||||
};
|
||||
use cortex_core::discovery::{DeviceInfo, DiscoveryResponse};
|
||||
use cortex_gateway::router::{self, RouteError};
|
||||
use cortex_gateway::state::CortexState;
|
||||
use std::sync::Arc;
|
||||
|
||||
fn devices(n: usize) -> Vec<DeviceInfo> {
|
||||
(0..n)
|
||||
.map(|i| DeviceInfo {
|
||||
index: i as u32,
|
||||
name: "RTX 5090".into(),
|
||||
vram_total_mb: 32_768,
|
||||
compute_capability: "9.0".into(),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn discovery(host: &str, n_devices: usize) -> DiscoveryResponse {
|
||||
DiscoveryResponse {
|
||||
hostname: host.into(),
|
||||
os: "Linux".into(),
|
||||
kernel: "7.0".into(),
|
||||
cuda_version: Some("13.0".into()),
|
||||
driver_version: Some("999".into()),
|
||||
devices: devices(n_devices),
|
||||
harnesses: vec!["candle".into()],
|
||||
cuda_unavailable_reason: None,
|
||||
max_prompt_tokens: 49_152,
|
||||
}
|
||||
}
|
||||
|
||||
/// Catalogue with one model needing 2 devices. Returns a temp path.
|
||||
fn write_catalogue() -> std::path::PathBuf {
|
||||
let toml = r#"
|
||||
[[models]]
|
||||
id = "big-model"
|
||||
harness = "candle"
|
||||
min_devices = 2
|
||||
"#;
|
||||
let path = std::env::temp_dir().join("cortex_test_feasibility_models.toml");
|
||||
std::fs::write(&path, toml).unwrap();
|
||||
path
|
||||
}
|
||||
|
||||
async fn fleet_with(big_healthy: bool, big_devices: usize) -> Arc<CortexState> {
|
||||
let cat = write_catalogue();
|
||||
let config = GatewayConfig {
|
||||
gateway: GatewaySettings {
|
||||
listen: "127.0.0.1:0".into(),
|
||||
metrics_listen: "127.0.0.1:0".into(),
|
||||
},
|
||||
eviction: EvictionSettings {
|
||||
strategy: EvictionStrategy::Lru,
|
||||
defrag_after_cycles: 0,
|
||||
},
|
||||
neurons: vec![
|
||||
NeuronEndpoint {
|
||||
name: "small".into(),
|
||||
endpoint: "http://127.0.0.1:1".into(),
|
||||
},
|
||||
NeuronEndpoint {
|
||||
name: "big".into(),
|
||||
endpoint: "http://127.0.0.1:2".into(),
|
||||
},
|
||||
],
|
||||
models_config: cat.to_string_lossy().into_owned(),
|
||||
entitlements: Default::default(),
|
||||
};
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
{
|
||||
let mut nodes = fleet.nodes.write().await;
|
||||
// "small" is healthy but only has 1 GPU → not feasible for the model.
|
||||
let small = nodes.get_mut("small").unwrap();
|
||||
small.healthy = true;
|
||||
small.discovery = Some(discovery("small", 1));
|
||||
// "big" has enough GPUs but its health is the variable under test.
|
||||
let big = nodes.get_mut("big").unwrap();
|
||||
big.healthy = big_healthy;
|
||||
big.discovery = Some(discovery("big", big_devices));
|
||||
}
|
||||
fleet
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn feasible_node_unhealthy_is_transient_503() {
|
||||
// big (2 GPU, the only feasible node) is unhealthy; small (1 GPU) is
|
||||
// healthy but can't host the model → retryable, not a permanent 404.
|
||||
let fleet = fleet_with(false, 2).await;
|
||||
let err = router::resolve(&fleet, "big-model")
|
||||
.await
|
||||
.expect_err("model can't be served right now");
|
||||
assert!(
|
||||
matches!(err, RouteError::FeasibleNodeUnhealthy { .. }),
|
||||
"expected FeasibleNodeUnhealthy, got {err:?}"
|
||||
);
|
||||
assert_eq!(err.http_status(), 503);
|
||||
assert_eq!(err.retry_after_secs(), Some(3));
|
||||
assert_eq!(err.code(), "service_unavailable");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn no_node_can_ever_satisfy_is_permanent_404() {
|
||||
// big is healthy but only has 1 GPU now (e.g. topology genuinely can't
|
||||
// satisfy min_devices=2 anywhere) → permanent, non-retryable 404.
|
||||
let fleet = fleet_with(true, 1).await;
|
||||
let err = router::resolve(&fleet, "big-model")
|
||||
.await
|
||||
.expect_err("no feasible topology");
|
||||
assert!(
|
||||
matches!(err, RouteError::NoFeasibleNeuron { .. }),
|
||||
"expected NoFeasibleNeuron, got {err:?}"
|
||||
);
|
||||
assert_eq!(err.http_status(), 404);
|
||||
assert_eq!(err.retry_after_secs(), None);
|
||||
}
|
||||
@@ -228,10 +228,26 @@ async fn test_poller_marks_unreachable_node_unhealthy() {
|
||||
nodes.get_mut("dead-node").unwrap().healthy = true;
|
||||
}
|
||||
|
||||
// Debounce (#53 follow-up): a single missed poll must NOT evict a
|
||||
// previously-healthy node — a busy neuron briefly slow to answer
|
||||
// shouldn't yank its models out of routing.
|
||||
cortex_gateway::poller::poll_once(&fleet).await;
|
||||
assert!(
|
||||
fleet.nodes.read().await.get("dead-node").unwrap().healthy,
|
||||
"one failed poll should not mark a healthy node unhealthy"
|
||||
);
|
||||
|
||||
let nodes = fleet.nodes.read().await;
|
||||
assert!(!nodes.get("dead-node").unwrap().healthy);
|
||||
// It flips unhealthy only after POLL_FAILURE_THRESHOLD (3) consecutive
|
||||
// failures.
|
||||
cortex_gateway::poller::poll_once(&fleet).await;
|
||||
cortex_gateway::poller::poll_once(&fleet).await;
|
||||
assert!(
|
||||
!fleet.nodes.read().await.get("dead-node").unwrap().healthy,
|
||||
"three consecutive failed polls should mark the node unhealthy"
|
||||
);
|
||||
|
||||
// A subsequent successful poll would reset the counter and restore
|
||||
// health; covered implicitly by the discovery tests above.
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
174
crates/cortex-gateway/tests/prompt_prevalidation.rs
Normal file
174
crates/cortex-gateway/tests/prompt_prevalidation.rs
Normal file
@@ -0,0 +1,174 @@
|
||||
//! Fail-fast prompt pre-validation + advisory client hints (#56).
|
||||
//!
|
||||
//! cortex refuses a prompt that already exceeds the model's advertised
|
||||
//! context window before dispatching to neuron — the same #60
|
||||
//! `context_length_exceeded` envelope neuron would emit, just earlier — and
|
||||
//! attaches an advisory `X-Helexa-Advice` header for fingerprinted clients.
|
||||
|
||||
use axum::Json;
|
||||
use axum::extract::Path;
|
||||
use axum::routing::{get, post};
|
||||
use cortex_core::config::{
|
||||
EvictionSettings, EvictionStrategy, GatewayConfig, GatewaySettings, NeuronEndpoint,
|
||||
};
|
||||
use cortex_core::harness::ModelLimit;
|
||||
use cortex_core::node::{ModelEntry, ModelStatus};
|
||||
use cortex_gateway::state::CortexState;
|
||||
use serde_json::{Value, json};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
/// Mock neuron with a hit counter, so a test can prove a request was (or
|
||||
/// wasn't) dispatched past the gateway's pre-validation.
|
||||
async fn spawn_counting_neuron() -> (String, Arc<AtomicU64>) {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let base_url = format!("http://{addr}");
|
||||
let inference_url = base_url.clone();
|
||||
let hits = Arc::new(AtomicU64::new(0));
|
||||
let sink = Arc::clone(&hits);
|
||||
let app = axum::Router::new()
|
||||
.route(
|
||||
"/models/{model_id}/endpoint",
|
||||
get(move |Path(_): Path<String>| {
|
||||
let url = inference_url.clone();
|
||||
async move { Json(json!({ "url": url })) }
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/v1/chat/completions",
|
||||
post(move || {
|
||||
let sink = Arc::clone(&sink);
|
||||
async move {
|
||||
sink.fetch_add(1, Ordering::SeqCst);
|
||||
Json(json!({
|
||||
"id": "c", "object": "chat.completion", "created": 1_700_000_000_u64,
|
||||
"model": "test-model",
|
||||
"choices": [{"index": 0, "message": {"role": "assistant", "content": "ok"}, "finish_reason": "stop"}],
|
||||
"usage": {"prompt_tokens": 3, "completion_tokens": 1, "total_tokens": 4}
|
||||
}))
|
||||
}
|
||||
}),
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
});
|
||||
(base_url, hits)
|
||||
}
|
||||
|
||||
/// Gateway over one neuron with `test-model` loaded and a tiny advertised
|
||||
/// context window (so a modest prompt overflows it).
|
||||
async fn spawn_gateway(neuron: &str, context: usize) -> String {
|
||||
let config = GatewayConfig {
|
||||
gateway: GatewaySettings {
|
||||
listen: "127.0.0.1:0".into(),
|
||||
metrics_listen: "127.0.0.1:0".into(),
|
||||
},
|
||||
eviction: EvictionSettings {
|
||||
strategy: EvictionStrategy::Lru,
|
||||
defrag_after_cycles: 0,
|
||||
},
|
||||
neurons: vec![NeuronEndpoint {
|
||||
name: "mock-node".into(),
|
||||
endpoint: neuron.to_string(),
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: Default::default(),
|
||||
};
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
{
|
||||
let mut nodes = fleet.nodes.write().await;
|
||||
let n = nodes.get_mut("mock-node").unwrap();
|
||||
n.healthy = true;
|
||||
n.models.insert(
|
||||
"test-model".into(),
|
||||
ModelEntry {
|
||||
id: "test-model".into(),
|
||||
status: ModelStatus::Loaded,
|
||||
last_accessed: None,
|
||||
vram_estimate_mb: Some(8000),
|
||||
capabilities: Vec::new(),
|
||||
tool_call: false,
|
||||
reasoning: false,
|
||||
limit: Some(ModelLimit {
|
||||
context,
|
||||
input: None,
|
||||
output: 16,
|
||||
}),
|
||||
},
|
||||
);
|
||||
}
|
||||
let app = cortex_gateway::build_app(Arc::clone(&fleet));
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn(async move {
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
});
|
||||
format!("http://{addr}")
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn over_long_prompt_is_rejected_before_dispatch() {
|
||||
let (neuron, hits) = spawn_counting_neuron().await;
|
||||
let gateway = spawn_gateway(&neuron, 50).await; // tiny 50-token window
|
||||
|
||||
// ~1200 chars → ~300 est tokens, well over 50.
|
||||
let big = "word ".repeat(240);
|
||||
let resp = reqwest::Client::new()
|
||||
.post(format!("{gateway}/v1/chat/completions"))
|
||||
.header("user-agent", "litellm/1.0")
|
||||
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": big}]}))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
|
||||
// Advisory hint for the fingerprinted client (header only, never body).
|
||||
assert!(
|
||||
resp.headers().get("x-helexa-advice").is_some(),
|
||||
"litellm should get advice"
|
||||
);
|
||||
let body: Value = resp.json().await.unwrap();
|
||||
assert_eq!(body["error"]["code"], "context_length_exceeded");
|
||||
assert_eq!(body["error"]["max"], 50);
|
||||
// Refused at the edge — neuron never saw it.
|
||||
assert_eq!(hits.load(Ordering::SeqCst), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn within_context_passes_through() {
|
||||
let (neuron, hits) = spawn_counting_neuron().await;
|
||||
let gateway = spawn_gateway(&neuron, 4096).await;
|
||||
|
||||
let resp = reqwest::Client::new()
|
||||
.post(format!("{gateway}/v1/chat/completions"))
|
||||
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": "hi"}]}))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(resp.status(), reqwest::StatusCode::OK);
|
||||
let _ = resp.bytes().await.unwrap();
|
||||
assert_eq!(hits.load(Ordering::SeqCst), 1, "served by neuron");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unknown_client_gets_no_advice_header() {
|
||||
let (neuron, _hits) = spawn_counting_neuron().await;
|
||||
let gateway = spawn_gateway(&neuron, 50).await;
|
||||
|
||||
let big = "word ".repeat(240);
|
||||
let resp = reqwest::Client::new()
|
||||
.post(format!("{gateway}/v1/chat/completions"))
|
||||
// no/unknown User-Agent → no advice, but still a clean 400
|
||||
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": big}]}))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
|
||||
assert!(resp.headers().get("x-helexa-advice").is_none());
|
||||
let body: Value = resp.json().await.unwrap();
|
||||
assert_eq!(body["error"]["code"], "context_length_exceeded");
|
||||
}
|
||||
@@ -13,6 +13,7 @@ use axum::response::sse::{Event, KeepAlive, Sse};
|
||||
use axum::response::{IntoResponse, Json};
|
||||
use axum::routing::{get, post};
|
||||
use cortex_core::discovery::{DiscoveryResponse, HealthResponse};
|
||||
use cortex_core::entitlements::{HEADER_ACCOUNT_ID, HEADER_KEY_ID};
|
||||
use cortex_core::harness::ModelSpec;
|
||||
use cortex_core::openai::{ChatCompletionRequest, MessageContent};
|
||||
use cortex_core::responses::{ResponsesRequest, ResponsesUsage};
|
||||
@@ -234,6 +235,17 @@ fn default_enable_thinking(req: &mut ChatCompletionRequest, include_thinking: bo
|
||||
}
|
||||
}
|
||||
|
||||
/// The request's principal for fair-share admission (#54), reconstructed
|
||||
/// from the internal headers cortex stamps (#49). cortex strips any
|
||||
/// client-supplied copy and asserts the authoritative value, so over the
|
||||
/// trusted WireGuard link these are safe to key fair-share on. `None` for an
|
||||
/// unauthenticated/direct request — exempt from the per-principal cap.
|
||||
fn principal_key(headers: &axum::http::HeaderMap) -> Option<String> {
|
||||
let account = headers.get(HEADER_ACCOUNT_ID)?.to_str().ok()?;
|
||||
let key = headers.get(HEADER_KEY_ID)?.to_str().ok()?;
|
||||
Some(format!("{account}/{key}"))
|
||||
}
|
||||
|
||||
/// OpenAI-compatible chat completions. Dispatches to streaming SSE when
|
||||
/// `stream: true` is set on the request; otherwise returns a single
|
||||
/// `ChatCompletionResponse`.
|
||||
@@ -277,8 +289,14 @@ async fn chat_completions(
|
||||
// true`) keep reasoning on.
|
||||
default_enable_thinking(&mut req, include_thinking);
|
||||
|
||||
// Fair-share admission principal (#54), from cortex's stamped headers.
|
||||
let principal = principal_key(&headers);
|
||||
|
||||
if req.stream.unwrap_or(false) {
|
||||
match candle.chat_completion_stream_with(req, chat_config).await {
|
||||
match candle
|
||||
.chat_completion_stream_with(req, chat_config, principal)
|
||||
.await
|
||||
{
|
||||
Ok(rx) => {
|
||||
// Each chunk → one SSE `data: {json}` line. After the
|
||||
// channel closes, append the OpenAI [DONE] terminator.
|
||||
@@ -295,7 +313,7 @@ async fn chat_completions(
|
||||
Err(e) => inference_error_response(e),
|
||||
}
|
||||
} else {
|
||||
match candle.chat_completion(req).await {
|
||||
match candle.chat_completion(req, principal).await {
|
||||
Ok(resp) => Json(resp).into_response(),
|
||||
Err(e) => inference_error_response(e),
|
||||
}
|
||||
@@ -308,6 +326,7 @@ async fn chat_completions(
|
||||
/// event stream into the Responses event family.
|
||||
async fn responses(
|
||||
State(state): State<Arc<NeuronState>>,
|
||||
headers: axum::http::HeaderMap,
|
||||
Json(req): Json<ResponsesRequest>,
|
||||
) -> impl IntoResponse {
|
||||
let Some(candle) = state.candle.as_ref().map(Arc::clone) else {
|
||||
@@ -342,9 +361,12 @@ async fn responses(
|
||||
};
|
||||
chat_req.stream = Some(stream_requested);
|
||||
|
||||
// Fair-share admission principal (#54), from cortex's stamped headers.
|
||||
let principal = principal_key(&headers);
|
||||
|
||||
if stream_requested {
|
||||
match candle
|
||||
.responses_stream(chat_req, response_id, message_item_id)
|
||||
.responses_stream(chat_req, response_id, message_item_id, principal)
|
||||
.await
|
||||
{
|
||||
Ok(rx) => {
|
||||
@@ -368,7 +390,7 @@ async fn responses(
|
||||
// and translate the result. We don't currently re-tokenise
|
||||
// to compute usage; the harness returns it via the chat
|
||||
// response and we pass it through.
|
||||
match candle.chat_completion(chat_req).await {
|
||||
match candle.chat_completion(chat_req, principal).await {
|
||||
Ok(chat_resp) => {
|
||||
// Extract the assistant text (chat completions
|
||||
// always emits one choice on the candle path).
|
||||
@@ -492,8 +514,8 @@ fn inference_error_response(err: InferenceError) -> axum::response::Response {
|
||||
"template_render_failed",
|
||||
format!("chat template could not render this request: {detail}"),
|
||||
),
|
||||
// Admission control refused (#53): a fast, retryable "busy" signal.
|
||||
// 503 (service busy) + Retry-After; opencode/AI SDK back off.
|
||||
// Admission control refused on load (#53): a fast, retryable "busy"
|
||||
// signal. 503 (service busy) + Retry-After; opencode/AI SDK back off.
|
||||
InferenceError::Overloaded { retry_after_secs } => OpenAiError::new(
|
||||
503,
|
||||
"rate_limit_error",
|
||||
@@ -501,6 +523,15 @@ fn inference_error_response(err: InferenceError) -> axum::response::Response {
|
||||
"model is busy (admission queue full); retry shortly",
|
||||
)
|
||||
.with_retry_after(retry_after_secs),
|
||||
// Per-principal fair-share cap (#54): 429 rate_limit_exceeded +
|
||||
// Retry-After — the caller is sending too many concurrent requests.
|
||||
InferenceError::PerPrincipalLimit { retry_after_secs } => OpenAiError::new(
|
||||
429,
|
||||
"rate_limit_error",
|
||||
"rate_limit_exceeded",
|
||||
"too many concurrent requests for this key; retry shortly",
|
||||
)
|
||||
.with_retry_after(retry_after_secs),
|
||||
InferenceError::Other(e) => OpenAiError::without_code(500, "api_error", format!("{e:#}")),
|
||||
};
|
||||
envelope_response(env)
|
||||
|
||||
@@ -113,6 +113,13 @@ pub struct AdmissionConfig {
|
||||
/// honest signal).
|
||||
#[serde(default = "default_admission_max_wait_secs")]
|
||||
pub max_wait_secs: u64,
|
||||
/// Per-principal fair-share cap (#54): max in-flight + queued requests
|
||||
/// for any single principal (resolved from the `x-helexa-*` headers
|
||||
/// cortex stamps), so one client can't monopolize the queue while others
|
||||
/// wait. Over-cap → `429 rate_limit_exceeded` + `Retry-After`. `0`
|
||||
/// disables the cap; anonymous requests are always exempt.
|
||||
#[serde(default = "default_admission_max_per_principal")]
|
||||
pub max_per_principal: usize,
|
||||
}
|
||||
|
||||
impl Default for AdmissionConfig {
|
||||
@@ -121,6 +128,7 @@ impl Default for AdmissionConfig {
|
||||
max_in_flight: default_admission_max_in_flight(),
|
||||
max_queue_depth: default_admission_max_queue_depth(),
|
||||
max_wait_secs: default_admission_max_wait_secs(),
|
||||
max_per_principal: default_admission_max_per_principal(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -137,6 +145,10 @@ fn default_admission_max_wait_secs() -> u64 {
|
||||
30
|
||||
}
|
||||
|
||||
fn default_admission_max_per_principal() -> usize {
|
||||
2
|
||||
}
|
||||
|
||||
/// `[harness.candle.prefix_cache]` settings.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PrefixCacheConfig {
|
||||
|
||||
@@ -21,40 +21,56 @@
|
||||
//! `/health` can read live load without contending with inference.
|
||||
|
||||
use crate::config::AdmissionConfig;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||
|
||||
/// Why admission was refused. Both map to the #63 backpressure envelope
|
||||
/// (`429`/`503` + `rate_limit_exceeded` + `Retry-After`); they differ only
|
||||
/// in cause, for logging.
|
||||
/// Why admission was refused. All map to the #63 backpressure envelope
|
||||
/// (`rate_limit_exceeded` + `Retry-After`); they differ in cause (and HTTP
|
||||
/// status — load → `503`, per-principal → `429`).
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum AdmissionRejection {
|
||||
/// The bounded wait queue was already full.
|
||||
/// The bounded wait queue was already full (server-side load).
|
||||
QueueFull { retry_after_secs: u64 },
|
||||
/// A queue slot was taken but the in-flight slot didn't free within
|
||||
/// `max_wait`.
|
||||
/// `max_wait` (server-side load).
|
||||
Timeout { retry_after_secs: u64 },
|
||||
/// This principal already has `max_per_principal` requests in flight or
|
||||
/// queued (#54 fair-share) — one principal can't monopolize the model.
|
||||
PrincipalCap { retry_after_secs: u64 },
|
||||
}
|
||||
|
||||
impl AdmissionRejection {
|
||||
pub fn retry_after_secs(&self) -> u64 {
|
||||
match self {
|
||||
AdmissionRejection::QueueFull { retry_after_secs }
|
||||
| AdmissionRejection::Timeout { retry_after_secs } => *retry_after_secs,
|
||||
| AdmissionRejection::Timeout { retry_after_secs }
|
||||
| AdmissionRejection::PrincipalCap { retry_after_secs } => *retry_after_secs,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Bounded batch-1 scheduler for one loaded model.
|
||||
/// Admission accounting, mutated under a brief lock (never held across an
|
||||
/// await). `pending` is queued + in-flight overall; `per_principal` is the
|
||||
/// same count keyed by principal for fair-share (#54).
|
||||
#[derive(Default, Debug)]
|
||||
struct AdmissionState {
|
||||
pending: usize,
|
||||
per_principal: HashMap<String, usize>,
|
||||
}
|
||||
|
||||
/// Bounded batch-1 scheduler for one loaded model, with per-principal
|
||||
/// fair-share.
|
||||
pub struct AdmissionController {
|
||||
/// In-flight slots — `max_in_flight` permits (1 for batch-1).
|
||||
slots: Arc<Semaphore>,
|
||||
/// Queued + in-flight count, for fast rejection and load reporting.
|
||||
pending: Arc<AtomicUsize>,
|
||||
/// `max_in_flight + max_queue_depth` — the rejection threshold.
|
||||
/// Queued + in-flight accounting (overall + per principal).
|
||||
state: Arc<Mutex<AdmissionState>>,
|
||||
/// `max_in_flight + max_queue_depth` — the overall rejection threshold.
|
||||
max_pending: usize,
|
||||
/// Max in-flight + queued for any single principal (#54). `0` disables.
|
||||
max_per_principal: usize,
|
||||
max_in_flight: usize,
|
||||
max_wait: Duration,
|
||||
}
|
||||
@@ -65,42 +81,69 @@ impl AdmissionController {
|
||||
let max_in_flight = cfg.max_in_flight.max(1);
|
||||
Self {
|
||||
slots: Arc::new(Semaphore::new(max_in_flight)),
|
||||
pending: Arc::new(AtomicUsize::new(0)),
|
||||
state: Arc::new(Mutex::new(AdmissionState::default())),
|
||||
max_pending: max_in_flight + cfg.max_queue_depth,
|
||||
max_per_principal: cfg.max_per_principal,
|
||||
max_in_flight,
|
||||
max_wait: Duration::from_secs(cfg.max_wait_secs),
|
||||
}
|
||||
}
|
||||
|
||||
/// Admit a request: reserve a queue slot (fast-rejecting if full), then
|
||||
/// wait up to `max_wait` for an in-flight slot. The returned permit must
|
||||
/// be held for the request's lifetime; dropping it frees both slots.
|
||||
pub async fn enter(&self) -> Result<AdmissionPermit, AdmissionRejection> {
|
||||
// Reserve a pending slot up front so concurrent callers can't all
|
||||
// slip past the threshold check. Roll back if we're over capacity.
|
||||
let prev = self.pending.fetch_add(1, Ordering::AcqRel);
|
||||
if prev >= self.max_pending {
|
||||
self.pending.fetch_sub(1, Ordering::AcqRel);
|
||||
/// Admit a request for `principal` (`None` = anonymous, exempt from the
|
||||
/// per-principal cap). Reserves a queue slot — fast-rejecting if the
|
||||
/// overall queue is full or the principal is over its fair-share cap —
|
||||
/// then waits up to `max_wait` for an in-flight slot. The returned permit
|
||||
/// must be held for the request's lifetime; dropping it frees the slots.
|
||||
pub async fn enter(
|
||||
&self,
|
||||
principal: Option<&str>,
|
||||
) -> Result<AdmissionPermit, AdmissionRejection> {
|
||||
// Decision + reservation under one brief lock so concurrent callers
|
||||
// can't both slip past the thresholds. No await is held here.
|
||||
{
|
||||
let mut st = self.state.lock().expect("admission state poisoned");
|
||||
if st.pending >= self.max_pending {
|
||||
return Err(AdmissionRejection::QueueFull {
|
||||
retry_after_secs: self.retry_hint(),
|
||||
retry_after_secs: self.retry_hint(st.pending),
|
||||
});
|
||||
}
|
||||
if let Some(p) = principal
|
||||
&& self.max_per_principal > 0
|
||||
&& st.per_principal.get(p).copied().unwrap_or(0) >= self.max_per_principal
|
||||
{
|
||||
return Err(AdmissionRejection::PrincipalCap {
|
||||
retry_after_secs: self.retry_hint(st.pending),
|
||||
});
|
||||
}
|
||||
st.pending += 1;
|
||||
if let Some(p) = principal {
|
||||
*st.per_principal.entry(p.to_string()).or_insert(0) += 1;
|
||||
}
|
||||
}
|
||||
|
||||
match tokio::time::timeout(self.max_wait, Arc::clone(&self.slots).acquire_owned()).await {
|
||||
Ok(Ok(permit)) => Ok(AdmissionPermit {
|
||||
_permit: permit,
|
||||
pending: Arc::clone(&self.pending),
|
||||
state: Arc::clone(&self.state),
|
||||
principal: principal.map(str::to_string),
|
||||
}),
|
||||
// Semaphore is never closed; treat a closed/elapsed wait the same.
|
||||
Ok(Err(_)) | Err(_) => {
|
||||
self.pending.fetch_sub(1, Ordering::AcqRel);
|
||||
self.release(principal);
|
||||
Err(AdmissionRejection::Timeout {
|
||||
retry_after_secs: self.retry_hint(),
|
||||
retry_after_secs: self.retry_hint(self.max_pending),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Roll back a reserved-but-not-admitted slot (wait timed out).
|
||||
fn release(&self, principal: Option<&str>) {
|
||||
let mut st = self.state.lock().expect("admission state poisoned");
|
||||
st.pending = st.pending.saturating_sub(1);
|
||||
decrement_principal(&mut st.per_principal, principal);
|
||||
}
|
||||
|
||||
/// Requests currently running (holding an in-flight slot).
|
||||
pub fn in_flight(&self) -> usize {
|
||||
self.max_in_flight
|
||||
@@ -109,29 +152,45 @@ impl AdmissionController {
|
||||
|
||||
/// Requests waiting for an in-flight slot.
|
||||
pub fn queue_depth(&self) -> usize {
|
||||
self.pending
|
||||
.load(Ordering::Acquire)
|
||||
.saturating_sub(self.in_flight())
|
||||
let pending = self.state.lock().expect("admission state poisoned").pending;
|
||||
pending.saturating_sub(self.in_flight())
|
||||
}
|
||||
|
||||
/// Rough `Retry-After`: scale with how backed-up the model is, clamped to
|
||||
/// a sane band. Without per-request timing this is a heuristic, but it
|
||||
/// gives well-behaved clients (opencode/AI SDK) a sensible backoff.
|
||||
fn retry_hint(&self) -> u64 {
|
||||
((self.queue_depth() as u64 + 1) * 2).clamp(1, 120)
|
||||
fn retry_hint(&self, pending: usize) -> u64 {
|
||||
let queued = pending.saturating_sub(self.max_in_flight) as u64;
|
||||
((queued + 1) * 2).clamp(1, 120)
|
||||
}
|
||||
}
|
||||
|
||||
/// Held for a request's lifetime; frees the in-flight + queue slot on drop.
|
||||
/// Decrement (and prune at zero) a principal's outstanding count.
|
||||
fn decrement_principal(map: &mut HashMap<String, usize>, principal: Option<&str>) {
|
||||
if let Some(p) = principal
|
||||
&& let Some(count) = map.get_mut(p)
|
||||
{
|
||||
*count -= 1;
|
||||
if *count == 0 {
|
||||
map.remove(p);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Held for a request's lifetime; frees the in-flight + queue slot (and the
|
||||
/// principal's fair-share slot) on drop.
|
||||
#[derive(Debug)]
|
||||
pub struct AdmissionPermit {
|
||||
_permit: OwnedSemaphorePermit,
|
||||
pending: Arc<AtomicUsize>,
|
||||
state: Arc<Mutex<AdmissionState>>,
|
||||
principal: Option<String>,
|
||||
}
|
||||
|
||||
impl Drop for AdmissionPermit {
|
||||
fn drop(&mut self) {
|
||||
self.pending.fetch_sub(1, Ordering::AcqRel);
|
||||
let mut st = self.state.lock().expect("admission state poisoned");
|
||||
st.pending = st.pending.saturating_sub(1);
|
||||
decrement_principal(&mut st.per_principal, self.principal.as_deref());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,11 +198,14 @@ impl Drop for AdmissionPermit {
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
/// Config with the per-principal cap disabled (0) — most tests exercise
|
||||
/// the overall queue with anonymous (`None`) callers.
|
||||
fn cfg(max_in_flight: usize, max_queue_depth: usize, max_wait_secs: u64) -> AdmissionConfig {
|
||||
AdmissionConfig {
|
||||
max_in_flight,
|
||||
max_queue_depth,
|
||||
max_wait_secs,
|
||||
max_per_principal: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,7 +213,7 @@ mod tests {
|
||||
async fn admits_up_to_in_flight_and_reports_load() {
|
||||
let ctrl = AdmissionController::new(&cfg(1, 4, 30));
|
||||
assert_eq!(ctrl.in_flight(), 0);
|
||||
let p = ctrl.enter().await.expect("first admits");
|
||||
let p = ctrl.enter(None).await.expect("first admits");
|
||||
assert_eq!(ctrl.in_flight(), 1);
|
||||
assert_eq!(ctrl.queue_depth(), 0);
|
||||
drop(p);
|
||||
@@ -162,17 +224,17 @@ mod tests {
|
||||
async fn rejects_when_queue_full() {
|
||||
// 1 in-flight + 1 queue slot = capacity 2; the 3rd is refused fast.
|
||||
let ctrl = Arc::new(AdmissionController::new(&cfg(1, 1, 30)));
|
||||
let _running = ctrl.enter().await.expect("admit running");
|
||||
let _running = ctrl.enter(None).await.expect("admit running");
|
||||
|
||||
// Fill the single queue slot with a waiter that parks on the semaphore.
|
||||
let ctrl2 = Arc::clone(&ctrl);
|
||||
let waiter = tokio::spawn(async move { ctrl2.enter().await.map(|p| drop(p)) });
|
||||
let waiter = tokio::spawn(async move { ctrl2.enter(None).await.map(|p| drop(p)) });
|
||||
// Give the waiter a moment to occupy the queue slot.
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
assert_eq!(ctrl.queue_depth(), 1);
|
||||
|
||||
// Queue full → immediate QueueFull with a Retry-After hint.
|
||||
match ctrl.enter().await {
|
||||
match ctrl.enter(None).await {
|
||||
Err(AdmissionRejection::QueueFull { retry_after_secs }) => {
|
||||
assert!(retry_after_secs >= 1)
|
||||
}
|
||||
@@ -190,13 +252,47 @@ mod tests {
|
||||
// request can't even queue, so it's QueueFull, not Timeout. Use a
|
||||
// queue of 1 and a tiny max_wait to exercise the timeout path.
|
||||
let ctrl = Arc::new(AdmissionController::new(&cfg(1, 1, 0)));
|
||||
let _running = ctrl.enter().await.expect("admit running");
|
||||
let _running = ctrl.enter(None).await.expect("admit running");
|
||||
// max_wait 0 → the queued request times out almost immediately.
|
||||
match ctrl.enter().await {
|
||||
match ctrl.enter(None).await {
|
||||
Err(AdmissionRejection::Timeout { .. }) => {}
|
||||
other => panic!("expected Timeout, got {other:?}"),
|
||||
}
|
||||
// The timed-out request released its queue slot.
|
||||
assert_eq!(ctrl.queue_depth(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn per_principal_cap_protects_other_principals() {
|
||||
// Generous overall queue, but each principal capped at 1 in-flight+
|
||||
// queued. Principal A holds the running slot; A's second request is
|
||||
// refused (PrincipalCap) rather than occupying the queue, so B's
|
||||
// single request still gets a queue slot and proceeds.
|
||||
let cfg = AdmissionConfig {
|
||||
max_in_flight: 1,
|
||||
max_queue_depth: 8,
|
||||
max_wait_secs: 30,
|
||||
max_per_principal: 1,
|
||||
};
|
||||
let ctrl = Arc::new(AdmissionController::new(&cfg));
|
||||
|
||||
let _a1 = ctrl.enter(Some("acct-a/key-a")).await.expect("A admits");
|
||||
|
||||
// A is over its fair-share cap → fast PrincipalCap, no queue slot taken.
|
||||
match ctrl.enter(Some("acct-a/key-a")).await {
|
||||
Err(AdmissionRejection::PrincipalCap { retry_after_secs }) => {
|
||||
assert!(retry_after_secs >= 1)
|
||||
}
|
||||
other => panic!("expected PrincipalCap, got {other:?}"),
|
||||
}
|
||||
|
||||
// B (a different principal) is admitted to the queue and proceeds
|
||||
// once A releases — it was never stuck behind A's backlog.
|
||||
let ctrl2 = Arc::clone(&ctrl);
|
||||
let b = tokio::spawn(async move { ctrl2.enter(Some("acct-b/key-b")).await.map(drop) });
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
assert_eq!(ctrl.queue_depth(), 1, "B is queued, not rejected");
|
||||
drop(_a1);
|
||||
b.await.unwrap().expect("B is served after A releases");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ use crate::wire::{
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
|
||||
#[cfg(feature = "cuda")]
|
||||
use std::time::Duration;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
@@ -205,23 +205,50 @@ impl LoadedHandle {
|
||||
/// `NEURON_MAX_PROMPT_TOKENS`, when explicitly set, is applied as a
|
||||
/// clamp-only upper bound on the derived `context` — a backstop, not
|
||||
/// the authority. Unset → no clamp; the derivation stands alone.
|
||||
pub async fn derived_limit(
|
||||
/// Refresh the cached free-VRAM reading used by [`Self::derived_limit`]
|
||||
/// (#53). Queries the device worker — so it MUST run off the request
|
||||
/// path (background refresher / load-time seed), never from a control
|
||||
/// endpoint, since the query queues behind inference on the worker.
|
||||
/// Single-GPU caches the device's free VRAM; TP caches the tightest
|
||||
/// free across ranks (the same value `derived_limit` used pre-cache).
|
||||
pub async fn refresh_free_mb(&self) {
|
||||
let free = match self {
|
||||
LoadedHandle::Single(m) => m.query_vram().await.0,
|
||||
#[cfg(feature = "cuda")]
|
||||
LoadedHandle::Tp(m) => m.query_vram_tightest_free_mb().await,
|
||||
};
|
||||
// Don't clobber a good cached value with a transient `0`
|
||||
// (worker gone/poisoned sentinel).
|
||||
if free > 0 {
|
||||
match self {
|
||||
LoadedHandle::Single(m) => m.last_free_mb.store(free, Ordering::Release),
|
||||
#[cfg(feature = "cuda")]
|
||||
LoadedHandle::Tp(m) => m.last_free_mb.store(free, Ordering::Release),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn derived_limit(
|
||||
&self,
|
||||
cfg: &crate::config::ContextLimitConfig,
|
||||
) -> Option<cortex_core::harness::ModelLimit> {
|
||||
if !cfg.enabled {
|
||||
return None;
|
||||
}
|
||||
// Read the *cached* free VRAM — never query the device worker here.
|
||||
// This runs on `GET /models`; a live query would queue behind
|
||||
// inference on the worker thread and stall the control plane (#53).
|
||||
// The cache is refreshed off the request path (load + background task).
|
||||
let (profile, free_mb, rate) = match self {
|
||||
LoadedHandle::Single(m) => (
|
||||
m.context_profile?,
|
||||
m.query_vram().await.0,
|
||||
m.last_free_mb.load(Ordering::Acquire),
|
||||
m.prefill_rate.get(),
|
||||
),
|
||||
#[cfg(feature = "cuda")]
|
||||
LoadedHandle::Tp(m) => (
|
||||
m.context_profile?,
|
||||
m.query_vram_tightest_free_mb().await,
|
||||
m.last_free_mb.load(Ordering::Acquire),
|
||||
m.prefill_rate.get(),
|
||||
),
|
||||
};
|
||||
@@ -391,6 +418,13 @@ pub struct LoadedModel {
|
||||
/// request-path enforcement reads this — `0` means "not derived yet"
|
||||
/// → fall back to the static `NEURON_MAX_PROMPT_TOKENS`.
|
||||
pub derived_input_cap: AtomicUsize,
|
||||
/// Cached free VRAM (MiB) for the control plane (#53). `derived_limit`
|
||||
/// (served by `GET /models`) reads this instead of querying the device
|
||||
/// worker, which during inference is saturated processing forward jobs —
|
||||
/// a live query would queue behind them and stall `/models`, tripping
|
||||
/// cortex's health poller into marking the node unhealthy. Refreshed off
|
||||
/// the request path: seeded at load, then by a background task.
|
||||
pub last_free_mb: AtomicU64,
|
||||
}
|
||||
|
||||
impl LoadedModel {
|
||||
@@ -503,6 +537,10 @@ pub struct TpLoadedModel {
|
||||
/// Mint for pool-wide snapshot ids. Plain counter; uniqueness only
|
||||
/// needs to hold per model lifetime (snapshots die with the model).
|
||||
pub next_snapshot_id: std::sync::atomic::AtomicU64,
|
||||
/// Cached tightest free VRAM (MiB) for the control plane (#53) — see
|
||||
/// [`LoadedModel::last_free_mb`]. Read by `derived_limit` so `GET /models`
|
||||
/// never fans a VRAM query out to the (inference-saturated) TP workers.
|
||||
pub last_free_mb: AtomicU64,
|
||||
}
|
||||
|
||||
#[cfg(feature = "cuda")]
|
||||
@@ -1109,6 +1147,32 @@ fn debug_poison_armed(model_id: &str) -> bool {
|
||||
armed && !FIRED.swap(true, Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Background control-plane VRAM cache refresher (#53). Every few seconds,
|
||||
/// refreshes each loaded model's `last_free_mb` so `derived_limit` (served
|
||||
/// by `GET /models`) reads a cached value and never queries the device
|
||||
/// worker on the request path — a live query would queue behind inference
|
||||
/// forward jobs on the worker thread, stalling `/models` for seconds and
|
||||
/// tripping cortex's health poller into evicting the node from routing.
|
||||
/// Holds a `Weak` so a shutting-down harness lets the task exit. The query
|
||||
/// itself may queue behind inference, but that only delays this background
|
||||
/// refresh — no request-path caller is ever blocked.
|
||||
async fn vram_cache_refresh_loop(weak: std::sync::Weak<CandleHarness>) {
|
||||
const REFRESH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
|
||||
loop {
|
||||
tokio::time::sleep(REFRESH_INTERVAL).await;
|
||||
let Some(this) = weak.upgrade() else {
|
||||
return; // harness dropped — exit
|
||||
};
|
||||
// Snapshot handles, then release the read lock before awaiting the
|
||||
// (possibly slow) worker queries so we never hold it across an await.
|
||||
let handles: Vec<LoadedHandle> = this.models.read().await.values().cloned().collect();
|
||||
drop(this);
|
||||
for handle in handles {
|
||||
handle.refresh_free_mb().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Background auto-recovery task (#17). Drains poisoned model ids and
|
||||
/// rebuilds each via [`CandleHarness::recover_one`]. Holds a `Weak` so a
|
||||
/// shutting-down harness lets the task exit; processes one id at a time,
|
||||
@@ -1595,6 +1659,11 @@ impl CandleHarness {
|
||||
if tokio::runtime::Handle::try_current().is_ok() {
|
||||
let weak = Arc::downgrade(&this);
|
||||
tokio::spawn(recovery_loop(weak, recovery_rx));
|
||||
// Control-plane VRAM cache refresher (#53): keeps each loaded
|
||||
// model's `last_free_mb` current off the request path, so
|
||||
// `derived_limit` / `GET /models` never query the device worker
|
||||
// (which is saturated during inference) and never stall.
|
||||
tokio::spawn(vram_cache_refresh_loop(Arc::downgrade(&this)));
|
||||
}
|
||||
this
|
||||
}
|
||||
@@ -2028,6 +2097,7 @@ impl CandleHarness {
|
||||
pub async fn chat_completion(
|
||||
&self,
|
||||
request: ChatCompletionRequest,
|
||||
principal: Option<String>,
|
||||
) -> Result<ChatCompletionResponse, InferenceError> {
|
||||
let handle = {
|
||||
let models = self.models.read().await;
|
||||
@@ -2052,7 +2122,7 @@ impl CandleHarness {
|
||||
LoadedHandle::Single(m) => m,
|
||||
#[cfg(feature = "cuda")]
|
||||
LoadedHandle::Tp(m) => {
|
||||
return self.chat_completion_tp(m, request).await;
|
||||
return self.chat_completion_tp(m, request, principal).await;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -2086,7 +2156,7 @@ impl CandleHarness {
|
||||
// The permit is held for the whole request (released on drop).
|
||||
let _admit = loaded
|
||||
.admission
|
||||
.enter()
|
||||
.enter(principal.as_deref())
|
||||
.await
|
||||
.map_err(InferenceError::from)?;
|
||||
|
||||
@@ -2409,8 +2479,13 @@ impl CandleHarness {
|
||||
pub async fn chat_completion_stream(
|
||||
&self,
|
||||
request: ChatCompletionRequest,
|
||||
principal: Option<String>,
|
||||
) -> Result<mpsc::Receiver<ChatCompletionChunk>, InferenceError> {
|
||||
self.chat_completion_stream_with(request, wire_chat::ChatProjectionConfig::default())
|
||||
self.chat_completion_stream_with(
|
||||
request,
|
||||
wire_chat::ChatProjectionConfig::default(),
|
||||
principal,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -2422,8 +2497,9 @@ impl CandleHarness {
|
||||
&self,
|
||||
request: ChatCompletionRequest,
|
||||
mut config: wire_chat::ChatProjectionConfig,
|
||||
principal: Option<String>,
|
||||
) -> Result<mpsc::Receiver<ChatCompletionChunk>, InferenceError> {
|
||||
let stream = self.inference_stream(request).await?;
|
||||
let stream = self.inference_stream(request, principal).await?;
|
||||
// Fill in the model's reasoning markers if the caller
|
||||
// didn't pre-populate them — they're a property of the
|
||||
// loaded model (which the HTTP handler doesn't reach into
|
||||
@@ -2450,9 +2526,10 @@ impl CandleHarness {
|
||||
request: ChatCompletionRequest,
|
||||
response_id: String,
|
||||
message_item_id: String,
|
||||
principal: Option<String>,
|
||||
) -> Result<mpsc::Receiver<crate::wire::openai_responses::ResponseStreamFrame>, InferenceError>
|
||||
{
|
||||
let stream = self.inference_stream(request).await?;
|
||||
let stream = self.inference_stream(request, principal).await?;
|
||||
let meta = crate::wire::openai_responses::ResponseMeta {
|
||||
response_id,
|
||||
created_at: stream.created,
|
||||
@@ -2473,6 +2550,7 @@ impl CandleHarness {
|
||||
async fn inference_stream(
|
||||
&self,
|
||||
request: ChatCompletionRequest,
|
||||
principal: Option<String>,
|
||||
) -> Result<InferenceStream, InferenceError> {
|
||||
let handle = {
|
||||
let models = self.models.read().await;
|
||||
@@ -2497,7 +2575,7 @@ impl CandleHarness {
|
||||
LoadedHandle::Single(m) => m,
|
||||
#[cfg(feature = "cuda")]
|
||||
LoadedHandle::Tp(m) => {
|
||||
return self.inference_tp_stream(m, request).await;
|
||||
return self.inference_tp_stream(m, request, principal).await;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -2646,7 +2724,7 @@ impl CandleHarness {
|
||||
// into the inference task and is held until it completes.
|
||||
let admit = loaded
|
||||
.admission
|
||||
.enter()
|
||||
.enter(principal.as_deref())
|
||||
.await
|
||||
.map_err(InferenceError::from)?;
|
||||
|
||||
@@ -2950,7 +3028,7 @@ impl Harness for CandleHarness {
|
||||
// physics + live free VRAM + measured prefill rate. `None`
|
||||
// for arches without a context profile. `cost` stays
|
||||
// operator-set in the catalogue, filled by the gateway.
|
||||
let limit = h.derived_limit(&self.context_limit_cfg).await;
|
||||
let limit = h.derived_limit(&self.context_limit_cfg);
|
||||
out.push(ModelInfo {
|
||||
id: h.model_id().into(),
|
||||
harness: "candle".into(),
|
||||
@@ -3200,6 +3278,7 @@ impl Harness for CandleHarness {
|
||||
context_profile,
|
||||
prefill_rate: super::context_limit::PrefillRateEma::new(),
|
||||
derived_input_cap: AtomicUsize::new(0),
|
||||
last_free_mb: AtomicU64::new(0),
|
||||
});
|
||||
if loaded.prefix_cache.is_some() {
|
||||
tracing::info!(
|
||||
@@ -3210,6 +3289,14 @@ impl Harness for CandleHarness {
|
||||
);
|
||||
}
|
||||
|
||||
// Seed the control-plane VRAM cache (#53) while the worker is idle
|
||||
// (load just finished), so `/models` has a value before the
|
||||
// background refresher's first tick and never queries the worker.
|
||||
let (free_mb, _) = loaded.query_vram().await;
|
||||
if free_mb > 0 {
|
||||
loaded.last_free_mb.store(free_mb, Ordering::Release);
|
||||
}
|
||||
|
||||
let mut models = self.models.write().await;
|
||||
models.insert(spec.model_id.clone(), LoadedHandle::Single(loaded));
|
||||
tracing::info!(model = %spec.model_id, "model loaded");
|
||||
@@ -3460,6 +3547,7 @@ impl CandleHarness {
|
||||
),
|
||||
prefill_rate: super::context_limit::PrefillRateEma::new(),
|
||||
derived_input_cap: AtomicUsize::new(0),
|
||||
last_free_mb: AtomicU64::new(0),
|
||||
next_snapshot_id: std::sync::atomic::AtomicU64::new(1),
|
||||
});
|
||||
if tp_loaded.prefix_cache.is_some() {
|
||||
@@ -3471,6 +3559,14 @@ impl CandleHarness {
|
||||
);
|
||||
}
|
||||
|
||||
// Seed the control-plane VRAM cache (#53) — tightest free across
|
||||
// ranks, while the workers are idle post-load — so `/models` never
|
||||
// fans a query out to the inference-busy TP workers.
|
||||
let free_mb = tp_loaded.query_vram_tightest_free_mb().await;
|
||||
if free_mb > 0 {
|
||||
tp_loaded.last_free_mb.store(free_mb, Ordering::Release);
|
||||
}
|
||||
|
||||
let mut models = self.models.write().await;
|
||||
models.insert(spec.model_id.clone(), LoadedHandle::Tp(tp_loaded));
|
||||
tracing::info!(
|
||||
@@ -3500,6 +3596,7 @@ impl CandleHarness {
|
||||
&self,
|
||||
tp: Arc<TpLoadedModel>,
|
||||
request: ChatCompletionRequest,
|
||||
principal: Option<String>,
|
||||
) -> Result<ChatCompletionResponse, InferenceError> {
|
||||
// Tag every line of this request with a short req_id so a
|
||||
// grep over journalctl reconstructs one request even when
|
||||
@@ -3536,7 +3633,8 @@ impl CandleHarness {
|
||||
}
|
||||
|
||||
let tp_for_marker = Arc::clone(&tp);
|
||||
let handle = tokio::spawn(chat_completion_tp_inner(tp, request).instrument(span.clone()));
|
||||
let handle =
|
||||
tokio::spawn(chat_completion_tp_inner(tp, request, principal).instrument(span.clone()));
|
||||
match handle.await {
|
||||
Ok(Ok(resp)) => Ok(resp),
|
||||
Ok(Err(e)) => {
|
||||
@@ -3607,6 +3705,7 @@ impl CandleHarness {
|
||||
&self,
|
||||
tp: Arc<TpLoadedModel>,
|
||||
request: ChatCompletionRequest,
|
||||
principal: Option<String>,
|
||||
) -> Result<InferenceStream, InferenceError> {
|
||||
if tp.poisoned.load(Ordering::Acquire) {
|
||||
return Err(self.trigger_recovery(&request.model).await);
|
||||
@@ -3754,7 +3853,11 @@ impl CandleHarness {
|
||||
|
||||
// Admission control (#53): refuse before opening the stream; the
|
||||
// permit moves into the orchestration task and is held for its life.
|
||||
let admit = tp.admission.enter().await.map_err(InferenceError::from)?;
|
||||
let admit = tp
|
||||
.admission
|
||||
.enter(principal.as_deref())
|
||||
.await
|
||||
.map_err(InferenceError::from)?;
|
||||
|
||||
let tool_schemas = build_tool_schemas(&request);
|
||||
let tp_for_task = Arc::clone(&tp);
|
||||
@@ -4263,6 +4366,7 @@ impl CandleHarness {
|
||||
async fn chat_completion_tp_inner(
|
||||
tp: Arc<TpLoadedModel>,
|
||||
request: ChatCompletionRequest,
|
||||
principal: Option<String>,
|
||||
) -> Result<ChatCompletionResponse, InferenceError> {
|
||||
let req_start = std::time::Instant::now();
|
||||
let model_id = request.model.clone();
|
||||
@@ -4353,7 +4457,11 @@ async fn chat_completion_tp_inner(
|
||||
|
||||
// Admission control (#53): bounded queue + fast reject before joining
|
||||
// the pool-lock wait. Held for the whole request (released on drop).
|
||||
let _admit = tp.admission.enter().await.map_err(InferenceError::from)?;
|
||||
let _admit = tp
|
||||
.admission
|
||||
.enter(principal.as_deref())
|
||||
.await
|
||||
.map_err(InferenceError::from)?;
|
||||
|
||||
// Acquire the pool lock for the duration of the request. After
|
||||
// Phase 3 the leader's TpLeaderModel lives in the device worker
|
||||
@@ -4897,19 +5005,31 @@ pub enum InferenceError {
|
||||
/// failure mode that hid several client-compat bugs. Maps to 422.
|
||||
#[error("chat template could not render this request: {detail}")]
|
||||
TemplateRenderFailed { detail: String },
|
||||
/// Admission control (#53) refused the request: the model's bounded
|
||||
/// queue is full or the wait elapsed. Maps to `429 rate_limit_exceeded`
|
||||
/// + `Retry-After` — a fast, retryable "busy" signal, not a stall.
|
||||
/// Admission control (#53) refused on load: the model's bounded queue is
|
||||
/// full or the wait elapsed. Maps to `503 rate_limit_exceeded` +
|
||||
/// `Retry-After` — a fast, retryable "busy" signal, not a stall.
|
||||
#[error("model is busy; retry after {retry_after_secs}s")]
|
||||
Overloaded { retry_after_secs: u64 },
|
||||
/// Per-principal fair-share cap (#54) exceeded: this principal already
|
||||
/// has its max requests in flight/queued. Maps to `429
|
||||
/// rate_limit_exceeded` + `Retry-After`; a well-behaved client self-paces.
|
||||
#[error("per-principal in-flight limit reached; retry after {retry_after_secs}s")]
|
||||
PerPrincipalLimit { retry_after_secs: u64 },
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl From<super::admission::AdmissionRejection> for InferenceError {
|
||||
fn from(rejection: super::admission::AdmissionRejection) -> Self {
|
||||
InferenceError::Overloaded {
|
||||
retry_after_secs: rejection.retry_after_secs(),
|
||||
use super::admission::AdmissionRejection;
|
||||
match rejection {
|
||||
AdmissionRejection::QueueFull { retry_after_secs }
|
||||
| AdmissionRejection::Timeout { retry_after_secs } => {
|
||||
InferenceError::Overloaded { retry_after_secs }
|
||||
}
|
||||
AdmissionRejection::PrincipalCap { retry_after_secs } => {
|
||||
InferenceError::PerPrincipalLimit { retry_after_secs }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user