Compare commits

..

3 Commits

Author SHA1 Message Date
c83f1eb98c feat(#47 #54 phase 2c): neuron per-principal in-flight cap (fair-share)
Some checks failed
CI / Format (push) Successful in 37s
CI / CUDA type-check (push) Successful in 1m37s
CI / Clippy (push) Successful in 2m13s
CI / Test (push) Successful in 4m50s
CI / Build cortex SRPM (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Test (push) Blocked by required conditions
build-prerelease / Build neuron-ampere (push) Blocked by required conditions
build-prerelease / Build neuron-ada (push) Blocked by required conditions
build-prerelease / Resolve version stamps + change detection (push) Successful in 37s
build-prerelease / Build neuron-blackwell (push) Successful in 1m28s
build-prerelease / Lint (fmt + clippy) (push) Successful in 3m0s
build-prerelease / Build cortex binary (push) Has been skipped
build-prerelease / Build helexa-bench binary (push) Has been skipped
build-prerelease / Package cortex RPM (push) Has been skipped
build-prerelease / Package helexa-bench RPM (push) Has been skipped
build-prerelease / Package helexa-neuron-ada RPM (push) Has been cancelled
build-prerelease / Package helexa-neuron-ampere RPM (push) Has been cancelled
build-prerelease / Package helexa-neuron-blackwell RPM (push) Has been cancelled
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Has been cancelled
Budget caps total spend over time (#52); this caps instantaneous
starvation so one principal's burst can't monopolize a model while others
wait.

- AdmissionController gains per-principal accounting (moved from a lone
  atomic to a Mutex<AdmissionState> holding the overall pending count + a
  per-principal map). enter(principal) now also fast-rejects when a
  principal already has max_per_principal requests in flight/queued →
  AdmissionRejection::PrincipalCap. Anonymous (None) requests are exempt.
- Config [harness.candle.admission].max_per_principal (default 2 = one
  running + one queued; 0 disables). A bursting principal's overflow is
  refused while a different principal still gets a queue slot.
- The principal (account/key) is reconstructed on the neuron side from the
  x-helexa-account-id/key-id headers cortex stamps (#49) — trusted over
  WireGuard, never from the request body — and threaded explicitly through
  all inference entry points (chat_completion, *_stream(_with),
  responses_stream, and the TP variants) to the admission gate.
- InferenceError::PerPrincipalLimit → 429 rate_limit_exceeded + Retry-After
  (distinct from load-shedding's 503 Overloaded); opencode/AI SDK self-pace.

Tests: fair-share unit test (A floods → A's 2nd is PrincipalCap, B still
queues + is served) + the existing admission tests adapted to enter(None).
Non-CUDA build green locally; TP entry points (cuda-gated) validated by CI.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 20:40:25 +03:00
a60c9f1075 feat(#47 #53 phase 2b): expose per-model admission load in GET /health
All checks were successful
CI / Format (push) Successful in 30s
CI / CUDA type-check (push) Successful in 1m30s
CI / Clippy (push) Successful in 2m18s
CI / Test (push) Successful in 4m17s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Resolve version stamps + change detection (push) Successful in 33s
build-prerelease / Build neuron-blackwell (push) Successful in 1m42s
build-prerelease / Build neuron-ampere (push) Successful in 2m18s
build-prerelease / Build neuron-ada (push) Successful in 2m19s
build-prerelease / Build helexa-bench binary (push) Successful in 2m18s
build-prerelease / Lint (fmt + clippy) (push) Successful in 2m27s
build-prerelease / Build cortex binary (push) Successful in 2m45s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 2m2s
build-prerelease / Test (push) Successful in 4m50s
build-prerelease / Package helexa-bench RPM (push) Successful in 1m18s
build-prerelease / Package cortex RPM (push) Successful in 1m22s
build-prerelease / Package helexa-neuron-ampere RPM (push) Successful in 1m37s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Successful in 1m43s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 56s
Completes #53: the bounded scheduler's lock-free counters are now visible
to the fleet, which is what cortex's load-aware router (#55) consumes to
spread traffic across replicas and propagate honest backpressure.

- cortex-core::discovery: HealthResponse gains `models: Vec<ModelLoad>`
  (#[serde(default)] — back-compatible; older gateways/neurons interop).
  ModelLoad { id, in_flight, queue_depth }.
- LoadedHandle::load() → (in_flight, queue_depth), lock-free for both
  single-GPU and TP; CandleHarness::load_snapshot() enumerates resident
  models; the /health handler overlays it from the candle harness.

Tests: /health always exposes a models array (api integration test); a
pre-#53 payload without `models` still deserializes, and ModelLoad
round-trips (cortex-core serde tests). Local fmt/clippy/test green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 20:13:07 +03:00
b2bd86bfa5 feat(#47 #53 phase 2a): neuron admission control — bounded queue + backpressure
All checks were successful
CI / Format (push) Successful in 41s
CI / CUDA type-check (push) Successful in 1m40s
CI / Clippy (push) Successful in 2m18s
CI / Test (push) Successful in 4m53s
CI / Build cortex SRPM (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Resolve version stamps + change detection (push) Successful in 32s
build-prerelease / Build cortex binary (push) Has been skipped
build-prerelease / Build helexa-bench binary (push) Has been skipped
build-prerelease / Package cortex RPM (push) Has been skipped
build-prerelease / Package helexa-bench RPM (push) Has been skipped
build-prerelease / Build neuron-blackwell (push) Successful in 1m43s
build-prerelease / Build neuron-ampere (push) Successful in 2m18s
build-prerelease / Build neuron-ada (push) Successful in 2m19s
build-prerelease / Lint (fmt + clippy) (push) Successful in 2m29s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 1m46s
build-prerelease / Test (push) Successful in 4m48s
build-prerelease / Package helexa-neuron-ampere RPM (push) Successful in 1m49s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Successful in 1m53s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 1m7s
Replaces the per-model unbounded, untimed FIFO of inference-lock waiters
(a busy model made new requests hang ~300s until the client gave up with
an opaque error) with an explicit bounded scheduler.

- harness::admission::AdmissionController: batch-1 scheduler — max_in_flight
  running (1) + a bounded queue (max_queue_depth) with a max_wait. enter()
  fast-rejects when the queue is full (QueueFull) or the wait elapses
  (Timeout); the returned AdmissionPermit is held for the request and frees
  both slots on drop. Pure async (no CUDA), lock-free in_flight/queue_depth
  counters for future /health reporting. Configurable via
  [harness.candle.admission] (max_in_flight=1, max_queue_depth=8,
  max_wait_secs=30).
- Gated at all four inference entry points before the inference_lock/pool
  lock: single-GPU non-streaming + streaming, TP non-streaming + streaming.
  The streaming paths acquire the permit before opening the SSE (so a
  rejection is a clean error, not a half-open stream) and move it into the
  inference task.
- InferenceError::Overloaded { retry_after_secs } → 503 rate_limit_exceeded
  + Retry-After via the #60/#63 envelope: a fast, retryable "busy" signal
  opencode/AI SDK back off on, not a stall.

Scope: this branch is the admission *core* (the hang→backpressure fix).
Exposing in_flight/queue_depth in GET /health (consumed by cortex
load-aware routing #55) is the next focused branch under #53.

4 unit tests (admit/report load, queue-full reject, wait-timeout reject)
+ Overloaded envelope mapping test. Non-CUDA build green locally; the
CUDA + TP sites are validated by branch CI.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 20:03:07 +03:00
8 changed files with 615 additions and 11 deletions

View File

@@ -68,6 +68,57 @@ pub struct HealthResponse {
pub devices: Vec<DeviceHealth>,
#[serde(default)]
pub activation: ActivationStatus,
/// Per-model admission load (#53): how many requests are running vs.
/// queued on each loaded model right now. Cortex's load-aware router
/// (#55) reads this to spread traffic across replicas and to propagate
/// honest backpressure. `#[serde(default)]` keeps older gateways/neurons
/// interoperable (absent → empty → treated as no load info).
#[serde(default)]
pub models: Vec<ModelLoad>,
}
/// Live admission load for one loaded model (#53).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelLoad {
pub id: String,
/// Requests currently running (batch-1 → 0 or 1).
pub in_flight: usize,
/// Requests waiting in the bounded admission queue.
pub queue_depth: usize,
}
#[cfg(test)]
mod health_load_tests {
use super::*;
#[test]
fn health_response_without_models_field_still_deserializes() {
// A pre-#53 neuron's /health payload omits `models`; the gateway
// must still parse it (serde default → empty).
let json = r#"{"uptime_secs":42,"devices":[]}"#;
let resp: HealthResponse = serde_json::from_str(json).expect("back-compat parse");
assert_eq!(resp.uptime_secs, 42);
assert!(resp.models.is_empty());
}
#[test]
fn health_response_round_trips_model_load() {
let resp = HealthResponse {
uptime_secs: 1,
devices: vec![],
activation: ActivationStatus::default(),
models: vec![ModelLoad {
id: "Qwen/Qwen3.6-27B".into(),
in_flight: 1,
queue_depth: 3,
}],
};
let s = serde_json::to_string(&resp).unwrap();
let back: HealthResponse = serde_json::from_str(&s).unwrap();
assert_eq!(back.models.len(), 1);
assert_eq!(back.models[0].in_flight, 1);
assert_eq!(back.models[0].queue_depth, 3);
}
}
/// High-level activation state of the neuron daemon. The HTTP listener

View File

@@ -13,6 +13,7 @@ use axum::response::sse::{Event, KeepAlive, Sse};
use axum::response::{IntoResponse, Json};
use axum::routing::{get, post};
use cortex_core::discovery::{DiscoveryResponse, HealthResponse};
use cortex_core::entitlements::{HEADER_ACCOUNT_ID, HEADER_KEY_ID};
use cortex_core::harness::ModelSpec;
use cortex_core::openai::{ChatCompletionRequest, MessageContent};
use cortex_core::responses::{ResponsesRequest, ResponsesUsage};
@@ -71,6 +72,12 @@ async fn health_handler(State(state): State<Arc<NeuronState>>) -> Json<HealthRes
// know about activation lifecycle.
let mut snapshot = state.health_cache.snapshot().await;
snapshot.activation = state.activation.snapshot().await;
// Per-model admission load (#53) — read live from the candle harness so
// cortex's load-aware router (#55) can spread traffic and propagate
// backpressure. Absent when no candle harness is present.
if let Some(candle) = &state.candle {
snapshot.models = candle.load_snapshot().await;
}
Json(snapshot)
}
@@ -228,6 +235,17 @@ fn default_enable_thinking(req: &mut ChatCompletionRequest, include_thinking: bo
}
}
/// The request's principal for fair-share admission (#54), reconstructed
/// from the internal headers cortex stamps (#49). cortex strips any
/// client-supplied copy and asserts the authoritative value, so over the
/// trusted WireGuard link these are safe to key fair-share on. `None` for an
/// unauthenticated/direct request — exempt from the per-principal cap.
fn principal_key(headers: &axum::http::HeaderMap) -> Option<String> {
let account = headers.get(HEADER_ACCOUNT_ID)?.to_str().ok()?;
let key = headers.get(HEADER_KEY_ID)?.to_str().ok()?;
Some(format!("{account}/{key}"))
}
/// OpenAI-compatible chat completions. Dispatches to streaming SSE when
/// `stream: true` is set on the request; otherwise returns a single
/// `ChatCompletionResponse`.
@@ -271,8 +289,14 @@ async fn chat_completions(
// true`) keep reasoning on.
default_enable_thinking(&mut req, include_thinking);
// Fair-share admission principal (#54), from cortex's stamped headers.
let principal = principal_key(&headers);
if req.stream.unwrap_or(false) {
match candle.chat_completion_stream_with(req, chat_config).await {
match candle
.chat_completion_stream_with(req, chat_config, principal)
.await
{
Ok(rx) => {
// Each chunk → one SSE `data: {json}` line. After the
// channel closes, append the OpenAI [DONE] terminator.
@@ -289,7 +313,7 @@ async fn chat_completions(
Err(e) => inference_error_response(e),
}
} else {
match candle.chat_completion(req).await {
match candle.chat_completion(req, principal).await {
Ok(resp) => Json(resp).into_response(),
Err(e) => inference_error_response(e),
}
@@ -302,6 +326,7 @@ async fn chat_completions(
/// event stream into the Responses event family.
async fn responses(
State(state): State<Arc<NeuronState>>,
headers: axum::http::HeaderMap,
Json(req): Json<ResponsesRequest>,
) -> impl IntoResponse {
let Some(candle) = state.candle.as_ref().map(Arc::clone) else {
@@ -336,9 +361,12 @@ async fn responses(
};
chat_req.stream = Some(stream_requested);
// Fair-share admission principal (#54), from cortex's stamped headers.
let principal = principal_key(&headers);
if stream_requested {
match candle
.responses_stream(chat_req, response_id, message_item_id)
.responses_stream(chat_req, response_id, message_item_id, principal)
.await
{
Ok(rx) => {
@@ -362,7 +390,7 @@ async fn responses(
// and translate the result. We don't currently re-tokenise
// to compute usage; the harness returns it via the chat
// response and we pass it through.
match candle.chat_completion(chat_req).await {
match candle.chat_completion(chat_req, principal).await {
Ok(chat_resp) => {
// Extract the assistant text (chat completions
// always emits one choice on the candle path).
@@ -486,6 +514,24 @@ fn inference_error_response(err: InferenceError) -> axum::response::Response {
"template_render_failed",
format!("chat template could not render this request: {detail}"),
),
// Admission control refused on load (#53): a fast, retryable "busy"
// signal. 503 (service busy) + Retry-After; opencode/AI SDK back off.
InferenceError::Overloaded { retry_after_secs } => OpenAiError::new(
503,
"rate_limit_error",
"rate_limit_exceeded",
"model is busy (admission queue full); retry shortly",
)
.with_retry_after(retry_after_secs),
// Per-principal fair-share cap (#54): 429 rate_limit_exceeded +
// Retry-After — the caller is sending too many concurrent requests.
InferenceError::PerPrincipalLimit { retry_after_secs } => OpenAiError::new(
429,
"rate_limit_error",
"rate_limit_exceeded",
"too many concurrent requests for this key; retry shortly",
)
.with_retry_after(retry_after_secs),
InferenceError::Other(e) => OpenAiError::without_code(500, "api_error", format!("{e:#}")),
};
envelope_response(env)
@@ -660,6 +706,26 @@ mod error_envelope_tests {
assert_eq!(error["required_mb"], 8_192);
}
#[tokio::test]
async fn overloaded_is_503_rate_limited_with_retry_after() {
// Admission rejection (#53) → fast, retryable backpressure.
let resp = inference_error_response(InferenceError::Overloaded {
retry_after_secs: 7,
});
assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
let retry = resp
.headers()
.get(axum::http::header::RETRY_AFTER)
.expect("admission rejection must advertise Retry-After");
assert_eq!(retry.to_str().unwrap(), "7");
let bytes = axum::body::to_bytes(resp.into_body(), usize::MAX)
.await
.unwrap();
let body: Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(body["error"]["code"], "rate_limit_exceeded");
}
#[tokio::test]
async fn insufficient_vram_carries_retry_after() {
// Transient 503 — VRAM frees as in-flight requests finish, so the

View File

@@ -85,6 +85,68 @@ pub struct CandleHarnessConfig {
/// `/models`, and enforces it. These knobs tune that derivation.
#[serde(default)]
pub context_limit: ContextLimitConfig,
/// Admission control (#53): bounds the per-model wait queue so a busy
/// model returns a fast, retryable `429`/`503` instead of stalling new
/// requests until their client times out.
#[serde(default)]
pub admission: AdmissionConfig,
}
/// `[harness.candle.admission]` settings (#53).
///
/// Inference is batch-1, so `max_in_flight` is 1 in practice; the queue
/// (`max_queue_depth`) absorbs short bursts, and `max_wait_secs` caps how
/// long a queued request waits before it's refused with backpressure.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdmissionConfig {
/// Concurrent running requests per model. Batch-1 inference → 1.
#[serde(default = "default_admission_max_in_flight")]
pub max_in_flight: usize,
/// Queued (waiting) requests allowed beyond the in-flight one. The
/// `(max_in_flight + max_queue_depth + 1)`-th request is refused
/// immediately with `429`/`503` + `Retry-After`.
#[serde(default = "default_admission_max_queue_depth")]
pub max_queue_depth: usize,
/// Maximum seconds a queued request waits for the in-flight slot before
/// it is refused (turns the old ~300s client-side hang into a fast,
/// honest signal).
#[serde(default = "default_admission_max_wait_secs")]
pub max_wait_secs: u64,
/// Per-principal fair-share cap (#54): max in-flight + queued requests
/// for any single principal (resolved from the `x-helexa-*` headers
/// cortex stamps), so one client can't monopolize the queue while others
/// wait. Over-cap → `429 rate_limit_exceeded` + `Retry-After`. `0`
/// disables the cap; anonymous requests are always exempt.
#[serde(default = "default_admission_max_per_principal")]
pub max_per_principal: usize,
}
impl Default for AdmissionConfig {
fn default() -> Self {
Self {
max_in_flight: default_admission_max_in_flight(),
max_queue_depth: default_admission_max_queue_depth(),
max_wait_secs: default_admission_max_wait_secs(),
max_per_principal: default_admission_max_per_principal(),
}
}
}
fn default_admission_max_in_flight() -> usize {
1
}
fn default_admission_max_queue_depth() -> usize {
8
}
fn default_admission_max_wait_secs() -> u64 {
30
}
fn default_admission_max_per_principal() -> usize {
2
}
/// `[harness.candle.prefix_cache]` settings.

View File

@@ -0,0 +1,298 @@
//! Per-model admission control (#53).
//!
//! Inference against a loaded model is batch-1: one request runs at a time,
//! serialized by the model's `inference_lock` (single-GPU) / `pool` mutex
//! (TP). Before this, the wait for that lock was an **unbounded FIFO of
//! mutex waiters with no timeout** — a busy model made every new request
//! hang until its client gave up (~300s) with an opaque error.
//!
//! [`AdmissionController`] replaces that implicit unbounded wait with an
//! explicit bounded scheduler: at most `max_in_flight` running (1, batch-1)
//! plus a bounded queue of `max_queue_depth` waiters, each waiting at most
//! `max_wait`. When the queue is full or the wait elapses, the request is
//! rejected *immediately* — an honest, fast, retryable "busy" signal
//! (`429`/`503` + `Retry-After` per #63) instead of a silent stall.
//!
//! The controller is pure async (no CUDA), so the inference paths just call
//! [`AdmissionController::enter`] before taking the inference lock and hold
//! the returned [`AdmissionPermit`] for the request's lifetime. Its counters
//! ([`in_flight`](AdmissionController::in_flight) /
//! [`queue_depth`](AdmissionController::queue_depth)) are lock-free, so
//! `/health` can read live load without contending with inference.
use crate::config::AdmissionConfig;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
/// Why admission was refused. All map to the #63 backpressure envelope
/// (`rate_limit_exceeded` + `Retry-After`); they differ in cause (and HTTP
/// status — load → `503`, per-principal → `429`).
#[derive(Debug, Clone, Copy)]
pub enum AdmissionRejection {
/// The bounded wait queue was already full (server-side load).
QueueFull { retry_after_secs: u64 },
/// A queue slot was taken but the in-flight slot didn't free within
/// `max_wait` (server-side load).
Timeout { retry_after_secs: u64 },
/// This principal already has `max_per_principal` requests in flight or
/// queued (#54 fair-share) — one principal can't monopolize the model.
PrincipalCap { retry_after_secs: u64 },
}
impl AdmissionRejection {
pub fn retry_after_secs(&self) -> u64 {
match self {
AdmissionRejection::QueueFull { retry_after_secs }
| AdmissionRejection::Timeout { retry_after_secs }
| AdmissionRejection::PrincipalCap { retry_after_secs } => *retry_after_secs,
}
}
}
/// Admission accounting, mutated under a brief lock (never held across an
/// await). `pending` is queued + in-flight overall; `per_principal` is the
/// same count keyed by principal for fair-share (#54).
#[derive(Default, Debug)]
struct AdmissionState {
pending: usize,
per_principal: HashMap<String, usize>,
}
/// Bounded batch-1 scheduler for one loaded model, with per-principal
/// fair-share.
pub struct AdmissionController {
/// In-flight slots — `max_in_flight` permits (1 for batch-1).
slots: Arc<Semaphore>,
/// Queued + in-flight accounting (overall + per principal).
state: Arc<Mutex<AdmissionState>>,
/// `max_in_flight + max_queue_depth` — the overall rejection threshold.
max_pending: usize,
/// Max in-flight + queued for any single principal (#54). `0` disables.
max_per_principal: usize,
max_in_flight: usize,
max_wait: Duration,
}
impl AdmissionController {
pub fn new(cfg: &AdmissionConfig) -> Self {
// A controller with zero in-flight slots would deadlock; clamp.
let max_in_flight = cfg.max_in_flight.max(1);
Self {
slots: Arc::new(Semaphore::new(max_in_flight)),
state: Arc::new(Mutex::new(AdmissionState::default())),
max_pending: max_in_flight + cfg.max_queue_depth,
max_per_principal: cfg.max_per_principal,
max_in_flight,
max_wait: Duration::from_secs(cfg.max_wait_secs),
}
}
/// Admit a request for `principal` (`None` = anonymous, exempt from the
/// per-principal cap). Reserves a queue slot — fast-rejecting if the
/// overall queue is full or the principal is over its fair-share cap —
/// then waits up to `max_wait` for an in-flight slot. The returned permit
/// must be held for the request's lifetime; dropping it frees the slots.
pub async fn enter(
&self,
principal: Option<&str>,
) -> Result<AdmissionPermit, AdmissionRejection> {
// Decision + reservation under one brief lock so concurrent callers
// can't both slip past the thresholds. No await is held here.
{
let mut st = self.state.lock().expect("admission state poisoned");
if st.pending >= self.max_pending {
return Err(AdmissionRejection::QueueFull {
retry_after_secs: self.retry_hint(st.pending),
});
}
if let Some(p) = principal
&& self.max_per_principal > 0
&& st.per_principal.get(p).copied().unwrap_or(0) >= self.max_per_principal
{
return Err(AdmissionRejection::PrincipalCap {
retry_after_secs: self.retry_hint(st.pending),
});
}
st.pending += 1;
if let Some(p) = principal {
*st.per_principal.entry(p.to_string()).or_insert(0) += 1;
}
}
match tokio::time::timeout(self.max_wait, Arc::clone(&self.slots).acquire_owned()).await {
Ok(Ok(permit)) => Ok(AdmissionPermit {
_permit: permit,
state: Arc::clone(&self.state),
principal: principal.map(str::to_string),
}),
// Semaphore is never closed; treat a closed/elapsed wait the same.
Ok(Err(_)) | Err(_) => {
self.release(principal);
Err(AdmissionRejection::Timeout {
retry_after_secs: self.retry_hint(self.max_pending),
})
}
}
}
/// Roll back a reserved-but-not-admitted slot (wait timed out).
fn release(&self, principal: Option<&str>) {
let mut st = self.state.lock().expect("admission state poisoned");
st.pending = st.pending.saturating_sub(1);
decrement_principal(&mut st.per_principal, principal);
}
/// Requests currently running (holding an in-flight slot).
pub fn in_flight(&self) -> usize {
self.max_in_flight
.saturating_sub(self.slots.available_permits())
}
/// Requests waiting for an in-flight slot.
pub fn queue_depth(&self) -> usize {
let pending = self.state.lock().expect("admission state poisoned").pending;
pending.saturating_sub(self.in_flight())
}
/// Rough `Retry-After`: scale with how backed-up the model is, clamped to
/// a sane band. Without per-request timing this is a heuristic, but it
/// gives well-behaved clients (opencode/AI SDK) a sensible backoff.
fn retry_hint(&self, pending: usize) -> u64 {
let queued = pending.saturating_sub(self.max_in_flight) as u64;
((queued + 1) * 2).clamp(1, 120)
}
}
/// Decrement (and prune at zero) a principal's outstanding count.
fn decrement_principal(map: &mut HashMap<String, usize>, principal: Option<&str>) {
if let Some(p) = principal
&& let Some(count) = map.get_mut(p)
{
*count -= 1;
if *count == 0 {
map.remove(p);
}
}
}
/// Held for a request's lifetime; frees the in-flight + queue slot (and the
/// principal's fair-share slot) on drop.
#[derive(Debug)]
pub struct AdmissionPermit {
_permit: OwnedSemaphorePermit,
state: Arc<Mutex<AdmissionState>>,
principal: Option<String>,
}
impl Drop for AdmissionPermit {
fn drop(&mut self) {
let mut st = self.state.lock().expect("admission state poisoned");
st.pending = st.pending.saturating_sub(1);
decrement_principal(&mut st.per_principal, self.principal.as_deref());
}
}
#[cfg(test)]
mod tests {
use super::*;
/// Config with the per-principal cap disabled (0) — most tests exercise
/// the overall queue with anonymous (`None`) callers.
fn cfg(max_in_flight: usize, max_queue_depth: usize, max_wait_secs: u64) -> AdmissionConfig {
AdmissionConfig {
max_in_flight,
max_queue_depth,
max_wait_secs,
max_per_principal: 0,
}
}
#[tokio::test]
async fn admits_up_to_in_flight_and_reports_load() {
let ctrl = AdmissionController::new(&cfg(1, 4, 30));
assert_eq!(ctrl.in_flight(), 0);
let p = ctrl.enter(None).await.expect("first admits");
assert_eq!(ctrl.in_flight(), 1);
assert_eq!(ctrl.queue_depth(), 0);
drop(p);
assert_eq!(ctrl.in_flight(), 0);
}
#[tokio::test]
async fn rejects_when_queue_full() {
// 1 in-flight + 1 queue slot = capacity 2; the 3rd is refused fast.
let ctrl = Arc::new(AdmissionController::new(&cfg(1, 1, 30)));
let _running = ctrl.enter(None).await.expect("admit running");
// Fill the single queue slot with a waiter that parks on the semaphore.
let ctrl2 = Arc::clone(&ctrl);
let waiter = tokio::spawn(async move { ctrl2.enter(None).await.map(|p| drop(p)) });
// Give the waiter a moment to occupy the queue slot.
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(ctrl.queue_depth(), 1);
// Queue full → immediate QueueFull with a Retry-After hint.
match ctrl.enter(None).await {
Err(AdmissionRejection::QueueFull { retry_after_secs }) => {
assert!(retry_after_secs >= 1)
}
other => panic!("expected QueueFull, got {other:?}"),
}
// Release the runner so the parked waiter can proceed and finish.
drop(_running);
waiter.await.unwrap().unwrap();
}
#[tokio::test]
async fn rejects_on_wait_timeout() {
// Zero queue depth + a runner holding the only slot → a second
// request can't even queue, so it's QueueFull, not Timeout. Use a
// queue of 1 and a tiny max_wait to exercise the timeout path.
let ctrl = Arc::new(AdmissionController::new(&cfg(1, 1, 0)));
let _running = ctrl.enter(None).await.expect("admit running");
// max_wait 0 → the queued request times out almost immediately.
match ctrl.enter(None).await {
Err(AdmissionRejection::Timeout { .. }) => {}
other => panic!("expected Timeout, got {other:?}"),
}
// The timed-out request released its queue slot.
assert_eq!(ctrl.queue_depth(), 0);
}
#[tokio::test]
async fn per_principal_cap_protects_other_principals() {
// Generous overall queue, but each principal capped at 1 in-flight+
// queued. Principal A holds the running slot; A's second request is
// refused (PrincipalCap) rather than occupying the queue, so B's
// single request still gets a queue slot and proceeds.
let cfg = AdmissionConfig {
max_in_flight: 1,
max_queue_depth: 8,
max_wait_secs: 30,
max_per_principal: 1,
};
let ctrl = Arc::new(AdmissionController::new(&cfg));
let _a1 = ctrl.enter(Some("acct-a/key-a")).await.expect("A admits");
// A is over its fair-share cap → fast PrincipalCap, no queue slot taken.
match ctrl.enter(Some("acct-a/key-a")).await {
Err(AdmissionRejection::PrincipalCap { retry_after_secs }) => {
assert!(retry_after_secs >= 1)
}
other => panic!("expected PrincipalCap, got {other:?}"),
}
// B (a different principal) is admitted to the queue and proceeds
// once A releases — it was never stuck behind A's backlog.
let ctrl2 = Arc::clone(&ctrl);
let b = tokio::spawn(async move { ctrl2.enter(Some("acct-b/key-b")).await.map(drop) });
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(ctrl.queue_depth(), 1, "B is queued, not rejected");
drop(_a1);
b.await.unwrap().expect("B is served after A releases");
}
}

View File

@@ -81,6 +81,9 @@ pub struct CandleHarness {
/// Context-limit derivation settings (#67), read in `list_models`
/// to compute each model's advertised `limit{context,input,output}`.
context_limit_cfg: crate::config::ContextLimitConfig,
/// Admission-control settings (#53), used to build each loaded model's
/// [`super::admission::AdmissionController`] at load time.
admission_cfg: crate::config::AdmissionConfig,
}
/// Devices/capabilities snapshot of a model entering auto-recovery
@@ -146,6 +149,16 @@ impl LoadedHandle {
}
}
/// Current admission load (#53): `(in_flight, queue_depth)`. Lock-free,
/// so `/health` can read it without contending with inference.
pub fn load(&self) -> (usize, usize) {
match self {
LoadedHandle::Single(m) => (m.admission.in_flight(), m.admission.queue_depth()),
#[cfg(feature = "cuda")]
LoadedHandle::Tp(m) => (m.admission.in_flight(), m.admission.queue_depth()),
}
}
/// Modalities the loaded model supports. Stage B7 (single-GPU) +
/// TP-vision (#12) — both single-GPU and TP loads advertise
/// `"vision"` when a replicated vision tower materialised.
@@ -305,6 +318,10 @@ pub struct LoadedModel {
/// for the TP path (which already had this invariant by accident
/// because the pool lock covered the same window).
pub inference_lock: tokio::sync::Mutex<()>,
/// Bounded admission scheduler (#53). Gated *before* `inference_lock`
/// so a busy model refuses overflow fast instead of growing an
/// unbounded, untimed queue of lock waiters.
pub admission: super::admission::AdmissionController,
/// Open/close token IDs for the reasoning marker this model
/// emits, populated once at load time by probing the tokenizer's
/// added-tokens table. `None` for non-reasoning models or
@@ -422,6 +439,10 @@ pub struct TpLoadedModel {
/// serialises subprocess RPC traffic on the pool's
/// `Vec<Worker>` channels.
pub pool: tokio::sync::Mutex<super::tp::WorkerPool>,
/// Bounded admission scheduler (#53), mirroring the single-GPU path.
/// Gated before the pool lock so an overloaded TP model returns fast
/// backpressure instead of an unbounded, untimed wait.
pub admission: super::admission::AdmissionController,
/// Handle into the leader device worker's TP slab. The boxed
/// `TpLeaderModel` (with its embedded `Arc<Comm>` clones and
/// per-rank CUDA tensors) lives on the worker thread; we hold an
@@ -1565,6 +1586,7 @@ impl CandleHarness {
recovery_tx,
prefix_cache_cfg: config.prefix_cache.clone(),
context_limit_cfg: config.context_limit.clone(),
admission_cfg: config.admission.clone(),
});
// Background auto-recovery task (#17). Holds a `Weak` so it can't
// keep the harness alive. Spawned only when a tokio runtime is
@@ -2006,6 +2028,7 @@ impl CandleHarness {
pub async fn chat_completion(
&self,
request: ChatCompletionRequest,
principal: Option<String>,
) -> Result<ChatCompletionResponse, InferenceError> {
let handle = {
let models = self.models.read().await;
@@ -2030,7 +2053,7 @@ impl CandleHarness {
LoadedHandle::Single(m) => m,
#[cfg(feature = "cuda")]
LoadedHandle::Tp(m) => {
return self.chat_completion_tp(m, request).await;
return self.chat_completion_tp(m, request, principal).await;
}
};
@@ -2059,6 +2082,15 @@ impl CandleHarness {
return Err(self.trigger_recovery(&model_id).await);
}
// Admission control (#53): refuse fast if the bounded queue is full
// or the wait elapses, rather than joining an unbounded lock-wait.
// The permit is held for the whole request (released on drop).
let _admit = loaded
.admission
.enter(principal.as_deref())
.await
.map_err(InferenceError::from)?;
// Serialise concurrent requests against this model. Holds for
// the duration of clear_kv_cache → prefill → decode so two
// requests' chunked-prefill sequences can't interleave on the
@@ -2378,9 +2410,14 @@ impl CandleHarness {
pub async fn chat_completion_stream(
&self,
request: ChatCompletionRequest,
principal: Option<String>,
) -> Result<mpsc::Receiver<ChatCompletionChunk>, InferenceError> {
self.chat_completion_stream_with(request, wire_chat::ChatProjectionConfig::default())
.await
self.chat_completion_stream_with(
request,
wire_chat::ChatProjectionConfig::default(),
principal,
)
.await
}
/// Same as [`Self::chat_completion_stream`] but lets the caller
@@ -2391,8 +2428,9 @@ impl CandleHarness {
&self,
request: ChatCompletionRequest,
mut config: wire_chat::ChatProjectionConfig,
principal: Option<String>,
) -> Result<mpsc::Receiver<ChatCompletionChunk>, InferenceError> {
let stream = self.inference_stream(request).await?;
let stream = self.inference_stream(request, principal).await?;
// Fill in the model's reasoning markers if the caller
// didn't pre-populate them — they're a property of the
// loaded model (which the HTTP handler doesn't reach into
@@ -2419,9 +2457,10 @@ impl CandleHarness {
request: ChatCompletionRequest,
response_id: String,
message_item_id: String,
principal: Option<String>,
) -> Result<mpsc::Receiver<crate::wire::openai_responses::ResponseStreamFrame>, InferenceError>
{
let stream = self.inference_stream(request).await?;
let stream = self.inference_stream(request, principal).await?;
let meta = crate::wire::openai_responses::ResponseMeta {
response_id,
created_at: stream.created,
@@ -2442,6 +2481,7 @@ impl CandleHarness {
async fn inference_stream(
&self,
request: ChatCompletionRequest,
principal: Option<String>,
) -> Result<InferenceStream, InferenceError> {
let handle = {
let models = self.models.read().await;
@@ -2466,7 +2506,7 @@ impl CandleHarness {
LoadedHandle::Single(m) => m,
#[cfg(feature = "cuda")]
LoadedHandle::Tp(m) => {
return self.inference_tp_stream(m, request).await;
return self.inference_tp_stream(m, request, principal).await;
}
};
@@ -2610,6 +2650,15 @@ impl CandleHarness {
// role chunk was already sent above, so the client sees
// immediate "stream open" feedback even when this request
// queues behind another for the lock.
// Admission control (#53): refuse before opening the stream if the
// model's bounded queue is full / the wait elapses. The permit moves
// into the inference task and is held until it completes.
let admit = loaded
.admission
.enter(principal.as_deref())
.await
.map_err(InferenceError::from)?;
let tool_schemas = build_tool_schemas(&request);
if let (Some(worker), Some(handle)) = (loaded.worker.clone(), loaded.arch_handle) {
#[cfg(feature = "cuda")]
@@ -2620,6 +2669,7 @@ impl CandleHarness {
let tool_schemas_inner = tool_schemas.clone();
tokio::spawn(
async move {
let _admit = admit;
let _inference_guard = loaded_for_task.inference_lock.lock().await;
match stream_inference_via_worker(
worker,
@@ -2680,6 +2730,7 @@ impl CandleHarness {
let tool_call_tokens_inner = loaded.tool_call_tokens.clone();
let tool_schemas_inner = tool_schemas.clone();
tokio::task::spawn_blocking(move || {
let _admit = admit;
let _g = span_for_task.enter();
// `blocking_lock` is safe here: spawn_blocking runs on
// a dedicated thread, not on the async runtime, so
@@ -2779,6 +2830,24 @@ pub struct InferenceStream {
/// Auto-recovery (#17) — rebuild a poisoned model's device context
/// automatically instead of leaving it bricked until a human reloads.
impl CandleHarness {
/// Per-model admission load for `GET /health` (#53): in-flight + queued
/// counts for every resident model. Lock-free per-model reads, so this
/// only briefly holds the registry read lock to enumerate handles.
pub async fn load_snapshot(&self) -> Vec<cortex_core::discovery::ModelLoad> {
let models = self.models.read().await;
models
.values()
.map(|handle| {
let (in_flight, queue_depth) = handle.load();
cortex_core::discovery::ModelLoad {
id: handle.model_id().to_string(),
in_flight,
queue_depth,
}
})
.collect()
}
/// True while `model_id` is being auto-recovered (its slot is briefly
/// absent from the registry during the reload).
pub async fn is_recovering(&self, model_id: &str) -> bool {
@@ -3128,6 +3197,7 @@ impl Harness for CandleHarness {
worker,
arch_handle,
inference_lock: tokio::sync::Mutex::new(()),
admission: super::admission::AdmissionController::new(&self.admission_cfg),
reasoning_tokens,
tool_call_tokens,
chat_template,
@@ -3372,6 +3442,7 @@ impl CandleHarness {
tokenizer,
devices: devices.clone(),
pool: TMutex::new(pool),
admission: super::admission::AdmissionController::new(&self.admission_cfg),
leader_handle,
leader_device: leader_device.clone(),
poisoned: AtomicBool::new(false),
@@ -3438,6 +3509,7 @@ impl CandleHarness {
&self,
tp: Arc<TpLoadedModel>,
request: ChatCompletionRequest,
principal: Option<String>,
) -> Result<ChatCompletionResponse, InferenceError> {
// Tag every line of this request with a short req_id so a
// grep over journalctl reconstructs one request even when
@@ -3474,7 +3546,8 @@ impl CandleHarness {
}
let tp_for_marker = Arc::clone(&tp);
let handle = tokio::spawn(chat_completion_tp_inner(tp, request).instrument(span.clone()));
let handle =
tokio::spawn(chat_completion_tp_inner(tp, request, principal).instrument(span.clone()));
match handle.await {
Ok(Ok(resp)) => Ok(resp),
Ok(Err(e)) => {
@@ -3545,6 +3618,7 @@ impl CandleHarness {
&self,
tp: Arc<TpLoadedModel>,
request: ChatCompletionRequest,
principal: Option<String>,
) -> Result<InferenceStream, InferenceError> {
if tp.poisoned.load(Ordering::Acquire) {
return Err(self.trigger_recovery(&request.model).await);
@@ -3690,10 +3764,19 @@ impl CandleHarness {
validate_vision_prefill(prompt_len, vram_free_mb)?;
}
// Admission control (#53): refuse before opening the stream; the
// permit moves into the orchestration task and is held for its life.
let admit = tp
.admission
.enter(principal.as_deref())
.await
.map_err(InferenceError::from)?;
let tool_schemas = build_tool_schemas(&request);
let tp_for_task = Arc::clone(&tp);
tokio::spawn(
async move {
let _admit = admit;
let mut failure: Option<String> = None;
let mut pool = acquire_pool_lock(&tp_for_task.pool, &model_id).await;
let leader_handle = tp_for_task.leader_handle;
@@ -4196,6 +4279,7 @@ impl CandleHarness {
async fn chat_completion_tp_inner(
tp: Arc<TpLoadedModel>,
request: ChatCompletionRequest,
principal: Option<String>,
) -> Result<ChatCompletionResponse, InferenceError> {
let req_start = std::time::Instant::now();
let model_id = request.model.clone();
@@ -4284,6 +4368,14 @@ async fn chat_completion_tp_inner(
validate_vision_prefill(prompt_len, vram_free_mb)?;
}
// Admission control (#53): bounded queue + fast reject before joining
// the pool-lock wait. Held for the whole request (released on drop).
let _admit = tp
.admission
.enter(principal.as_deref())
.await
.map_err(InferenceError::from)?;
// Acquire the pool lock for the duration of the request. After
// Phase 3 the leader's TpLeaderModel lives in the device worker
// thread, so the pool lock now serialises only subprocess RPC
@@ -4826,10 +4918,35 @@ pub enum InferenceError {
/// failure mode that hid several client-compat bugs. Maps to 422.
#[error("chat template could not render this request: {detail}")]
TemplateRenderFailed { detail: String },
/// Admission control (#53) refused on load: the model's bounded queue is
/// full or the wait elapsed. Maps to `503 rate_limit_exceeded` +
/// `Retry-After` — a fast, retryable "busy" signal, not a stall.
#[error("model is busy; retry after {retry_after_secs}s")]
Overloaded { retry_after_secs: u64 },
/// Per-principal fair-share cap (#54) exceeded: this principal already
/// has its max requests in flight/queued. Maps to `429
/// rate_limit_exceeded` + `Retry-After`; a well-behaved client self-paces.
#[error("per-principal in-flight limit reached; retry after {retry_after_secs}s")]
PerPrincipalLimit { retry_after_secs: u64 },
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl From<super::admission::AdmissionRejection> for InferenceError {
fn from(rejection: super::admission::AdmissionRejection) -> Self {
use super::admission::AdmissionRejection;
match rejection {
AdmissionRejection::QueueFull { retry_after_secs }
| AdmissionRejection::Timeout { retry_after_secs } => {
InferenceError::Overloaded { retry_after_secs }
}
AdmissionRejection::PrincipalCap { retry_after_secs } => {
InferenceError::PerPrincipalLimit { retry_after_secs }
}
}
}
}
/// Build the model's prompt from a [`ChatCompletionRequest`].
///
/// Prefers the model's own `chat_template` when one was loaded

View File

@@ -1,5 +1,6 @@
//! Harness registry — maps harness names to trait implementations.
pub mod admission;
pub mod arch;
pub mod candle;
pub mod chat_template;

View File

@@ -30,6 +30,9 @@ impl HealthCache {
// direct read from the cache stays a well-typed
// HealthResponse on the wire.
activation: Default::default(),
// Per-model admission load is overlaid by the api handler
// from the candle harness (#53); the cache doesn't own it.
models: Vec::new(),
}),
has_gpus: RwLock::new(false),
}

View File

@@ -114,6 +114,12 @@ async fn test_health_endpoint() {
let body: serde_json::Value = resp.json().await.unwrap();
assert_eq!(body["uptime_secs"], 0);
// Per-model admission load (#53) is always present, even with no models
// loaded (empty array) — cortex's load-aware router (#55) relies on it.
assert!(
body["models"].is_array(),
"/health must expose a models load array"
);
}
#[tokio::test]