Compare commits
3 Commits
feat/47-ph
...
feat/47-ph
| Author | SHA1 | Date | |
|---|---|---|---|
|
dd31c3cd49
|
|||
|
a60c9f1075
|
|||
|
b2bd86bfa5
|
@@ -68,6 +68,57 @@ pub struct HealthResponse {
|
||||
pub devices: Vec<DeviceHealth>,
|
||||
#[serde(default)]
|
||||
pub activation: ActivationStatus,
|
||||
/// Per-model admission load (#53): how many requests are running vs.
|
||||
/// queued on each loaded model right now. Cortex's load-aware router
|
||||
/// (#55) reads this to spread traffic across replicas and to propagate
|
||||
/// honest backpressure. `#[serde(default)]` keeps older gateways/neurons
|
||||
/// interoperable (absent → empty → treated as no load info).
|
||||
#[serde(default)]
|
||||
pub models: Vec<ModelLoad>,
|
||||
}
|
||||
|
||||
/// Live admission load for one loaded model (#53).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ModelLoad {
|
||||
pub id: String,
|
||||
/// Requests currently running (batch-1 → 0 or 1).
|
||||
pub in_flight: usize,
|
||||
/// Requests waiting in the bounded admission queue.
|
||||
pub queue_depth: usize,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod health_load_tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn health_response_without_models_field_still_deserializes() {
|
||||
// A pre-#53 neuron's /health payload omits `models`; the gateway
|
||||
// must still parse it (serde default → empty).
|
||||
let json = r#"{"uptime_secs":42,"devices":[]}"#;
|
||||
let resp: HealthResponse = serde_json::from_str(json).expect("back-compat parse");
|
||||
assert_eq!(resp.uptime_secs, 42);
|
||||
assert!(resp.models.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn health_response_round_trips_model_load() {
|
||||
let resp = HealthResponse {
|
||||
uptime_secs: 1,
|
||||
devices: vec![],
|
||||
activation: ActivationStatus::default(),
|
||||
models: vec![ModelLoad {
|
||||
id: "Qwen/Qwen3.6-27B".into(),
|
||||
in_flight: 1,
|
||||
queue_depth: 3,
|
||||
}],
|
||||
};
|
||||
let s = serde_json::to_string(&resp).unwrap();
|
||||
let back: HealthResponse = serde_json::from_str(&s).unwrap();
|
||||
assert_eq!(back.models.len(), 1);
|
||||
assert_eq!(back.models[0].in_flight, 1);
|
||||
assert_eq!(back.models[0].queue_depth, 3);
|
||||
}
|
||||
}
|
||||
|
||||
/// High-level activation state of the neuron daemon. The HTTP listener
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::discovery::{ActivationStatus, DiscoveryResponse};
|
||||
use crate::discovery::{ActivationStatus, DiscoveryResponse, ModelLoad};
|
||||
use crate::harness::{ModelCost, ModelLimit};
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -27,6 +27,11 @@ pub struct NodeState {
|
||||
/// to synthesize `Loading` locations so clients see a catalogued
|
||||
/// model that's mid-prewarm as "loading", not "missing".
|
||||
pub activation: Option<ActivationStatus>,
|
||||
/// Last-seen per-model admission load from this neuron's `/health`
|
||||
/// (#53), keyed by model id. The router (#55) reads it to pick the
|
||||
/// least-busy replica when a model is loaded on more than one neuron.
|
||||
/// Empty until the first /health poll reports load.
|
||||
pub model_load: HashMap<String, ModelLoad>,
|
||||
}
|
||||
|
||||
/// A model registered on a node, with its runtime status.
|
||||
|
||||
@@ -200,6 +200,9 @@ async fn poll_health(fleet: &CortexState, name: &str, endpoint: &str) {
|
||||
let mut nodes = fleet.nodes.write().await;
|
||||
if let Some(node) = nodes.get_mut(name) {
|
||||
node.activation = Some(h.activation);
|
||||
// Per-model admission load (#53) → keyed by id for the
|
||||
// load-aware router (#55).
|
||||
node.model_load = h.models.into_iter().map(|m| (m.id.clone(), m)).collect();
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
@@ -132,7 +132,9 @@ pub async fn resolve(
|
||||
// Snapshot loaded / unloaded / recovering state from the poller cache.
|
||||
let (loaded_route, unloaded_route, recovering_node, any_healthy) = {
|
||||
let nodes = fleet.nodes.read().await;
|
||||
let mut loaded_route = None;
|
||||
// All healthy nodes with the model loaded, each with its current
|
||||
// admission load (#53) so we can pick the least-busy replica (#55).
|
||||
let mut loaded_candidates: Vec<(String, String, usize)> = Vec::new();
|
||||
let mut unloaded_route = None;
|
||||
let mut recovering_node = None;
|
||||
let mut any_healthy = false;
|
||||
@@ -144,8 +146,15 @@ pub async fn resolve(
|
||||
if let Some(entry) = node.models.get(model_id) {
|
||||
match entry.status {
|
||||
ModelStatus::Loaded | ModelStatus::Reloading => {
|
||||
loaded_route = Some((node.name.clone(), node.endpoint.clone(), false));
|
||||
break;
|
||||
// Least-busy score: in-flight + queued from the
|
||||
// neuron's last /health (#53). Unknown load (no poll
|
||||
// yet) scores 0 so the replica stays eligible.
|
||||
let score = node
|
||||
.model_load
|
||||
.get(model_id)
|
||||
.map(|l| l.in_flight + l.queue_depth)
|
||||
.unwrap_or(0);
|
||||
loaded_candidates.push((node.name.clone(), node.endpoint.clone(), score));
|
||||
}
|
||||
ModelStatus::Unloaded => {
|
||||
if unloaded_route.is_none() {
|
||||
@@ -175,6 +184,12 @@ pub async fn resolve(
|
||||
}
|
||||
}
|
||||
}
|
||||
// Pick the least-busy loaded replica; ties break by node name for
|
||||
// deterministic routing. `false` = not a cold start.
|
||||
let loaded_route = loaded_candidates
|
||||
.into_iter()
|
||||
.min_by(|a, b| a.2.cmp(&b.2).then_with(|| a.0.cmp(&b.0)))
|
||||
.map(|(name, endpoint, _score)| (name, endpoint, false));
|
||||
(loaded_route, unloaded_route, recovering_node, any_healthy)
|
||||
};
|
||||
|
||||
|
||||
@@ -37,6 +37,7 @@ impl CortexState {
|
||||
last_poll: None,
|
||||
discovery: None,
|
||||
activation: None,
|
||||
model_load: HashMap::new(),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
189
crates/cortex-gateway/tests/load_routing.rs
Normal file
189
crates/cortex-gateway/tests/load_routing.rs
Normal file
@@ -0,0 +1,189 @@
|
||||
//! Load-aware routing across replicas (#55).
|
||||
//!
|
||||
//! When a model is loaded on more than one healthy neuron, the router picks
|
||||
//! the least-busy replica using the per-model admission load each neuron
|
||||
//! reports on `GET /health` (#53), rather than always taking the first.
|
||||
|
||||
mod common;
|
||||
|
||||
use axum::Json;
|
||||
use axum::extract::Path;
|
||||
use axum::http::{StatusCode, header};
|
||||
use axum::response::IntoResponse;
|
||||
use axum::routing::{get, post};
|
||||
use cortex_core::config::{
|
||||
EvictionSettings, EvictionStrategy, GatewayConfig, GatewaySettings, NeuronEndpoint,
|
||||
};
|
||||
use cortex_core::discovery::ModelLoad;
|
||||
use cortex_core::node::{ModelEntry, ModelStatus};
|
||||
use cortex_gateway::state::CortexState;
|
||||
use serde_json::{Value, json};
|
||||
use std::sync::Arc;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
/// Seed a node as healthy with `test-model` loaded and a given admission load.
|
||||
async fn seed_loaded(fleet: &CortexState, node: &str, in_flight: usize, queue_depth: usize) {
|
||||
let mut nodes = fleet.nodes.write().await;
|
||||
let n = nodes.get_mut(node).expect("node exists");
|
||||
n.healthy = true;
|
||||
n.models.insert(
|
||||
"test-model".into(),
|
||||
ModelEntry {
|
||||
id: "test-model".into(),
|
||||
status: ModelStatus::Loaded,
|
||||
last_accessed: None,
|
||||
vram_estimate_mb: Some(8000),
|
||||
capabilities: Vec::new(),
|
||||
tool_call: false,
|
||||
reasoning: false,
|
||||
limit: None,
|
||||
},
|
||||
);
|
||||
n.model_load.insert(
|
||||
"test-model".into(),
|
||||
ModelLoad {
|
||||
id: "test-model".into(),
|
||||
in_flight,
|
||||
queue_depth,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
/// Build a gateway state over two mock neurons (no poller; we seed state).
|
||||
async fn two_neuron_fleet(endpoint_a: &str, endpoint_b: &str) -> Arc<CortexState> {
|
||||
let config = GatewayConfig {
|
||||
gateway: GatewaySettings {
|
||||
listen: "127.0.0.1:0".into(),
|
||||
metrics_listen: "127.0.0.1:0".into(),
|
||||
},
|
||||
eviction: EvictionSettings {
|
||||
strategy: EvictionStrategy::Lru,
|
||||
defrag_after_cycles: 0,
|
||||
},
|
||||
neurons: vec![
|
||||
NeuronEndpoint {
|
||||
name: "node-a".into(),
|
||||
endpoint: endpoint_a.to_string(),
|
||||
},
|
||||
NeuronEndpoint {
|
||||
name: "node-b".into(),
|
||||
endpoint: endpoint_b.to_string(),
|
||||
},
|
||||
],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: Default::default(),
|
||||
};
|
||||
Arc::new(CortexState::from_config(&config))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn routes_to_least_busy_replica() {
|
||||
let neuron_a = common::spawn_mock_neuron().await;
|
||||
let neuron_b = common::spawn_mock_neuron().await;
|
||||
let fleet = two_neuron_fleet(&neuron_a, &neuron_b).await;
|
||||
|
||||
// A is busy (1 running + 3 queued), B is idle.
|
||||
seed_loaded(&fleet, "node-a", 1, 3).await;
|
||||
seed_loaded(&fleet, "node-b", 0, 0).await;
|
||||
|
||||
let route = cortex_gateway::router::resolve(&fleet, "test-model")
|
||||
.await
|
||||
.expect("model is loaded on both nodes");
|
||||
assert_eq!(route.node_name, "node-b", "should pick the idle replica");
|
||||
|
||||
// Flip the load: now B is the busy one.
|
||||
seed_loaded(&fleet, "node-a", 0, 0).await;
|
||||
seed_loaded(&fleet, "node-b", 1, 5).await;
|
||||
let route = cortex_gateway::router::resolve(&fleet, "test-model")
|
||||
.await
|
||||
.expect("still loaded");
|
||||
assert_eq!(route.node_name, "node-a", "should follow the lighter load");
|
||||
}
|
||||
|
||||
/// Mock neuron whose inference endpoint always returns a #63 backpressure
|
||||
/// envelope (503 + Retry-After) — simulating a saturated neuron.
|
||||
async fn spawn_busy_neuron() -> String {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let base_url = format!("http://{addr}");
|
||||
let inference_url = base_url.clone();
|
||||
let app = axum::Router::new()
|
||||
.route(
|
||||
"/models/{model_id}/endpoint",
|
||||
get(move |Path(_): Path<String>| {
|
||||
let url = inference_url.clone();
|
||||
async move { Json(json!({ "url": url })) }
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/v1/chat/completions",
|
||||
post(|| async {
|
||||
let body = json!({"error": {
|
||||
"message": "model is busy (admission queue full); retry shortly",
|
||||
"type": "rate_limit_error",
|
||||
"code": "rate_limit_exceeded",
|
||||
"param": null
|
||||
}});
|
||||
(
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
[(header::RETRY_AFTER, "6")],
|
||||
Json(body),
|
||||
)
|
||||
.into_response()
|
||||
}),
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
});
|
||||
base_url
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn neuron_backpressure_is_propagated_intact() {
|
||||
// A saturated neuron's 503 + Retry-After + envelope must reach the client
|
||||
// verbatim — not unwrapped, remapped, or stripped (#55 / #63).
|
||||
let neuron = spawn_busy_neuron().await;
|
||||
let fleet = two_neuron_fleet(&neuron, &neuron).await;
|
||||
seed_loaded(&fleet, "node-a", 1, 8).await;
|
||||
|
||||
let app = cortex_gateway::build_app(Arc::clone(&fleet));
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn(async move {
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
});
|
||||
|
||||
let resp = reqwest::Client::new()
|
||||
.post(format!("http://{addr}/v1/chat/completions"))
|
||||
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": "hi"}]}))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(resp.status(), reqwest::StatusCode::SERVICE_UNAVAILABLE);
|
||||
assert_eq!(
|
||||
resp.headers()
|
||||
.get(reqwest::header::RETRY_AFTER)
|
||||
.and_then(|v| v.to_str().ok()),
|
||||
Some("6"),
|
||||
"Retry-After must survive the proxy"
|
||||
);
|
||||
let body: Value = resp.json().await.unwrap();
|
||||
assert_eq!(body["error"]["code"], "rate_limit_exceeded");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ties_break_deterministically_by_name() {
|
||||
let neuron_a = common::spawn_mock_neuron().await;
|
||||
let neuron_b = common::spawn_mock_neuron().await;
|
||||
let fleet = two_neuron_fleet(&neuron_a, &neuron_b).await;
|
||||
|
||||
// Equal load on both → stable pick (lowest node name).
|
||||
seed_loaded(&fleet, "node-a", 0, 0).await;
|
||||
seed_loaded(&fleet, "node-b", 0, 0).await;
|
||||
|
||||
let route = cortex_gateway::router::resolve(&fleet, "test-model")
|
||||
.await
|
||||
.expect("loaded");
|
||||
assert_eq!(route.node_name, "node-a", "ties break by name");
|
||||
}
|
||||
@@ -71,6 +71,12 @@ async fn health_handler(State(state): State<Arc<NeuronState>>) -> Json<HealthRes
|
||||
// know about activation lifecycle.
|
||||
let mut snapshot = state.health_cache.snapshot().await;
|
||||
snapshot.activation = state.activation.snapshot().await;
|
||||
// Per-model admission load (#53) — read live from the candle harness so
|
||||
// cortex's load-aware router (#55) can spread traffic and propagate
|
||||
// backpressure. Absent when no candle harness is present.
|
||||
if let Some(candle) = &state.candle {
|
||||
snapshot.models = candle.load_snapshot().await;
|
||||
}
|
||||
Json(snapshot)
|
||||
}
|
||||
|
||||
@@ -486,6 +492,15 @@ fn inference_error_response(err: InferenceError) -> axum::response::Response {
|
||||
"template_render_failed",
|
||||
format!("chat template could not render this request: {detail}"),
|
||||
),
|
||||
// Admission control refused (#53): a fast, retryable "busy" signal.
|
||||
// 503 (service busy) + Retry-After; opencode/AI SDK back off.
|
||||
InferenceError::Overloaded { retry_after_secs } => OpenAiError::new(
|
||||
503,
|
||||
"rate_limit_error",
|
||||
"rate_limit_exceeded",
|
||||
"model is busy (admission queue full); retry shortly",
|
||||
)
|
||||
.with_retry_after(retry_after_secs),
|
||||
InferenceError::Other(e) => OpenAiError::without_code(500, "api_error", format!("{e:#}")),
|
||||
};
|
||||
envelope_response(env)
|
||||
@@ -660,6 +675,26 @@ mod error_envelope_tests {
|
||||
assert_eq!(error["required_mb"], 8_192);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn overloaded_is_503_rate_limited_with_retry_after() {
|
||||
// Admission rejection (#53) → fast, retryable backpressure.
|
||||
let resp = inference_error_response(InferenceError::Overloaded {
|
||||
retry_after_secs: 7,
|
||||
});
|
||||
assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
|
||||
let retry = resp
|
||||
.headers()
|
||||
.get(axum::http::header::RETRY_AFTER)
|
||||
.expect("admission rejection must advertise Retry-After");
|
||||
assert_eq!(retry.to_str().unwrap(), "7");
|
||||
|
||||
let bytes = axum::body::to_bytes(resp.into_body(), usize::MAX)
|
||||
.await
|
||||
.unwrap();
|
||||
let body: Value = serde_json::from_slice(&bytes).unwrap();
|
||||
assert_eq!(body["error"]["code"], "rate_limit_exceeded");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn insufficient_vram_carries_retry_after() {
|
||||
// Transient 503 — VRAM frees as in-flight requests finish, so the
|
||||
|
||||
@@ -85,6 +85,56 @@ pub struct CandleHarnessConfig {
|
||||
/// `/models`, and enforces it. These knobs tune that derivation.
|
||||
#[serde(default)]
|
||||
pub context_limit: ContextLimitConfig,
|
||||
|
||||
/// Admission control (#53): bounds the per-model wait queue so a busy
|
||||
/// model returns a fast, retryable `429`/`503` instead of stalling new
|
||||
/// requests until their client times out.
|
||||
#[serde(default)]
|
||||
pub admission: AdmissionConfig,
|
||||
}
|
||||
|
||||
/// `[harness.candle.admission]` settings (#53).
|
||||
///
|
||||
/// Inference is batch-1, so `max_in_flight` is 1 in practice; the queue
|
||||
/// (`max_queue_depth`) absorbs short bursts, and `max_wait_secs` caps how
|
||||
/// long a queued request waits before it's refused with backpressure.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AdmissionConfig {
|
||||
/// Concurrent running requests per model. Batch-1 inference → 1.
|
||||
#[serde(default = "default_admission_max_in_flight")]
|
||||
pub max_in_flight: usize,
|
||||
/// Queued (waiting) requests allowed beyond the in-flight one. The
|
||||
/// `(max_in_flight + max_queue_depth + 1)`-th request is refused
|
||||
/// immediately with `429`/`503` + `Retry-After`.
|
||||
#[serde(default = "default_admission_max_queue_depth")]
|
||||
pub max_queue_depth: usize,
|
||||
/// Maximum seconds a queued request waits for the in-flight slot before
|
||||
/// it is refused (turns the old ~300s client-side hang into a fast,
|
||||
/// honest signal).
|
||||
#[serde(default = "default_admission_max_wait_secs")]
|
||||
pub max_wait_secs: u64,
|
||||
}
|
||||
|
||||
impl Default for AdmissionConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
max_in_flight: default_admission_max_in_flight(),
|
||||
max_queue_depth: default_admission_max_queue_depth(),
|
||||
max_wait_secs: default_admission_max_wait_secs(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn default_admission_max_in_flight() -> usize {
|
||||
1
|
||||
}
|
||||
|
||||
fn default_admission_max_queue_depth() -> usize {
|
||||
8
|
||||
}
|
||||
|
||||
fn default_admission_max_wait_secs() -> u64 {
|
||||
30
|
||||
}
|
||||
|
||||
/// `[harness.candle.prefix_cache]` settings.
|
||||
|
||||
202
crates/neuron/src/harness/admission.rs
Normal file
202
crates/neuron/src/harness/admission.rs
Normal file
@@ -0,0 +1,202 @@
|
||||
//! Per-model admission control (#53).
|
||||
//!
|
||||
//! Inference against a loaded model is batch-1: one request runs at a time,
|
||||
//! serialized by the model's `inference_lock` (single-GPU) / `pool` mutex
|
||||
//! (TP). Before this, the wait for that lock was an **unbounded FIFO of
|
||||
//! mutex waiters with no timeout** — a busy model made every new request
|
||||
//! hang until its client gave up (~300s) with an opaque error.
|
||||
//!
|
||||
//! [`AdmissionController`] replaces that implicit unbounded wait with an
|
||||
//! explicit bounded scheduler: at most `max_in_flight` running (1, batch-1)
|
||||
//! plus a bounded queue of `max_queue_depth` waiters, each waiting at most
|
||||
//! `max_wait`. When the queue is full or the wait elapses, the request is
|
||||
//! rejected *immediately* — an honest, fast, retryable "busy" signal
|
||||
//! (`429`/`503` + `Retry-After` per #63) instead of a silent stall.
|
||||
//!
|
||||
//! The controller is pure async (no CUDA), so the inference paths just call
|
||||
//! [`AdmissionController::enter`] before taking the inference lock and hold
|
||||
//! the returned [`AdmissionPermit`] for the request's lifetime. Its counters
|
||||
//! ([`in_flight`](AdmissionController::in_flight) /
|
||||
//! [`queue_depth`](AdmissionController::queue_depth)) are lock-free, so
|
||||
//! `/health` can read live load without contending with inference.
|
||||
|
||||
use crate::config::AdmissionConfig;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::time::Duration;
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||
|
||||
/// Why admission was refused. Both map to the #63 backpressure envelope
|
||||
/// (`429`/`503` + `rate_limit_exceeded` + `Retry-After`); they differ only
|
||||
/// in cause, for logging.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum AdmissionRejection {
|
||||
/// The bounded wait queue was already full.
|
||||
QueueFull { retry_after_secs: u64 },
|
||||
/// A queue slot was taken but the in-flight slot didn't free within
|
||||
/// `max_wait`.
|
||||
Timeout { retry_after_secs: u64 },
|
||||
}
|
||||
|
||||
impl AdmissionRejection {
|
||||
pub fn retry_after_secs(&self) -> u64 {
|
||||
match self {
|
||||
AdmissionRejection::QueueFull { retry_after_secs }
|
||||
| AdmissionRejection::Timeout { retry_after_secs } => *retry_after_secs,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Bounded batch-1 scheduler for one loaded model.
|
||||
pub struct AdmissionController {
|
||||
/// In-flight slots — `max_in_flight` permits (1 for batch-1).
|
||||
slots: Arc<Semaphore>,
|
||||
/// Queued + in-flight count, for fast rejection and load reporting.
|
||||
pending: Arc<AtomicUsize>,
|
||||
/// `max_in_flight + max_queue_depth` — the rejection threshold.
|
||||
max_pending: usize,
|
||||
max_in_flight: usize,
|
||||
max_wait: Duration,
|
||||
}
|
||||
|
||||
impl AdmissionController {
|
||||
pub fn new(cfg: &AdmissionConfig) -> Self {
|
||||
// A controller with zero in-flight slots would deadlock; clamp.
|
||||
let max_in_flight = cfg.max_in_flight.max(1);
|
||||
Self {
|
||||
slots: Arc::new(Semaphore::new(max_in_flight)),
|
||||
pending: Arc::new(AtomicUsize::new(0)),
|
||||
max_pending: max_in_flight + cfg.max_queue_depth,
|
||||
max_in_flight,
|
||||
max_wait: Duration::from_secs(cfg.max_wait_secs),
|
||||
}
|
||||
}
|
||||
|
||||
/// Admit a request: reserve a queue slot (fast-rejecting if full), then
|
||||
/// wait up to `max_wait` for an in-flight slot. The returned permit must
|
||||
/// be held for the request's lifetime; dropping it frees both slots.
|
||||
pub async fn enter(&self) -> Result<AdmissionPermit, AdmissionRejection> {
|
||||
// Reserve a pending slot up front so concurrent callers can't all
|
||||
// slip past the threshold check. Roll back if we're over capacity.
|
||||
let prev = self.pending.fetch_add(1, Ordering::AcqRel);
|
||||
if prev >= self.max_pending {
|
||||
self.pending.fetch_sub(1, Ordering::AcqRel);
|
||||
return Err(AdmissionRejection::QueueFull {
|
||||
retry_after_secs: self.retry_hint(),
|
||||
});
|
||||
}
|
||||
|
||||
match tokio::time::timeout(self.max_wait, Arc::clone(&self.slots).acquire_owned()).await {
|
||||
Ok(Ok(permit)) => Ok(AdmissionPermit {
|
||||
_permit: permit,
|
||||
pending: Arc::clone(&self.pending),
|
||||
}),
|
||||
// Semaphore is never closed; treat a closed/elapsed wait the same.
|
||||
Ok(Err(_)) | Err(_) => {
|
||||
self.pending.fetch_sub(1, Ordering::AcqRel);
|
||||
Err(AdmissionRejection::Timeout {
|
||||
retry_after_secs: self.retry_hint(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests currently running (holding an in-flight slot).
|
||||
pub fn in_flight(&self) -> usize {
|
||||
self.max_in_flight
|
||||
.saturating_sub(self.slots.available_permits())
|
||||
}
|
||||
|
||||
/// Requests waiting for an in-flight slot.
|
||||
pub fn queue_depth(&self) -> usize {
|
||||
self.pending
|
||||
.load(Ordering::Acquire)
|
||||
.saturating_sub(self.in_flight())
|
||||
}
|
||||
|
||||
/// Rough `Retry-After`: scale with how backed-up the model is, clamped to
|
||||
/// a sane band. Without per-request timing this is a heuristic, but it
|
||||
/// gives well-behaved clients (opencode/AI SDK) a sensible backoff.
|
||||
fn retry_hint(&self) -> u64 {
|
||||
((self.queue_depth() as u64 + 1) * 2).clamp(1, 120)
|
||||
}
|
||||
}
|
||||
|
||||
/// Held for a request's lifetime; frees the in-flight + queue slot on drop.
|
||||
#[derive(Debug)]
|
||||
pub struct AdmissionPermit {
|
||||
_permit: OwnedSemaphorePermit,
|
||||
pending: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl Drop for AdmissionPermit {
|
||||
fn drop(&mut self) {
|
||||
self.pending.fetch_sub(1, Ordering::AcqRel);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn cfg(max_in_flight: usize, max_queue_depth: usize, max_wait_secs: u64) -> AdmissionConfig {
|
||||
AdmissionConfig {
|
||||
max_in_flight,
|
||||
max_queue_depth,
|
||||
max_wait_secs,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn admits_up_to_in_flight_and_reports_load() {
|
||||
let ctrl = AdmissionController::new(&cfg(1, 4, 30));
|
||||
assert_eq!(ctrl.in_flight(), 0);
|
||||
let p = ctrl.enter().await.expect("first admits");
|
||||
assert_eq!(ctrl.in_flight(), 1);
|
||||
assert_eq!(ctrl.queue_depth(), 0);
|
||||
drop(p);
|
||||
assert_eq!(ctrl.in_flight(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_when_queue_full() {
|
||||
// 1 in-flight + 1 queue slot = capacity 2; the 3rd is refused fast.
|
||||
let ctrl = Arc::new(AdmissionController::new(&cfg(1, 1, 30)));
|
||||
let _running = ctrl.enter().await.expect("admit running");
|
||||
|
||||
// Fill the single queue slot with a waiter that parks on the semaphore.
|
||||
let ctrl2 = Arc::clone(&ctrl);
|
||||
let waiter = tokio::spawn(async move { ctrl2.enter().await.map(|p| drop(p)) });
|
||||
// Give the waiter a moment to occupy the queue slot.
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
assert_eq!(ctrl.queue_depth(), 1);
|
||||
|
||||
// Queue full → immediate QueueFull with a Retry-After hint.
|
||||
match ctrl.enter().await {
|
||||
Err(AdmissionRejection::QueueFull { retry_after_secs }) => {
|
||||
assert!(retry_after_secs >= 1)
|
||||
}
|
||||
other => panic!("expected QueueFull, got {other:?}"),
|
||||
}
|
||||
|
||||
// Release the runner so the parked waiter can proceed and finish.
|
||||
drop(_running);
|
||||
waiter.await.unwrap().unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_on_wait_timeout() {
|
||||
// Zero queue depth + a runner holding the only slot → a second
|
||||
// request can't even queue, so it's QueueFull, not Timeout. Use a
|
||||
// queue of 1 and a tiny max_wait to exercise the timeout path.
|
||||
let ctrl = Arc::new(AdmissionController::new(&cfg(1, 1, 0)));
|
||||
let _running = ctrl.enter().await.expect("admit running");
|
||||
// max_wait 0 → the queued request times out almost immediately.
|
||||
match ctrl.enter().await {
|
||||
Err(AdmissionRejection::Timeout { .. }) => {}
|
||||
other => panic!("expected Timeout, got {other:?}"),
|
||||
}
|
||||
// The timed-out request released its queue slot.
|
||||
assert_eq!(ctrl.queue_depth(), 0);
|
||||
}
|
||||
}
|
||||
@@ -81,6 +81,9 @@ pub struct CandleHarness {
|
||||
/// Context-limit derivation settings (#67), read in `list_models`
|
||||
/// to compute each model's advertised `limit{context,input,output}`.
|
||||
context_limit_cfg: crate::config::ContextLimitConfig,
|
||||
/// Admission-control settings (#53), used to build each loaded model's
|
||||
/// [`super::admission::AdmissionController`] at load time.
|
||||
admission_cfg: crate::config::AdmissionConfig,
|
||||
}
|
||||
|
||||
/// Devices/capabilities snapshot of a model entering auto-recovery
|
||||
@@ -146,6 +149,16 @@ impl LoadedHandle {
|
||||
}
|
||||
}
|
||||
|
||||
/// Current admission load (#53): `(in_flight, queue_depth)`. Lock-free,
|
||||
/// so `/health` can read it without contending with inference.
|
||||
pub fn load(&self) -> (usize, usize) {
|
||||
match self {
|
||||
LoadedHandle::Single(m) => (m.admission.in_flight(), m.admission.queue_depth()),
|
||||
#[cfg(feature = "cuda")]
|
||||
LoadedHandle::Tp(m) => (m.admission.in_flight(), m.admission.queue_depth()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Modalities the loaded model supports. Stage B7 (single-GPU) +
|
||||
/// TP-vision (#12) — both single-GPU and TP loads advertise
|
||||
/// `"vision"` when a replicated vision tower materialised.
|
||||
@@ -305,6 +318,10 @@ pub struct LoadedModel {
|
||||
/// for the TP path (which already had this invariant by accident
|
||||
/// because the pool lock covered the same window).
|
||||
pub inference_lock: tokio::sync::Mutex<()>,
|
||||
/// Bounded admission scheduler (#53). Gated *before* `inference_lock`
|
||||
/// so a busy model refuses overflow fast instead of growing an
|
||||
/// unbounded, untimed queue of lock waiters.
|
||||
pub admission: super::admission::AdmissionController,
|
||||
/// Open/close token IDs for the reasoning marker this model
|
||||
/// emits, populated once at load time by probing the tokenizer's
|
||||
/// added-tokens table. `None` for non-reasoning models or
|
||||
@@ -422,6 +439,10 @@ pub struct TpLoadedModel {
|
||||
/// serialises subprocess RPC traffic on the pool's
|
||||
/// `Vec<Worker>` channels.
|
||||
pub pool: tokio::sync::Mutex<super::tp::WorkerPool>,
|
||||
/// Bounded admission scheduler (#53), mirroring the single-GPU path.
|
||||
/// Gated before the pool lock so an overloaded TP model returns fast
|
||||
/// backpressure instead of an unbounded, untimed wait.
|
||||
pub admission: super::admission::AdmissionController,
|
||||
/// Handle into the leader device worker's TP slab. The boxed
|
||||
/// `TpLeaderModel` (with its embedded `Arc<Comm>` clones and
|
||||
/// per-rank CUDA tensors) lives on the worker thread; we hold an
|
||||
@@ -1565,6 +1586,7 @@ impl CandleHarness {
|
||||
recovery_tx,
|
||||
prefix_cache_cfg: config.prefix_cache.clone(),
|
||||
context_limit_cfg: config.context_limit.clone(),
|
||||
admission_cfg: config.admission.clone(),
|
||||
});
|
||||
// Background auto-recovery task (#17). Holds a `Weak` so it can't
|
||||
// keep the harness alive. Spawned only when a tokio runtime is
|
||||
@@ -2059,6 +2081,15 @@ impl CandleHarness {
|
||||
return Err(self.trigger_recovery(&model_id).await);
|
||||
}
|
||||
|
||||
// Admission control (#53): refuse fast if the bounded queue is full
|
||||
// or the wait elapses, rather than joining an unbounded lock-wait.
|
||||
// The permit is held for the whole request (released on drop).
|
||||
let _admit = loaded
|
||||
.admission
|
||||
.enter()
|
||||
.await
|
||||
.map_err(InferenceError::from)?;
|
||||
|
||||
// Serialise concurrent requests against this model. Holds for
|
||||
// the duration of clear_kv_cache → prefill → decode so two
|
||||
// requests' chunked-prefill sequences can't interleave on the
|
||||
@@ -2610,6 +2641,15 @@ impl CandleHarness {
|
||||
// role chunk was already sent above, so the client sees
|
||||
// immediate "stream open" feedback even when this request
|
||||
// queues behind another for the lock.
|
||||
// Admission control (#53): refuse before opening the stream if the
|
||||
// model's bounded queue is full / the wait elapses. The permit moves
|
||||
// into the inference task and is held until it completes.
|
||||
let admit = loaded
|
||||
.admission
|
||||
.enter()
|
||||
.await
|
||||
.map_err(InferenceError::from)?;
|
||||
|
||||
let tool_schemas = build_tool_schemas(&request);
|
||||
if let (Some(worker), Some(handle)) = (loaded.worker.clone(), loaded.arch_handle) {
|
||||
#[cfg(feature = "cuda")]
|
||||
@@ -2620,6 +2660,7 @@ impl CandleHarness {
|
||||
let tool_schemas_inner = tool_schemas.clone();
|
||||
tokio::spawn(
|
||||
async move {
|
||||
let _admit = admit;
|
||||
let _inference_guard = loaded_for_task.inference_lock.lock().await;
|
||||
match stream_inference_via_worker(
|
||||
worker,
|
||||
@@ -2680,6 +2721,7 @@ impl CandleHarness {
|
||||
let tool_call_tokens_inner = loaded.tool_call_tokens.clone();
|
||||
let tool_schemas_inner = tool_schemas.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let _admit = admit;
|
||||
let _g = span_for_task.enter();
|
||||
// `blocking_lock` is safe here: spawn_blocking runs on
|
||||
// a dedicated thread, not on the async runtime, so
|
||||
@@ -2779,6 +2821,24 @@ pub struct InferenceStream {
|
||||
/// Auto-recovery (#17) — rebuild a poisoned model's device context
|
||||
/// automatically instead of leaving it bricked until a human reloads.
|
||||
impl CandleHarness {
|
||||
/// Per-model admission load for `GET /health` (#53): in-flight + queued
|
||||
/// counts for every resident model. Lock-free per-model reads, so this
|
||||
/// only briefly holds the registry read lock to enumerate handles.
|
||||
pub async fn load_snapshot(&self) -> Vec<cortex_core::discovery::ModelLoad> {
|
||||
let models = self.models.read().await;
|
||||
models
|
||||
.values()
|
||||
.map(|handle| {
|
||||
let (in_flight, queue_depth) = handle.load();
|
||||
cortex_core::discovery::ModelLoad {
|
||||
id: handle.model_id().to_string(),
|
||||
in_flight,
|
||||
queue_depth,
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// True while `model_id` is being auto-recovered (its slot is briefly
|
||||
/// absent from the registry during the reload).
|
||||
pub async fn is_recovering(&self, model_id: &str) -> bool {
|
||||
@@ -3128,6 +3188,7 @@ impl Harness for CandleHarness {
|
||||
worker,
|
||||
arch_handle,
|
||||
inference_lock: tokio::sync::Mutex::new(()),
|
||||
admission: super::admission::AdmissionController::new(&self.admission_cfg),
|
||||
reasoning_tokens,
|
||||
tool_call_tokens,
|
||||
chat_template,
|
||||
@@ -3372,6 +3433,7 @@ impl CandleHarness {
|
||||
tokenizer,
|
||||
devices: devices.clone(),
|
||||
pool: TMutex::new(pool),
|
||||
admission: super::admission::AdmissionController::new(&self.admission_cfg),
|
||||
leader_handle,
|
||||
leader_device: leader_device.clone(),
|
||||
poisoned: AtomicBool::new(false),
|
||||
@@ -3690,10 +3752,15 @@ impl CandleHarness {
|
||||
validate_vision_prefill(prompt_len, vram_free_mb)?;
|
||||
}
|
||||
|
||||
// Admission control (#53): refuse before opening the stream; the
|
||||
// permit moves into the orchestration task and is held for its life.
|
||||
let admit = tp.admission.enter().await.map_err(InferenceError::from)?;
|
||||
|
||||
let tool_schemas = build_tool_schemas(&request);
|
||||
let tp_for_task = Arc::clone(&tp);
|
||||
tokio::spawn(
|
||||
async move {
|
||||
let _admit = admit;
|
||||
let mut failure: Option<String> = None;
|
||||
let mut pool = acquire_pool_lock(&tp_for_task.pool, &model_id).await;
|
||||
let leader_handle = tp_for_task.leader_handle;
|
||||
@@ -4284,6 +4351,10 @@ async fn chat_completion_tp_inner(
|
||||
validate_vision_prefill(prompt_len, vram_free_mb)?;
|
||||
}
|
||||
|
||||
// Admission control (#53): bounded queue + fast reject before joining
|
||||
// the pool-lock wait. Held for the whole request (released on drop).
|
||||
let _admit = tp.admission.enter().await.map_err(InferenceError::from)?;
|
||||
|
||||
// Acquire the pool lock for the duration of the request. After
|
||||
// Phase 3 the leader's TpLeaderModel lives in the device worker
|
||||
// thread, so the pool lock now serialises only subprocess RPC
|
||||
@@ -4826,10 +4897,23 @@ pub enum InferenceError {
|
||||
/// failure mode that hid several client-compat bugs. Maps to 422.
|
||||
#[error("chat template could not render this request: {detail}")]
|
||||
TemplateRenderFailed { detail: String },
|
||||
/// Admission control (#53) refused the request: the model's bounded
|
||||
/// queue is full or the wait elapsed. Maps to `429 rate_limit_exceeded`
|
||||
/// + `Retry-After` — a fast, retryable "busy" signal, not a stall.
|
||||
#[error("model is busy; retry after {retry_after_secs}s")]
|
||||
Overloaded { retry_after_secs: u64 },
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl From<super::admission::AdmissionRejection> for InferenceError {
|
||||
fn from(rejection: super::admission::AdmissionRejection) -> Self {
|
||||
InferenceError::Overloaded {
|
||||
retry_after_secs: rejection.retry_after_secs(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Build the model's prompt from a [`ChatCompletionRequest`].
|
||||
///
|
||||
/// Prefers the model's own `chat_template` when one was loaded
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
//! Harness registry — maps harness names to trait implementations.
|
||||
|
||||
pub mod admission;
|
||||
pub mod arch;
|
||||
pub mod candle;
|
||||
pub mod chat_template;
|
||||
|
||||
@@ -30,6 +30,9 @@ impl HealthCache {
|
||||
// direct read from the cache stays a well-typed
|
||||
// HealthResponse on the wire.
|
||||
activation: Default::default(),
|
||||
// Per-model admission load is overlaid by the api handler
|
||||
// from the candle harness (#53); the cache doesn't own it.
|
||||
models: Vec::new(),
|
||||
}),
|
||||
has_gpus: RwLock::new(false),
|
||||
}
|
||||
|
||||
@@ -114,6 +114,12 @@ async fn test_health_endpoint() {
|
||||
|
||||
let body: serde_json::Value = resp.json().await.unwrap();
|
||||
assert_eq!(body["uptime_secs"], 0);
|
||||
// Per-model admission load (#53) is always present, even with no models
|
||||
// loaded (empty array) — cortex's load-aware router (#55) relies on it.
|
||||
assert!(
|
||||
body["models"].is_array(),
|
||||
"/health must expose a models load array"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
Reference in New Issue
Block a user