Compare commits

..

14 Commits

Author SHA1 Message Date
3b9a6e37f6 Merge fix/cortex-poll-debounce-retryable: poll debounce + retryable 503 for feasible-but-unhealthy node
All checks were successful
build-prerelease / Resolve version stamps + change detection (push) Successful in 58s
build-prerelease / Build neuron-blackwell (push) Successful in 1m31s
build-prerelease / Lint (fmt + clippy) (push) Successful in 2m31s
build-prerelease / Build cortex binary (push) Successful in 3m0s
build-prerelease / Test (push) Successful in 5m3s
build-prerelease / Package cortex RPM (push) Successful in 1m20s
build-prerelease / Build neuron-ada (push) Successful in 2m2s
build-prerelease / Build neuron-ampere (push) Successful in 2m12s
build-prerelease / Build helexa-bench binary (push) Successful in 2m9s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 1m39s
build-prerelease / Package helexa-bench RPM (push) Successful in 1m20s
build-prerelease / Package helexa-neuron-ampere RPM (push) Successful in 1m41s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Successful in 1m43s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 53s
2026-06-18 12:46:30 +03:00
526b662c5e fix(cortex): poll-failure debounce + retryable 503 for feasible-but-unhealthy node
All checks were successful
CI / Format (push) Successful in 44s
CI / CUDA type-check (push) Successful in 1m31s
CI / Clippy (push) Successful in 2m13s
CI / Test (push) Successful in 5m9s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
Defense-in-depth for the agent0 NoFeasibleNeuron storm (root cause fixed in
neuron). Two cortex resilience gaps this incident exposed:

1. Brittle health flip: the poller marked a node unhealthy on a SINGLE missed
   /models poll, instantly yanking the node and all its models from routing.
   A busy neuron briefly slow to answer shouldn't be declared dead. Now
   debounced: NodeState.consecutive_poll_failures must reach
   POLL_FAILURE_THRESHOLD (3) before the node flips unhealthy (~20s at the 10s
   poll interval); any successful poll resets it. A never-healthy node stays
   unhealthy (the counter only protects an already-healthy node from blips).

2. Transient surfaced as permanent: when a catalogued model's only feasible
   neuron is momentarily unhealthy, the router returned 404 NoFeasibleNeuron —
   which litellm/clients treat as non-retryable, so agent0 hard-failed.
   pick_feasible_neuron now distinguishes "a feasible node exists but is
   unhealthy right now" → new RouteError::FeasibleNodeUnhealthy (503 +
   Retry-After: 3, retryable) from "no node could ever satisfy the topology" →
   404 NoFeasibleNeuron (permanent). Mirrors the beast case exactly: healthy
   1-GPU nodes + an unhealthy 2-GPU node → retry, don't fail.

Tests: poller test updated to assert debounce (1 miss keeps healthy, 3 flip);
new feasibility_routing tests cover transient-503 vs permanent-404. Local
fmt/clippy/test green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 12:39:18 +03:00
db7e373b90 fix(neuron): decouple GET /models from the inference worker (control-plane starvation)
Some checks failed
CI / CUDA type-check (push) Successful in 1m40s
CI / Format (push) Successful in 36s
CI / Clippy (push) Successful in 2m20s
CI / Test (push) Successful in 4m28s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Test (push) Blocked by required conditions
build-prerelease / Package helexa-neuron-ada RPM (push) Blocked by required conditions
build-prerelease / Package helexa-neuron-ampere RPM (push) Blocked by required conditions
build-prerelease / Resolve version stamps + change detection (push) Successful in 36s
build-prerelease / Build cortex binary (push) Has been skipped
build-prerelease / Package cortex RPM (push) Has been skipped
build-prerelease / Build helexa-bench binary (push) Has been skipped
build-prerelease / Package helexa-bench RPM (push) Has been skipped
build-prerelease / Build neuron-blackwell (push) Successful in 1m43s
build-prerelease / Build neuron-ampere (push) Successful in 2m20s
build-prerelease / Build neuron-ada (push) Successful in 2m21s
build-prerelease / Lint (fmt + clippy) (push) Successful in 2m34s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Has been cancelled
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Has been cancelled
Root cause of the agent0 `NoFeasibleNeuron` 404 storm: `GET /models` →
`LoadedHandle::derived_limit` (#67) queried free VRAM *synchronously through
the per-device worker thread* on every poll. During inference that worker is
saturated serially processing forward jobs, so the VRAM query queued behind
them and `/models` blocked for seconds. cortex's poller timed out on `/models`,
marked the (sole-feasible) node unhealthy, and the model fell out of routing →
404. Confirmed live: under load, `/version` and `/health` stayed ~4ms while
`/models` hit the 5s timeout.

Fix — the HTTP control plane never touches the inference worker:
- LoadedModel / TpLoadedModel gain `last_free_mb: AtomicU64`, a cached free-VRAM
  reading.
- `derived_limit` is now sync and reads `last_free_mb` instead of awaiting a
  worker query — so `/models` is a pure cache read regardless of inference load.
- The cache is refreshed off the request path: seeded at load (worker idle),
  then by a background `vram_cache_refresh_loop` every 5s. Single-GPU caches the
  device's free VRAM; TP caches the tightest free across ranks — the exact
  values `derived_limit` used before, just no longer on the request path. A
  transient `0` (worker gone/poisoned) never clobbers a good cached value.
- The request-path live VRAM check in `validate_request` is unchanged, so the
  real prefill OOM guard still uses fresh readings.

226 neuron unit tests pass; non-CUDA build + fmt + clippy green. CUDA/TP paths
validated by branch CI; live acceptance = `/models` stays responsive under
concurrent inference (re-run of the repro).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 12:32:15 +03:00
5c1623a817 fix(#49): allow-anonymous mode must ignore unrecognized keys, not 401
All checks were successful
CI / Format (push) Successful in 38s
CI / CUDA type-check (push) Successful in 1m41s
CI / Clippy (push) Successful in 2m20s
CI / Test (push) Successful in 4m37s
CI / Build cortex SRPM (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Resolve version stamps + change detection (push) Successful in 32s
build-prerelease / Lint (fmt + clippy) (push) Successful in 2m21s
build-prerelease / Build neuron-blackwell (push) Has been skipped
build-prerelease / Build neuron-ampere (push) Has been skipped
build-prerelease / Build neuron-ada (push) Has been skipped
build-prerelease / Package helexa-neuron-ada RPM (push) Has been skipped
build-prerelease / Package helexa-neuron-ampere RPM (push) Has been skipped
build-prerelease / Package helexa-neuron-blackwell RPM (push) Has been skipped
build-prerelease / Build helexa-bench binary (push) Has been skipped
build-prerelease / Package helexa-bench RPM (push) Has been skipped
build-prerelease / Build cortex binary (push) Successful in 2m35s
build-prerelease / Test (push) Successful in 4m39s
build-prerelease / Package cortex RPM (push) Successful in 1m25s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 52s
Regression from #49: the auth middleware rejected ANY present-but-
unresolvable bearer token with 401 invalid_api_key, even when
require_auth=false. But OpenAI-compatible clients (opencode, Open WebUI,
Agent Zero, litellm) send a placeholder bearer by default — so enabling
the build broke every existing client even though the operator never
opted into auth. Pre-#49 the bearer was never inspected at all.

Fix: in allow-anonymous mode (require_auth=false, the default) an
unrecognized key is now ignored and the request is served anonymously,
restoring pre-#49 behaviour. A bad key only 401s when require_auth=true.
A valid key is still resolved + metered in both modes.

Test renamed/split: unrecognized_key_is_ignored_when_auth_not_required
(now 200, served anonymously) + invalid_key_is_401_when_auth_required.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 21:40:34 +03:00
3b60dd7a31 Merge #56 (phase 3): fail-fast prompt pre-validation + advisory hints
All checks were successful
build-prerelease / Resolve version stamps + change detection (push) Successful in 36s
build-prerelease / Lint (fmt + clippy) (push) Successful in 2m44s
build-prerelease / Build neuron-blackwell (push) Successful in 1m42s
build-prerelease / Build helexa-bench binary (push) Successful in 2m2s
build-prerelease / Build neuron-ada (push) Successful in 2m19s
build-prerelease / Build neuron-ampere (push) Successful in 2m20s
build-prerelease / Build cortex binary (push) Successful in 2m28s
build-prerelease / Test (push) Successful in 4m45s
build-prerelease / Package helexa-bench RPM (push) Successful in 1m24s
build-prerelease / Package cortex RPM (push) Successful in 1m21s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 1m42s
build-prerelease / Package helexa-neuron-ampere RPM (push) Successful in 1m41s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Successful in 1m44s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 55s
2026-06-17 20:57:55 +03:00
4feaaf1cfb Merge #55 (phase 2d): cortex load-aware routing across replicas
Some checks failed
build-prerelease / Test (push) Blocked by required conditions
build-prerelease / Package cortex RPM (push) Blocked by required conditions
build-prerelease / Package helexa-neuron-ampere RPM (push) Blocked by required conditions
build-prerelease / Package helexa-neuron-blackwell RPM (push) Blocked by required conditions
build-prerelease / Resolve version stamps + change detection (push) Successful in 34s
build-prerelease / Build neuron-blackwell (push) Successful in 1m39s
build-prerelease / Build neuron-ampere (push) Successful in 2m20s
build-prerelease / Build neuron-ada (push) Successful in 2m21s
build-prerelease / Build cortex binary (push) Successful in 2m52s
build-prerelease / Lint (fmt + clippy) (push) Successful in 3m1s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 2m9s
build-prerelease / Build helexa-bench binary (push) Has been cancelled
build-prerelease / Package helexa-bench RPM (push) Has been cancelled
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Has been cancelled
2026-06-17 20:51:26 +03:00
057bc71e80 feat(#47 #56 phase 3): fail-fast prompt pre-validation + advisory hints
All checks were successful
CI / Format (push) Successful in 29s
CI / CUDA type-check (push) Successful in 1m37s
CI / Clippy (push) Successful in 2m35s
CI / Test (push) Successful in 5m4s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
Stage 3 (DX): A0 burned an hour then failed deep in litellm with
prompt_too_long (35544 > 32768). cortex knows each model's real context
window (#62/#67) and can pre-empt that at the edge.

- Pre-validate the prompt against the model's advertised limit.context
  before dispatch (in proxy_with_metrics, covering chat/completions/
  responses). Over → 400 context_length_exceeded in the #60 envelope — the
  same shape neuron emits on overflow, just earlier and without burning a
  cold-load/queue slot. cortex has no tokenizer, so estimate_prompt_tokens
  under-counts (~4 chars/token over message text); neuron stays the exact
  wall and we only catch gross overages. Skipped when no limit is known.
- Advisory X-Helexa-Advice header: fingerprints User-Agent
  (litellm / Agent-Zero / Zed) and attaches client-specific guidance.
  Strictly advisory — header only, never in the error envelope, behaviour
  never depends on it; unknown clients get nothing.

3 integration tests: over-long prompt → 400 context_length_exceeded with
the advice header, refused before neuron is hit; within-context passes
through; unknown client gets a clean 400 with no advice header. cortex-side
(no CUDA); local fmt/clippy/test green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 20:50:38 +03:00
dd31c3cd49 feat(#47 #55 phase 2d): cortex load-aware routing across replicas
All checks were successful
CI / Format (push) Successful in 39s
CI / CUDA type-check (push) Successful in 1m50s
CI / Clippy (push) Successful in 2m24s
CI / Test (push) Successful in 4m51s
CI / Build cortex SRPM (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
Stage 2 completes: when a model is loaded on more than one healthy neuron,
the router picks the least-busy replica instead of always taking the first,
and neuron backpressure propagates to the client intact.

- NodeState.model_load: per-model admission load (in_flight + queue_depth),
  stashed by the poller from neuron's /health (#53/#2b).
- router::resolve collects all loaded replicas and picks the one with the
  lowest in_flight+queue_depth (ties break by node name for determinism),
  replacing the previous first-match-wins.
- Backpressure passthrough: the existing streaming proxy already forwards
  the upstream status + all headers verbatim, so a neuron 503/429 +
  Retry-After + #60 envelope reaches the client unmodified — now covered by
  a regression test so a future change can't silently unwrap it.

Tests (tests/load_routing.rs): routes to the idle replica and follows the
lighter load when it flips; ties break by name; a saturated neuron's 503 +
Retry-After + envelope propagates through the gateway intact. All
cortex-side (no CUDA); local fmt/clippy/test green.

Retry-route-to-another-replica-on-backpressure (the issue's stretch goal)
is deferred — least-busy spread + honest passthrough is the substantive win.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 20:45:50 +03:00
c83f1eb98c feat(#47 #54 phase 2c): neuron per-principal in-flight cap (fair-share)
Some checks failed
CI / Format (push) Successful in 37s
CI / CUDA type-check (push) Successful in 1m37s
CI / Clippy (push) Successful in 2m13s
CI / Test (push) Successful in 4m50s
CI / Build cortex SRPM (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Test (push) Blocked by required conditions
build-prerelease / Build neuron-ampere (push) Blocked by required conditions
build-prerelease / Build neuron-ada (push) Blocked by required conditions
build-prerelease / Resolve version stamps + change detection (push) Successful in 37s
build-prerelease / Build neuron-blackwell (push) Successful in 1m28s
build-prerelease / Lint (fmt + clippy) (push) Successful in 3m0s
build-prerelease / Build cortex binary (push) Has been skipped
build-prerelease / Build helexa-bench binary (push) Has been skipped
build-prerelease / Package cortex RPM (push) Has been skipped
build-prerelease / Package helexa-bench RPM (push) Has been skipped
build-prerelease / Package helexa-neuron-ada RPM (push) Has been cancelled
build-prerelease / Package helexa-neuron-ampere RPM (push) Has been cancelled
build-prerelease / Package helexa-neuron-blackwell RPM (push) Has been cancelled
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Has been cancelled
Budget caps total spend over time (#52); this caps instantaneous
starvation so one principal's burst can't monopolize a model while others
wait.

- AdmissionController gains per-principal accounting (moved from a lone
  atomic to a Mutex<AdmissionState> holding the overall pending count + a
  per-principal map). enter(principal) now also fast-rejects when a
  principal already has max_per_principal requests in flight/queued →
  AdmissionRejection::PrincipalCap. Anonymous (None) requests are exempt.
- Config [harness.candle.admission].max_per_principal (default 2 = one
  running + one queued; 0 disables). A bursting principal's overflow is
  refused while a different principal still gets a queue slot.
- The principal (account/key) is reconstructed on the neuron side from the
  x-helexa-account-id/key-id headers cortex stamps (#49) — trusted over
  WireGuard, never from the request body — and threaded explicitly through
  all inference entry points (chat_completion, *_stream(_with),
  responses_stream, and the TP variants) to the admission gate.
- InferenceError::PerPrincipalLimit → 429 rate_limit_exceeded + Retry-After
  (distinct from load-shedding's 503 Overloaded); opencode/AI SDK self-pace.

Tests: fair-share unit test (A floods → A's 2nd is PrincipalCap, B still
queues + is served) + the existing admission tests adapted to enter(None).
Non-CUDA build green locally; TP entry points (cuda-gated) validated by CI.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 20:40:25 +03:00
a60c9f1075 feat(#47 #53 phase 2b): expose per-model admission load in GET /health
All checks were successful
CI / Format (push) Successful in 30s
CI / CUDA type-check (push) Successful in 1m30s
CI / Clippy (push) Successful in 2m18s
CI / Test (push) Successful in 4m17s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Resolve version stamps + change detection (push) Successful in 33s
build-prerelease / Build neuron-blackwell (push) Successful in 1m42s
build-prerelease / Build neuron-ampere (push) Successful in 2m18s
build-prerelease / Build neuron-ada (push) Successful in 2m19s
build-prerelease / Build helexa-bench binary (push) Successful in 2m18s
build-prerelease / Lint (fmt + clippy) (push) Successful in 2m27s
build-prerelease / Build cortex binary (push) Successful in 2m45s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 2m2s
build-prerelease / Test (push) Successful in 4m50s
build-prerelease / Package helexa-bench RPM (push) Successful in 1m18s
build-prerelease / Package cortex RPM (push) Successful in 1m22s
build-prerelease / Package helexa-neuron-ampere RPM (push) Successful in 1m37s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Successful in 1m43s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 56s
Completes #53: the bounded scheduler's lock-free counters are now visible
to the fleet, which is what cortex's load-aware router (#55) consumes to
spread traffic across replicas and propagate honest backpressure.

- cortex-core::discovery: HealthResponse gains `models: Vec<ModelLoad>`
  (#[serde(default)] — back-compatible; older gateways/neurons interop).
  ModelLoad { id, in_flight, queue_depth }.
- LoadedHandle::load() → (in_flight, queue_depth), lock-free for both
  single-GPU and TP; CandleHarness::load_snapshot() enumerates resident
  models; the /health handler overlays it from the candle harness.

Tests: /health always exposes a models array (api integration test); a
pre-#53 payload without `models` still deserializes, and ModelLoad
round-trips (cortex-core serde tests). Local fmt/clippy/test green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 20:13:07 +03:00
b2bd86bfa5 feat(#47 #53 phase 2a): neuron admission control — bounded queue + backpressure
All checks were successful
CI / Format (push) Successful in 41s
CI / CUDA type-check (push) Successful in 1m40s
CI / Clippy (push) Successful in 2m18s
CI / Test (push) Successful in 4m53s
CI / Build cortex SRPM (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Resolve version stamps + change detection (push) Successful in 32s
build-prerelease / Build cortex binary (push) Has been skipped
build-prerelease / Build helexa-bench binary (push) Has been skipped
build-prerelease / Package cortex RPM (push) Has been skipped
build-prerelease / Package helexa-bench RPM (push) Has been skipped
build-prerelease / Build neuron-blackwell (push) Successful in 1m43s
build-prerelease / Build neuron-ampere (push) Successful in 2m18s
build-prerelease / Build neuron-ada (push) Successful in 2m19s
build-prerelease / Lint (fmt + clippy) (push) Successful in 2m29s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 1m46s
build-prerelease / Test (push) Successful in 4m48s
build-prerelease / Package helexa-neuron-ampere RPM (push) Successful in 1m49s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Successful in 1m53s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 1m7s
Replaces the per-model unbounded, untimed FIFO of inference-lock waiters
(a busy model made new requests hang ~300s until the client gave up with
an opaque error) with an explicit bounded scheduler.

- harness::admission::AdmissionController: batch-1 scheduler — max_in_flight
  running (1) + a bounded queue (max_queue_depth) with a max_wait. enter()
  fast-rejects when the queue is full (QueueFull) or the wait elapses
  (Timeout); the returned AdmissionPermit is held for the request and frees
  both slots on drop. Pure async (no CUDA), lock-free in_flight/queue_depth
  counters for future /health reporting. Configurable via
  [harness.candle.admission] (max_in_flight=1, max_queue_depth=8,
  max_wait_secs=30).
- Gated at all four inference entry points before the inference_lock/pool
  lock: single-GPU non-streaming + streaming, TP non-streaming + streaming.
  The streaming paths acquire the permit before opening the SSE (so a
  rejection is a clean error, not a half-open stream) and move it into the
  inference task.
- InferenceError::Overloaded { retry_after_secs } → 503 rate_limit_exceeded
  + Retry-After via the #60/#63 envelope: a fast, retryable "busy" signal
  opencode/AI SDK back off on, not a stall.

Scope: this branch is the admission *core* (the hang→backpressure fix).
Exposing in_flight/queue_depth in GET /health (consumed by cortex
load-aware routing #55) is the next focused branch under #53.

4 unit tests (admit/report load, queue-full reject, wait-timeout reject)
+ Overloaded envelope mapping test. Non-CUDA build green locally; the
CUDA + TP sites are validated by branch CI.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 20:03:07 +03:00
cdf87284af feat(#47 phase 1d): budget enforcement — hard caps, reserve→settle, 429
All checks were successful
CI / Format (push) Successful in 1s
CI / CUDA type-check (push) Successful in 1m40s
CI / Clippy (push) Successful in 2m40s
CI / Test (push) Successful in 6m23s
CI / Build cortex SRPM (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Resolve version stamps + change detection (push) Successful in 34s
build-prerelease / Lint (fmt + clippy) (push) Successful in 2m19s
build-prerelease / Test (push) Successful in 4m28s
build-prerelease / Build neuron-blackwell (push) Has been skipped
build-prerelease / Build neuron-ampere (push) Has been skipped
build-prerelease / Build neuron-ada (push) Has been skipped
build-prerelease / Package helexa-neuron-ada RPM (push) Has been skipped
build-prerelease / Package helexa-neuron-ampere RPM (push) Has been skipped
build-prerelease / Package helexa-neuron-blackwell RPM (push) Has been skipped
build-prerelease / Build helexa-bench binary (push) Has been skipped
build-prerelease / Package helexa-bench RPM (push) Has been skipped
build-prerelease / Build cortex binary (push) Successful in 2m27s
build-prerelease / Package cortex RPM (push) Successful in 1m23s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 50s
Stage 1 complete: the A0 seatbelt (#52). Flips the metering-only reserve(0)
from #51 to the request's real upper-bound cost and refuses over-cap
requests *before* neuron is hit.

- metering::reservation_estimate: prompt estimate (~4 chars/token over the
  body — cortex has no tokenizer, so a conservative over-estimate; neuron
  stays the exact context wall) + max output. Max output comes from
  max_completion_tokens / legacy max_tokens, else the model's advertised
  limit.output (#62), else FALLBACK_MAX_OUTPUT. Over-reserving is safe —
  settle reconciles to actual.
- metering::reserve_or_reject: reserve the estimate; on BudgetError map to
  the #63 envelope and the caller refuses before dispatch — rolling window →
  429 rate_limit_exceeded + Retry-After (until reset); hard balance → 429
  insufficient_quota (no Retry-After). Never 402.
- Wired into both the OpenAI proxy path (proxy_with_metrics) and the
  Anthropic path (estimate from the translated body). advertised_output_limit
  reads the loaded model's limit.output from fleet state.
- Reservation prevents overshoot under concurrency: a successful reserve
  gates on spent+reserved+estimate ≤ cap, and settle records actual ≤
  reserved, so spend can never exceed the hard cap.

4 integration tests with a hit-counting mock neuron: balance over-cap →
429 insufficient_quota (no Retry-After, not dispatched); rolling over-cap →
429 rate_limit_exceeded + Retry-After (not dispatched); within-cap served;
**A0 repro** — a capped key's 20-request fan-out drains the cap, then is
refused, neuron only saw the served ones, and spend never exceeds the cap.
Plus 5 metering unit tests. Local fmt/clippy/test all green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 19:35:04 +03:00
4f16b8c541 feat(#47 phase 1c): per-request token metering + spend ledger
All checks were successful
CI / Format (push) Successful in 40s
CI / CUDA type-check (push) Successful in 1m41s
CI / Clippy (push) Successful in 2m15s
CI / Test (push) Successful in 4m28s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Resolve version stamps + change detection (push) Successful in 32s
build-prerelease / Build neuron-blackwell (push) Has been skipped
build-prerelease / Build neuron-ampere (push) Has been skipped
build-prerelease / Build neuron-ada (push) Has been skipped
build-prerelease / Package helexa-neuron-ada RPM (push) Has been skipped
build-prerelease / Package helexa-neuron-ampere RPM (push) Has been skipped
build-prerelease / Package helexa-neuron-blackwell RPM (push) Has been skipped
build-prerelease / Lint (fmt + clippy) (push) Successful in 2m30s
build-prerelease / Build cortex binary (push) Successful in 2m49s
build-prerelease / Package cortex RPM (push) Successful in 1m24s
build-prerelease / Test (push) Successful in 5m59s
build-prerelease / Build helexa-bench binary (push) Has been skipped
build-prerelease / Package helexa-bench RPM (push) Has been skipped
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 49s
Stage 1 accounting (#51): capture real per-request usage and feed it to
the spend ledger + per-principal metrics. Establishes the reserve→settle
lifecycle that budget enforcement (#52) will tighten.

- cortex-gateway::metering: ReservationGuard makes reservation leaks
  impossible — settle() records actual spend + releases the remainder;
  dropping an un-settled guard releases the whole reservation, so any
  early return / error / dropped stream resolves it. UsageSink is the
  completion hook; principal_from_headers reconstructs the principal from
  the middleware-stamped headers (uniform across all proxy paths, no
  handler-signature churn); record_spend emits per-principal counters.
- proxy::TokenMetrics gains an optional usage_sink, invoked exactly once
  in finish() with the observed (prompt, completion) — restructured so it
  always runs (even when no body/usage arrived → settle 0 → release),
  while preserving the existing per-model metric emissions unchanged.
- All proxy paths metered: chat/completions/responses via
  proxy_with_metrics (reserve 0 → forward_request → settle in finish);
  Anthropic non-streaming settles from the buffered body; Anthropic
  streaming (anthropic_sse) now scans the upstream frames for the usage
  object (#48) — it captured none before — and settles at pump end.
- This phase reserves 0 tokens (metering only, no enforcement); #52 flips
  the reserved amount to prompt+max_output and surfaces BudgetError. The
  settle/release plumbing is identical, so that change is localized.
- New Prometheus counters: cortex_spend_tokens_total (+ prompt/completion
  splits), labelled by account/key.

2 integration tests: cumulative per-key spend after N requests with
reservations settled to zero outstanding; anonymous requests record no
spend. Local fmt/clippy/test all green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 19:29:51 +03:00
486d7e9a8f feat(#47 phase 1b): API-key auth + principal resolution
All checks were successful
CI / Format (push) Successful in 36s
CI / CUDA type-check (push) Successful in 1m51s
CI / Clippy (push) Successful in 2m40s
CI / Test (push) Successful in 5m50s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Resolve version stamps + change detection (push) Successful in 31s
build-prerelease / Build neuron-blackwell (push) Successful in 1m41s
build-prerelease / Build neuron-ada (push) Successful in 2m15s
build-prerelease / Build neuron-ampere (push) Successful in 2m18s
build-prerelease / Build helexa-bench binary (push) Successful in 2m20s
build-prerelease / Build cortex binary (push) Successful in 2m22s
build-prerelease / Lint (fmt + clippy) (push) Successful in 3m10s
build-prerelease / Test (push) Successful in 5m19s
build-prerelease / Package helexa-bench RPM (push) Successful in 1m18s
build-prerelease / Package cortex RPM (push) Successful in 1m20s
build-prerelease / Package helexa-neuron-ampere RPM (push) Successful in 1m40s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 1m44s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Successful in 1m45s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 57s
Stage 1 identity (#49): cortex now knows who a request is for. Identity
rides standard bearer auth only (Authorization: Bearer <key>) — no custom
required headers or body fields — which is what keeps every tier
OpenAI-compatible by construction.

- cortex-gateway::auth: `require_principal` axum middleware
  (from_fn_with_state), wired in build_app outer-to-inner as
  trace → CORS → auth → handlers (CORS outer so preflight short-circuits).
  It resolves the bearer key via the EntitlementProvider, inserts the
  typed Principal into request extensions (for metering #51 / enforcement
  #52), and stamps internal x-helexa-account-id / x-helexa-key-id headers
  so the principal reaches neuron, which trusts cortex over WireGuard (#54).
- Anti-spoofing: client-supplied principal headers are stripped before the
  authoritative value is stamped — a client can never assert a principal
  it didn't authenticate as.
- Rejection contract (#63): missing key under require_auth, or any present
  but unresolvable key, → 401 invalid_api_key in the #60 envelope. /health
  and / stay public. require_auth=false (default) allows anonymous through
  but still 401s a present-but-invalid key.
- Header-name constants (HEADER_ACCOUNT_ID/KEY_ID) live in cortex-core so
  neuron (#54) shares them. The chat/completions/responses paths forward
  the stamped headers automatically via proxy::forward_request; the
  Anthropic streaming + non-streaming paths forward them explicitly via
  auth::forward_principal_headers (they build their own upstream requests).

5 integration tests: missing-key 401, invalid-key 401 (even when auth not
required, not dispatched), valid key reaches neuron with principal headers
+ spoofed header stripped, anonymous allowed when not required, /health
public. Local fmt/clippy/test all green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 19:07:10 +03:00
27 changed files with 2706 additions and 78 deletions

View File

@@ -68,6 +68,57 @@ pub struct HealthResponse {
pub devices: Vec<DeviceHealth>,
#[serde(default)]
pub activation: ActivationStatus,
/// Per-model admission load (#53): how many requests are running vs.
/// queued on each loaded model right now. Cortex's load-aware router
/// (#55) reads this to spread traffic across replicas and to propagate
/// honest backpressure. `#[serde(default)]` keeps older gateways/neurons
/// interoperable (absent → empty → treated as no load info).
#[serde(default)]
pub models: Vec<ModelLoad>,
}
/// Live admission load for one loaded model (#53).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelLoad {
pub id: String,
/// Requests currently running (batch-1 → 0 or 1).
pub in_flight: usize,
/// Requests waiting in the bounded admission queue.
pub queue_depth: usize,
}
#[cfg(test)]
mod health_load_tests {
use super::*;
#[test]
fn health_response_without_models_field_still_deserializes() {
// A pre-#53 neuron's /health payload omits `models`; the gateway
// must still parse it (serde default → empty).
let json = r#"{"uptime_secs":42,"devices":[]}"#;
let resp: HealthResponse = serde_json::from_str(json).expect("back-compat parse");
assert_eq!(resp.uptime_secs, 42);
assert!(resp.models.is_empty());
}
#[test]
fn health_response_round_trips_model_load() {
let resp = HealthResponse {
uptime_secs: 1,
devices: vec![],
activation: ActivationStatus::default(),
models: vec![ModelLoad {
id: "Qwen/Qwen3.6-27B".into(),
in_flight: 1,
queue_depth: 3,
}],
};
let s = serde_json::to_string(&resp).unwrap();
let back: HealthResponse = serde_json::from_str(&s).unwrap();
assert_eq!(back.models.len(), 1);
assert_eq!(back.models[0].in_flight, 1);
assert_eq!(back.models[0].queue_depth, 3);
}
}
/// High-level activation state of the neuron daemon. The HTTP listener

View File

@@ -23,6 +23,14 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
/// Internal header carrying the resolved account id from cortex to neuron.
/// neuron trusts these over the WireGuard link (#54); cortex **strips** any
/// client-supplied copy before stamping the authoritative value, so a client
/// can never assert a principal directly.
pub const HEADER_ACCOUNT_ID: &str = "x-helexa-account-id";
/// Internal header carrying the resolved key id from cortex to neuron.
pub const HEADER_KEY_ID: &str = "x-helexa-key-id";
/// Who a request is for. Resolved once at the edge from the bearer key and
/// carried through the request context. `account_id` is the billable owner
/// (spendable at any operator, by decision); `key_id` identifies the

View File

@@ -1,4 +1,4 @@
use crate::discovery::{ActivationStatus, DiscoveryResponse};
use crate::discovery::{ActivationStatus, DiscoveryResponse, ModelLoad};
use crate::harness::{ModelCost, ModelLimit};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
@@ -27,6 +27,17 @@ pub struct NodeState {
/// to synthesize `Loading` locations so clients see a catalogued
/// model that's mid-prewarm as "loading", not "missing".
pub activation: Option<ActivationStatus>,
/// Last-seen per-model admission load from this neuron's `/health`
/// (#53), keyed by model id. The router (#55) reads it to pick the
/// least-busy replica when a model is loaded on more than one neuron.
/// Empty until the first /health poll reports load.
pub model_load: HashMap<String, ModelLoad>,
/// Consecutive failed `/models` polls. The poller marks a node
/// unhealthy only once this crosses a threshold, so a single transient
/// miss (e.g. a neuron momentarily slow to answer while busy) doesn't
/// yank the node — and all its models — out of routing. Reset to 0 on
/// any successful poll.
pub consecutive_poll_failures: u32,
}
/// A model registered on a node, with its runtime status.

View File

@@ -32,6 +32,8 @@ pub async fn stream_translated(
openai_body: axum::body::Bytes,
model_id: &str,
node_name: &str,
inbound_headers: &axum::http::HeaderMap,
usage_sink: Option<crate::metering::UsageSink>,
) -> Response {
let url = format!("{endpoint}/v1/chat/completions");
tracing::info!(
@@ -42,13 +44,14 @@ pub async fn stream_translated(
"proxying streaming request (anthropic SSE translation)"
);
let upstream = match client
.post(&url)
.header("content-type", "application/json")
.body(openai_body)
.send()
.await
{
let request = crate::auth::forward_principal_headers(
client
.post(&url)
.header("content-type", "application/json")
.body(openai_body),
inbound_headers,
);
let upstream = match request.send().await {
Ok(r) => r,
Err(e) => {
tracing::warn!(
@@ -94,6 +97,10 @@ pub async fn stream_translated(
let mut saw_tool_call = false;
let mut last_finish: Option<String> = None;
let mut frames = 0u64;
// Engine-truth usage for metering (#51), scanned from the upstream
// frames (neuron emits a final `usage` object on the stream, #48).
let mut usage_prompt = 0u64;
let mut usage_completion = 0u64;
'outer: while let Some(block) = upstream.next().await {
let block = match block {
@@ -121,6 +128,15 @@ pub async fn stream_translated(
continue;
}
tracing::trace!(node = %node, frame = %data, "anthropic stream: upstream frame");
// Capture usage for metering before translation — the
// usage object rides on a late frame (often after the
// last content delta).
if let Some(p) = crate::proxy::last_count_for(data, "prompt_tokens") {
usage_prompt = p;
}
if let Some(c) = crate::proxy::last_count_for(data, "completion_tokens") {
usage_completion = c;
}
let Ok(chunk) = serde_json::from_str::<ChatCompletionChunk>(data) else {
tracing::debug!(node = %node, "anthropic stream: unparsable upstream frame skipped");
continue;
@@ -162,6 +178,14 @@ pub async fn stream_translated(
terminated = done,
"anthropic stream complete"
);
// Settle metering with the observed usage (#51). Runs on every exit
// path of the pump — clean end, early break, or upstream error — so
// the reservation is always resolved. `(0, 0)` when no usage frame
// was seen, which releases without recording spend.
if let Some(sink) = usage_sink {
sink(usage_prompt, usage_completion);
}
});
Response::builder()

View File

@@ -0,0 +1,133 @@
//! API-key authentication + principal resolution (#49).
//!
//! Identity rides standard bearer auth only — `Authorization: Bearer <key>`
//! — which is what keeps every tier OpenAI-compatible by construction (no
//! custom required headers or body fields, per #47). The middleware resolves
//! the key to a [`Principal`] via the [`EntitlementProvider`], carries it in
//! the request extensions for cortex-side metering/enforcement (#51/#52), and
//! stamps it as internal headers on the request so it reaches neuron, which
//! trusts cortex's assertion over WireGuard (#54).
//!
//! Anti-spoofing: any client-supplied principal header is **stripped** before
//! the authoritative value is stamped, so a client can never assert a
//! principal it didn't authenticate as.
//!
//! Rejection contract (#63): missing key under `require_auth`, or any present
//! but unresolvable key, yields `401 invalid_api_key` in the #60 envelope.
use crate::error::envelope_response;
use crate::state::CortexState;
use axum::extract::{Request, State};
use axum::http::header::AUTHORIZATION;
use axum::http::{HeaderMap, HeaderValue};
use axum::middleware::Next;
use axum::response::Response;
use cortex_core::entitlements::{HEADER_ACCOUNT_ID, HEADER_KEY_ID};
use cortex_core::error_envelope::OpenAiError;
use std::sync::Arc;
/// Endpoints that never require auth: liveness/readiness probes. Everything
/// else flows through resolution.
fn is_public(path: &str) -> bool {
path == "/health" || path == "/"
}
/// Extract the bearer token from an `Authorization` header value, if present
/// and well-formed. Scheme match is case-insensitive per RFC 7235.
fn parse_bearer(headers: &HeaderMap) -> Option<String> {
let raw = headers.get(AUTHORIZATION)?.to_str().ok()?;
let (scheme, token) = raw.split_once(' ')?;
if scheme.eq_ignore_ascii_case("bearer") {
let token = token.trim();
(!token.is_empty()).then(|| token.to_string())
} else {
None
}
}
/// Axum middleware: resolve the bearer key, attach the principal, stamp the
/// internal headers. Wired in `build_app` via `from_fn_with_state`.
pub async fn require_principal(
State(fleet): State<Arc<CortexState>>,
mut req: Request,
next: Next,
) -> Response {
if is_public(req.uri().path()) {
return next.run(req).await;
}
// Anti-spoof: drop any client-supplied principal headers up front.
{
let headers = req.headers_mut();
headers.remove(HEADER_ACCOUNT_ID);
headers.remove(HEADER_KEY_ID);
}
match parse_bearer(req.headers()) {
Some(key) => match fleet.entitlements.resolve(&key).await {
Ok(principal) => {
// Stamp the authoritative principal for neuron. Account/key
// ids come from operator config, so they're valid header
// values; guard anyway and skip a malformed one rather than
// panic.
if let (Ok(account), Ok(key_id)) = (
HeaderValue::from_str(&principal.account_id),
HeaderValue::from_str(&principal.key_id),
) {
let headers = req.headers_mut();
headers.insert(HEADER_ACCOUNT_ID, account);
headers.insert(HEADER_KEY_ID, key_id);
}
// Carry the typed principal for cortex-side metering (#51)
// and budget enforcement (#52).
req.extensions_mut().insert(principal);
next.run(req).await
}
// An unrecognized key only hard-fails when auth is *required*.
// In allow-anonymous mode (the default) we must IGNORE it and
// serve the request unauthenticated — otherwise the placeholder
// keys that OpenAI-compatible clients send by default (opencode,
// Open WebUI, Agent Zero, litellm) would all break, even though
// the operator never opted into auth. Pre-#49 the bearer was
// never inspected at all; this preserves that for require_auth=false.
Err(_) => {
if fleet.require_auth {
unauthorized("invalid API key")
} else {
tracing::debug!(
"ignoring unrecognized bearer token (require_auth=false): serving anonymously"
);
next.run(req).await
}
}
},
None => {
if fleet.require_auth {
unauthorized("missing API key; supply 'Authorization: Bearer <key>'")
} else {
next.run(req).await
}
}
}
}
/// `401 invalid_api_key` in the standard envelope (#63).
fn unauthorized(message: &str) -> Response {
envelope_response(OpenAiError::invalid_api_key(message))
}
/// Copy the cortex-stamped principal headers from an inbound [`HeaderMap`]
/// onto an outbound reqwest builder. Used by the Anthropic proxy paths,
/// which construct their own upstream requests instead of going through
/// [`crate::proxy::forward_request`] (which forwards all headers verbatim).
pub fn forward_principal_headers(
mut builder: reqwest::RequestBuilder,
headers: &HeaderMap,
) -> reqwest::RequestBuilder {
for name in [HEADER_ACCOUNT_ID, HEADER_KEY_ID] {
if let Some(value) = headers.get(name) {
builder = builder.header(name, value);
}
}
builder
}

View File

@@ -190,7 +190,7 @@ async fn completions(
/// `POST /v1/messages` — accept Anthropic format, translate, proxy, translate back.
async fn anthropic_messages(
State(fleet): State<Arc<CortexState>>,
_headers: HeaderMap,
headers: HeaderMap,
body: Bytes,
) -> Response {
// Parse as Anthropic request.
@@ -306,6 +306,29 @@ async fn anthropic_messages(
}
let start = Instant::now();
// Per-request metering + budget enforcement (#51/#52), same lifecycle as
// the OpenAI paths. Estimate from the translated OpenAI body (what neuron
// sees). Refuse over-cap before dispatch via the #63 envelope; otherwise
// build the sink consumed by whichever branch runs below.
let usage_sink = match crate::metering::principal_from_headers(&headers) {
Some(principal) => {
let advertised =
advertised_output_limit(&fleet, &route.node_name, &route.resolved_model_id).await;
let max_tokens = crate::metering::reservation_estimate(&openai_body, advertised);
match crate::metering::reserve_or_reject(
Arc::clone(&fleet.entitlements),
&principal,
max_tokens,
)
.await
{
Ok(guard) => Some(crate::metering::usage_sink(principal, guard)),
Err(env) => return crate::error::envelope_response(env),
}
}
None => None,
};
if is_streaming {
// Anthropic SSE translation (#24): upstream speaks OpenAI SSE;
// re-frame it event-by-event into Anthropic's message_start /
@@ -316,6 +339,8 @@ async fn anthropic_messages(
openai_body,
&model_id,
&route.node_name,
&headers,
usage_sink,
)
.await;
metrics::histogram!("cortex_request_duration_seconds", &labels)
@@ -335,13 +360,16 @@ async fn anthropic_messages(
cold_start = route.cold_start,
"proxying request"
);
let upstream_resp = fleet
.http_client
.post(&target_url)
.body(openai_body)
.header("content-type", "application/json")
.send()
.await;
let upstream_resp = crate::auth::forward_principal_headers(
fleet
.http_client
.post(&target_url)
.body(openai_body)
.header("content-type", "application/json"),
&headers,
)
.send()
.await;
let upstream_resp = match upstream_resp {
Ok(r) => r,
@@ -437,6 +465,15 @@ async fn anthropic_messages(
metrics::histogram!("cortex_request_duration_seconds", &labels)
.record(start.elapsed().as_secs_f64());
// Settle metering with the upstream usage (#51). Scanned from the
// raw body — same engine-truth source as the streaming path — so we
// don't depend on the typed usage struct's optionality.
if let Some(sink) = usage_sink {
let tail = String::from_utf8_lossy(&body_bytes);
let prompt = proxy::last_count_for(&tail, "prompt_tokens").unwrap_or(0);
let completion = proxy::last_count_for(&tail, "completion_tokens").unwrap_or(0);
sink(prompt, completion);
}
// Did the model actually produce a structured tool call, or just
// text? This is the single most useful signal for "is tool
// calling working end-to-end" — a `false` here alongside a
@@ -724,6 +761,19 @@ async fn proxy_with_metrics(
body: Bytes,
model_id: &str,
) -> Response {
// Fail-fast prompt pre-validation (#56): refuse a prompt that already
// exceeds the model's advertised context window *before* dispatching to
// neuron — the same `400 context_length_exceeded` neuron would emit on
// overflow, just earlier and without burning a cold-load/queue slot.
// cortex has no tokenizer, so the estimate under-counts and neuron stays
// the exact wall; we only catch gross overages (the A0 failure mode).
if let Some(context) = advertised_context(fleet, &route.node_name, model_id).await {
let est = estimate_prompt_tokens(&body);
if est > context {
return context_length_exceeded_response(context, est, &headers);
}
}
let labels = [
("model", model_id.to_string()),
("node", route.node_name.clone()),
@@ -734,9 +784,42 @@ async fn proxy_with_metrics(
metrics::counter!("cortex_cold_starts_total", &labels).increment(1);
}
// Per-request metering + budget enforcement (#51/#52): reconstruct the
// principal from the middleware-stamped headers, reserve the request's
// upper-bound cost (prompt estimate + max output), and build the
// completion sink that settles actual spend when the response finishes.
// A reservation over the hard cap is refused *before* dispatch with the
// #63 envelope. Anonymous requests skip all of this. Must happen before
// `headers`/`body` are moved into the proxy.
let usage_sink = match crate::metering::principal_from_headers(&headers) {
Some(principal) => {
let advertised = advertised_output_limit(fleet, &route.node_name, model_id).await;
let max_tokens = crate::metering::reservation_estimate(&body, advertised);
match crate::metering::reserve_or_reject(
Arc::clone(&fleet.entitlements),
&principal,
max_tokens,
)
.await
{
Ok(guard) => Some(crate::metering::usage_sink(principal, guard)),
Err(env) => return crate::error::envelope_response(env),
}
}
None => None,
};
let start = Instant::now();
let result =
proxy::forward_request(&fleet.http_client, route, path, headers, body, model_id).await;
let result = proxy::forward_request(
&fleet.http_client,
route,
path,
headers,
body,
model_id,
usage_sink,
)
.await;
let duration = start.elapsed();
match result {
@@ -755,6 +838,117 @@ async fn proxy_with_metrics(
}
}
/// The model's advertised `limit.output` (#62) on a given node, used as the
/// default output budget for budget reservations (#52) when the request
/// omits `max_(completion_)tokens`. `None` when the node/model/limit is
/// unknown — callers fall back to [`crate::metering::FALLBACK_MAX_OUTPUT`].
async fn advertised_output_limit(
fleet: &CortexState,
node_name: &str,
model_id: &str,
) -> Option<u64> {
let nodes = fleet.nodes.read().await;
nodes
.get(node_name)?
.models
.get(model_id)?
.limit
.as_ref()
.map(|l| l.output as u64)
}
/// The model's advertised hard context window (`limit.context`, #62/#67) on a
/// node, used for fail-fast prompt pre-validation (#56). `None` when no limit
/// is known — pre-validation is then skipped and neuron remains the wall.
async fn advertised_context(fleet: &CortexState, node_name: &str, model_id: &str) -> Option<u64> {
let nodes = fleet.nodes.read().await;
nodes
.get(node_name)?
.models
.get(model_id)?
.limit
.as_ref()
.map(|l| l.context as u64)
}
/// Conservative prompt-token estimate (~4 chars/token over message text).
/// cortex has no tokenizer; under-counting is the safe direction — we only
/// pre-reject gross overages (#56), and neuron enforces the exact wall.
fn estimate_prompt_tokens(body: &[u8]) -> u64 {
let Ok(v) = serde_json::from_slice::<Value>(body) else {
return (body.len() as u64 / 4).max(1);
};
let mut chars = 0usize;
if let Some(messages) = v.get("messages").and_then(Value::as_array) {
for m in messages {
match m.get("content") {
Some(Value::String(s)) => chars += s.len(),
Some(Value::Array(parts)) => {
for p in parts {
if let Some(t) = p.get("text").and_then(Value::as_str) {
chars += t.len();
}
}
}
_ => {}
}
chars += 8; // rough per-message role/formatting overhead
}
} else if let Some(prompt) = v.get("prompt").and_then(Value::as_str) {
chars += prompt.len(); // legacy /v1/completions
} else {
return (body.len() as u64 / 4).max(1);
}
(chars as u64 / 4).max(1)
}
/// Client-specific, advisory guidance for an over-long prompt (#56),
/// fingerprinted from `User-Agent`. Strictly advisory: it rides the
/// `X-Helexa-Advice` header only, never the error envelope, and behaviour
/// never depends on it. Unknown clients get nothing.
fn client_advice(headers: &HeaderMap) -> Option<&'static str> {
let ua = headers
.get(axum::http::header::USER_AGENT)?
.to_str()
.ok()?
.to_ascii_lowercase();
if ua.contains("litellm") {
Some(
"litellm forwards the full context; lower the configured context window or enable client-side compaction",
)
} else if ua.contains("agent-zero") || ua.contains("agent zero") {
Some("reduce the conversation/context size or summarize earlier turns before resending")
} else if ua.contains("zed") {
Some("reduce the assistant context window in Zed's settings")
} else {
None
}
}
/// `400 context_length_exceeded` for an over-long prompt caught at the edge
/// (#56), in the #60 envelope — the same shape neuron emits on overflow, so
/// clients (opencode auto-compacts) handle it identically. Attaches the
/// advisory `X-Helexa-Advice` header for fingerprinted clients.
fn context_length_exceeded_response(
context: u64,
prompt_est: u64,
headers: &HeaderMap,
) -> Response {
let env = OpenAiError::context_length_exceeded(format!(
"This model's maximum context length is {context} tokens. Your request is \
estimated at ~{prompt_est} tokens. Please reduce the length of the messages."
))
.with_extra("max", json!(context))
.with_extra("estimated_prompt_tokens", json!(prompt_est));
let mut response = crate::error::envelope_response(env);
if let Some(advice) = client_advice(headers)
&& let Ok(value) = axum::http::HeaderValue::from_str(advice)
{
response.headers_mut().insert("x-helexa-advice", value);
}
response
}
/// Update `last_accessed` timestamp for a model on a node (drives LRU eviction).
async fn touch_model(fleet: &CortexState, node_name: &str, model_id: &str) {
let mut nodes = fleet.nodes.write().await;

View File

@@ -1,8 +1,10 @@
pub mod anthropic_sse;
pub mod auth;
pub mod entitlements_local;
pub mod error;
pub mod evictor;
pub mod handlers;
pub mod metering;
pub mod metrics;
pub mod poller;
pub mod proxy;
@@ -11,15 +13,26 @@ pub mod state;
use anyhow::Result;
use axum::Router;
use axum::middleware::from_fn_with_state;
use cortex_core::config::GatewayConfig;
use std::sync::Arc;
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
/// Build the Axum application router with all routes wired up.
///
/// Layer order (outermost first): trace → CORS → auth → handlers. CORS is
/// outer to auth so preflight `OPTIONS` short-circuits before resolution;
/// auth (`require_principal`) resolves the bearer key, attaches the
/// principal, and stamps the internal principal headers before any handler
/// runs.
pub fn build_app(fleet: Arc<state::CortexState>) -> Router {
Router::new()
.merge(handlers::api_routes())
.layer(from_fn_with_state(
Arc::clone(&fleet),
auth::require_principal,
))
.layer(CorsLayer::permissive())
.layer(TraceLayer::new_for_http())
.with_state(fleet)

View File

@@ -0,0 +1,219 @@
//! Per-request token metering (#51).
//!
//! Captures the real `(prompt, completion)` usage of every request and feeds
//! it to two places: the [`EntitlementProvider`] spend ledger (via
//! reserve→settle) and per-principal Prometheus counters. The principal is
//! reconstructed from the internal headers the auth middleware stamped (#49),
//! so this works uniformly across every proxy path without threading the
//! typed principal through each handler.
//!
//! The reserve→settle lifecycle is established here but, in this phase,
//! reserves **zero** tokens — metering only, no enforcement. Budget
//! enforcement (#52) flips the reserved amount to the real
//! `prompt + max_output` and handles the [`BudgetError`] rejection; the
//! settle/release plumbing is identical, so that change is localized.
//!
//! [`ReservationGuard`] makes leaks impossible: settling records actual
//! spend and releases the unused remainder; dropping a guard that was never
//! settled releases the whole reservation. So an early return, error path,
//! or dropped stream can't strand a reservation.
use axum::http::HeaderMap;
use cortex_core::entitlements::{
BudgetError, EntitlementProvider, HEADER_ACCOUNT_ID, HEADER_KEY_ID, Principal,
};
use cortex_core::error_envelope::OpenAiError;
use std::sync::Arc;
/// Fallback output-token budget when neither the request nor the model's
/// advertised limit gives one. Bounds the reservation so a capped key is
/// still gated even on under-specified requests (#52).
pub const FALLBACK_MAX_OUTPUT: u64 = 4096;
/// Invoked exactly once at request completion with best-effort
/// `(prompt_tokens, completion_tokens)`. When no usage could be observed
/// (e.g. a pre-dispatch failure or a dropped stream) it is dropped unused —
/// which releases the held reservation via [`ReservationGuard`]'s `Drop`.
pub type UsageSink = Box<dyn FnOnce(u64, u64) + Send>;
/// Reconstruct the principal from the cortex-stamped internal headers. The
/// auth middleware strips any client copy and stamps the authoritative value,
/// so these headers are trustworthy within cortex. `None` for anonymous
/// (unauthenticated) requests.
pub fn principal_from_headers(headers: &HeaderMap) -> Option<Principal> {
let account_id = headers.get(HEADER_ACCOUNT_ID)?.to_str().ok()?.to_string();
let key_id = headers.get(HEADER_KEY_ID)?.to_str().ok()?.to_string();
Some(Principal { account_id, key_id })
}
/// Emit per-principal spend counters (#51). Labelled by account/key only —
/// both are operator-bounded, so cardinality is controlled.
pub fn record_spend(principal: &Principal, prompt: u64, completion: u64) {
let labels = [
("account", principal.account_id.clone()),
("key", principal.key_id.clone()),
];
metrics::counter!("cortex_spend_tokens_total", &labels).increment(prompt + completion);
metrics::counter!("cortex_spend_prompt_tokens_total", &labels).increment(prompt);
metrics::counter!("cortex_spend_completion_tokens_total", &labels).increment(completion);
}
/// Holds a budget reservation for the life of a request. [`settle`] records
/// actual spend and releases the remainder; an un-settled guard releases the
/// whole reservation when dropped. Anonymous requests carry an empty guard,
/// where every operation is a no-op.
///
/// [`settle`]: ReservationGuard::settle
pub struct ReservationGuard {
provider: Arc<dyn EntitlementProvider>,
reservation: Option<cortex_core::entitlements::Reservation>,
}
impl ReservationGuard {
/// An empty guard for an anonymous request — no reservation to resolve.
pub fn anonymous(provider: Arc<dyn EntitlementProvider>) -> Self {
Self {
provider,
reservation: None,
}
}
/// Wrap an already-acquired reservation.
fn held(
provider: Arc<dyn EntitlementProvider>,
reservation: cortex_core::entitlements::Reservation,
) -> Self {
Self {
provider,
reservation: Some(reservation),
}
}
/// Settle with the tokens actually consumed, disarming the drop-release.
/// Spawns the (fast, in-process for the local provider) settle so the
/// caller — which may be a sync stream-completion callback — needn't
/// await.
pub fn settle(mut self, actual_tokens: u64) {
if let Some(reservation) = self.reservation.take() {
let provider = Arc::clone(&self.provider);
tokio::spawn(async move {
provider.settle(reservation, actual_tokens).await;
});
}
}
}
impl Drop for ReservationGuard {
fn drop(&mut self) {
if let Some(reservation) = self.reservation.take() {
let provider = Arc::clone(&self.provider);
tokio::spawn(async move {
provider.release(reservation).await;
});
}
}
}
/// Build the completion sink for an authenticated request: record spend and
/// settle the reservation with the observed total. Dropping it unused (no
/// usage observed) releases the reservation via the guard.
pub fn usage_sink(principal: Principal, guard: ReservationGuard) -> UsageSink {
Box::new(move |prompt, completion| {
record_spend(&principal, prompt, completion);
guard.settle(prompt + completion);
})
}
/// Reserve the request's upper-bound token cost for the principal, refusing
/// *before* dispatch if it would exceed the hard cap (#52). On success
/// returns a guard the caller settles with actual usage; on refusal returns
/// the #63 envelope (`rate_limit_exceeded` + `Retry-After` for a resetting
/// window, `insufficient_quota` for a hard balance — never `402`).
pub async fn reserve_or_reject(
provider: Arc<dyn EntitlementProvider>,
principal: &Principal,
max_tokens: u64,
) -> Result<ReservationGuard, OpenAiError> {
match provider.reserve(principal, max_tokens).await {
Ok(reservation) => Ok(ReservationGuard::held(provider, reservation)),
Err(err) => Err(budget_error_to_envelope(err)),
}
}
/// Map a [`BudgetError`] to the #63 envelope. The provider chose the window
/// semantics; this only translates them to HTTP.
fn budget_error_to_envelope(err: BudgetError) -> OpenAiError {
match err {
BudgetError::RateLimited {
retry_after_secs, ..
} => OpenAiError::rate_limit_exceeded(err.to_string(), retry_after_secs),
BudgetError::InsufficientQuota { .. } => OpenAiError::insufficient_quota(err.to_string()),
}
}
/// Upper-bound tokens to reserve for a request (#52): an over-estimate of
/// the prompt plus the maximum output. `advertised_output` is the model's
/// `limit.output` (#62), used when the request omits `max_(completion_)tokens`.
/// Over-reserving is safe — settle corrects spend to the actual usage.
pub fn reservation_estimate(body: &[u8], advertised_output: Option<u64>) -> u64 {
let max_output = requested_max_output(body)
.or(advertised_output)
.unwrap_or(FALLBACK_MAX_OUTPUT);
estimate_prompt_tokens(body).saturating_add(max_output)
}
/// The client's requested output cap, from `max_completion_tokens` (or the
/// legacy `max_tokens`). `None` when unspecified.
fn requested_max_output(body: &[u8]) -> Option<u64> {
let v: serde_json::Value = serde_json::from_slice(body).ok()?;
v.get("max_completion_tokens")
.or_else(|| v.get("max_tokens"))
.and_then(serde_json::Value::as_u64)
}
/// Rough prompt-token estimate at ~4 chars/token over the whole body. cortex
/// has no tokenizer; JSON overhead makes this a conservative over-estimate,
/// and neuron remains the exact context wall (#56/#60). Settle reconciles to
/// the real usage afterward.
fn estimate_prompt_tokens(body: &[u8]) -> u64 {
(body.len() as u64 / 4).max(1)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn requested_max_output_prefers_max_completion_tokens() {
let body = br#"{"model":"m","max_completion_tokens":256,"max_tokens":99}"#;
assert_eq!(requested_max_output(body), Some(256));
}
#[test]
fn requested_max_output_falls_back_to_legacy_max_tokens() {
let body = br#"{"model":"m","max_tokens":128}"#;
assert_eq!(requested_max_output(body), Some(128));
}
#[test]
fn estimate_uses_requested_output_when_present() {
// Requested output dominates; prompt estimate is small for a tiny body.
let body = br#"{"model":"m","max_tokens":1000}"#;
let est = reservation_estimate(body, Some(8192));
assert!(est >= 1000 && est < 1100, "est was {est}");
}
#[test]
fn estimate_uses_advertised_output_when_request_omits_it() {
let body = br#"{"model":"m","messages":[]}"#;
let est = reservation_estimate(body, Some(8192));
assert!(est >= 8192, "est was {est}");
}
#[test]
fn estimate_falls_back_when_nothing_advertised() {
let body = br#"{"model":"m"}"#;
let est = reservation_estimate(body, None);
assert!(est >= FALLBACK_MAX_OUTPUT, "est was {est}");
}
}

View File

@@ -63,4 +63,16 @@ fn describe_metrics() {
"cortex_cold_starts_total",
"Total number of cold-start model loads"
);
metrics::describe_counter!(
"cortex_spend_tokens_total",
"Total metered tokens (prompt + completion) per principal, labelled by account/key (#51)"
);
metrics::describe_counter!(
"cortex_spend_prompt_tokens_total",
"Metered prompt tokens per principal, labelled by account/key (#51)"
);
metrics::describe_counter!(
"cortex_spend_completion_tokens_total",
"Metered completion tokens per principal, labelled by account/key (#51)"
);
}

View File

@@ -5,12 +5,29 @@ use crate::state::CortexState;
use chrono::Utc;
use cortex_core::discovery::{DiscoveryResponse, HealthResponse};
use cortex_core::harness::ModelInfo;
use cortex_core::node::{ModelEntry, ModelStatus};
use cortex_core::node::{ModelEntry, ModelStatus, NodeState};
use std::sync::Arc;
use std::time::Duration;
const POLL_INTERVAL: Duration = Duration::from_secs(10);
/// Consecutive failed `/models` polls before a node is marked unhealthy.
/// Debounces transient misses (a busy neuron briefly slow to answer) so a
/// single blip can't yank a node — and its models — out of routing. At the
/// 10s poll interval this tolerates ~20s of flapping before evicting.
const POLL_FAILURE_THRESHOLD: u32 = 3;
/// Record a failed poll for `node`, marking it unhealthy only once failures
/// reach [`POLL_FAILURE_THRESHOLD`]. Below the threshold the node keeps its
/// last-known health, riding over transient misses. A successful poll resets
/// the counter (see the success arm in `poll_once`).
fn record_poll_failure(node: &mut NodeState) {
node.consecutive_poll_failures = node.consecutive_poll_failures.saturating_add(1);
if node.consecutive_poll_failures >= POLL_FAILURE_THRESHOLD {
node.healthy = false;
}
}
/// Runs forever, polling all neurons on a fixed interval.
pub async fn poll_loop(fleet: Arc<CortexState>) {
loop {
@@ -138,13 +155,14 @@ async fn poll_neuron(fleet: &CortexState, name: &str, endpoint: &str) {
// Remove models no longer reported by the neuron.
node.models.retain(|id, _| seen.contains(id));
node.consecutive_poll_failures = 0;
node.healthy = true;
node.last_poll = Some(Utc::now());
tracing::debug!(node = name, models = models.len(), "poll ok");
}
Err(e) => {
tracing::warn!(node = name, error = %e, "failed to parse /models response");
node.healthy = false;
record_poll_failure(node);
}
}
}
@@ -154,11 +172,11 @@ async fn poll_neuron(fleet: &CortexState, name: &str, endpoint: &str) {
status = %resp.status(),
"neuron returned non-success status"
);
node.healthy = false;
record_poll_failure(node);
}
Err(e) => {
tracing::warn!(node = name, error = %e, "failed to reach neuron");
node.healthy = false;
record_poll_failure(node);
}
}
@@ -200,6 +218,9 @@ async fn poll_health(fleet: &CortexState, name: &str, endpoint: &str) {
let mut nodes = fleet.nodes.write().await;
if let Some(node) = nodes.get_mut(name) {
node.activation = Some(h.activation);
// Per-model admission load (#53) → keyed by id for the
// load-aware router (#55).
node.model_load = h.models.into_iter().map(|m| (m.id.clone(), m)).collect();
}
}
Err(e) => {

View File

@@ -31,6 +31,7 @@ pub async fn forward_request(
headers: HeaderMap,
body: bytes::Bytes,
model_id: &str,
usage_sink: Option<crate::metering::UsageSink>,
) -> Result<Response, ProxyError> {
let request_start = Instant::now();
let url = format!("{}{}", route.endpoint, path);
@@ -82,7 +83,7 @@ pub async fn forward_request(
let resp_headers = upstream_resp.headers().clone();
let stream = TokenMetricsStream::new(
Box::pin(upstream_resp.bytes_stream()),
TokenMetrics::new(model_id, &route.node_name, request_start),
TokenMetrics::new(model_id, &route.node_name, request_start, usage_sink),
);
let body = Body::from_stream(stream);
@@ -186,10 +187,19 @@ struct TokenMetrics {
last_chunk: Option<Instant>,
tail: String,
finished: bool,
/// Per-principal metering hook (#51). Invoked exactly once in `finish`
/// with the observed `(prompt, completion)` so the reservation can be
/// settled and spend recorded. `None` for anonymous requests.
usage_sink: Option<crate::metering::UsageSink>,
}
impl TokenMetrics {
fn new(model_id: &str, node_name: &str, request_start: Instant) -> Self {
fn new(
model_id: &str,
node_name: &str,
request_start: Instant,
usage_sink: Option<crate::metering::UsageSink>,
) -> Self {
Self {
labels: [
("model", model_id.to_string()),
@@ -200,6 +210,7 @@ impl TokenMetrics {
last_chunk: None,
tail: String::new(),
finished: false,
usage_sink,
}
}
@@ -227,36 +238,45 @@ impl TokenMetrics {
return;
}
self.finished = true;
let Some(first) = self.first_chunk else {
return; // no body ever arrived — nothing to record
};
let ttft = first.duration_since(self.request_start).as_secs_f64();
metrics::histogram!("cortex_time_to_first_token_seconds", &self.labels).record(ttft);
if let Some(prompt) = last_count_for(&self.tail, "prompt_tokens") {
metrics::counter!("cortex_prompt_tokens_total", &self.labels).increment(prompt);
}
let Some(completion) = last_count_for(&self.tail, "completion_tokens") else {
return;
};
if completion == 0 {
return;
}
metrics::counter!("cortex_completion_tokens_total", &self.labels).increment(completion);
let prompt = last_count_for(&self.tail, "prompt_tokens");
let completion = last_count_for(&self.tail, "completion_tokens");
let last = self.last_chunk.unwrap_or(first);
let decode_window = last.duration_since(first).as_secs_f64();
// Streaming: rate over the decode window (first→last chunk).
// Non-streaming bodies arrive as ~one chunk (window ≈ 0), where
// the only honest denominator is the full request duration.
let secs = if decode_window >= 0.1 {
decode_window
} else {
last.duration_since(self.request_start).as_secs_f64()
};
if secs > 0.0 {
metrics::histogram!("cortex_tokens_per_second", &self.labels)
.record(completion as f64 / secs);
// Per-model metrics — only when body chunks actually arrived.
if let Some(first) = self.first_chunk {
let ttft = first.duration_since(self.request_start).as_secs_f64();
metrics::histogram!("cortex_time_to_first_token_seconds", &self.labels).record(ttft);
if let Some(prompt) = prompt {
metrics::counter!("cortex_prompt_tokens_total", &self.labels).increment(prompt);
}
if let Some(completion) = completion.filter(|c| *c > 0) {
metrics::counter!("cortex_completion_tokens_total", &self.labels)
.increment(completion);
let last = self.last_chunk.unwrap_or(first);
let decode_window = last.duration_since(first).as_secs_f64();
// Streaming: rate over the decode window (first→last chunk).
// Non-streaming bodies arrive as ~one chunk (window ≈ 0),
// where the only honest denominator is the full request
// duration.
let secs = if decode_window >= 0.1 {
decode_window
} else {
last.duration_since(self.request_start).as_secs_f64()
};
if secs > 0.0 {
metrics::histogram!("cortex_tokens_per_second", &self.labels)
.record(completion as f64 / secs);
}
}
}
// Per-principal metering + reservation settle (#51). Always runs so
// the reservation is resolved even when no usage/body was observed
// (sink with (0, 0) → settle 0 → release).
if let Some(sink) = self.usage_sink.take() {
sink(prompt.unwrap_or(0), completion.unwrap_or(0));
}
}
}

View File

@@ -50,6 +50,10 @@ pub enum RouteError {
"model '{model_id}' is in the catalogue but no healthy neuron's topology satisfies its constraints"
)]
NoFeasibleNeuron { model_id: String },
#[error(
"model '{model_id}' is feasible on a neuron that is currently unhealthy — retry shortly"
)]
FeasibleNodeUnhealthy { model_id: String },
#[error("cold-load of '{model_id}' on '{node}' failed: {message}")]
ColdLoadFailed {
model_id: String,
@@ -68,7 +72,9 @@ impl RouteError {
/// safe to retry the same request); everything else is 404.
pub fn http_status(&self) -> u16 {
match self {
RouteError::NoHealthyNodes | RouteError::ModelRecovering { .. } => 503,
RouteError::NoHealthyNodes
| RouteError::ModelRecovering { .. }
| RouteError::FeasibleNodeUnhealthy { .. } => 503,
_ => 404,
}
}
@@ -81,7 +87,8 @@ impl RouteError {
| RouteError::EndpointResolveFailed(_, _)
| RouteError::NoFeasibleNeuron { .. }
| RouteError::ColdLoadFailed { .. }
| RouteError::ModelRecovering { .. } => "api_error",
| RouteError::ModelRecovering { .. }
| RouteError::FeasibleNodeUnhealthy { .. } => "api_error",
}
}
@@ -94,6 +101,7 @@ impl RouteError {
RouteError::NoFeasibleNeuron { .. } => "service_unavailable",
RouteError::ColdLoadFailed { .. } => "service_unavailable",
RouteError::ModelRecovering { .. } => "service_unavailable",
RouteError::FeasibleNodeUnhealthy { .. } => "service_unavailable",
}
}
@@ -105,6 +113,7 @@ impl RouteError {
pub fn retry_after_secs(&self) -> Option<u64> {
match self {
RouteError::ModelRecovering { .. } => Some(2),
RouteError::FeasibleNodeUnhealthy { .. } => Some(3),
RouteError::NoHealthyNodes => Some(5),
_ => None,
}
@@ -132,7 +141,9 @@ pub async fn resolve(
// Snapshot loaded / unloaded / recovering state from the poller cache.
let (loaded_route, unloaded_route, recovering_node, any_healthy) = {
let nodes = fleet.nodes.read().await;
let mut loaded_route = None;
// All healthy nodes with the model loaded, each with its current
// admission load (#53) so we can pick the least-busy replica (#55).
let mut loaded_candidates: Vec<(String, String, usize)> = Vec::new();
let mut unloaded_route = None;
let mut recovering_node = None;
let mut any_healthy = false;
@@ -144,8 +155,15 @@ pub async fn resolve(
if let Some(entry) = node.models.get(model_id) {
match entry.status {
ModelStatus::Loaded | ModelStatus::Reloading => {
loaded_route = Some((node.name.clone(), node.endpoint.clone(), false));
break;
// Least-busy score: in-flight + queued from the
// neuron's last /health (#53). Unknown load (no poll
// yet) scores 0 so the replica stays eligible.
let score = node
.model_load
.get(model_id)
.map(|l| l.in_flight + l.queue_depth)
.unwrap_or(0);
loaded_candidates.push((node.name.clone(), node.endpoint.clone(), score));
}
ModelStatus::Unloaded => {
if unloaded_route.is_none() {
@@ -175,6 +193,12 @@ pub async fn resolve(
}
}
}
// Pick the least-busy loaded replica; ties break by node name for
// deterministic routing. `false` = not a cold start.
let loaded_route = loaded_candidates
.into_iter()
.min_by(|a, b| a.2.cmp(&b.2).then_with(|| a.0.cmp(&b.0)))
.map(|(name, endpoint, _score)| (name, endpoint, false));
(loaded_route, unloaded_route, recovering_node, any_healthy)
};
@@ -237,11 +261,32 @@ async fn pick_feasible_neuron(
b.2.cmp(&a.2) // pinned first (true > false)
.then(a.0.cmp(&b.0))
});
let pick = candidates.into_iter().next();
pick.map(|(n, e, _)| (n, e))
.ok_or_else(|| RouteError::NoFeasibleNeuron {
if let Some((n, e, _)) = candidates.into_iter().next() {
return Ok((n, e));
}
// No *healthy* feasible neuron. Distinguish a transient outage from a
// permanent misconfiguration: if some neuron is topologically feasible
// but currently unhealthy (e.g. it briefly missed polls while busy),
// this is retryable — return 503 + Retry-After so the client backs off
// and retries instead of treating a 404 as a hard failure. Only when no
// neuron could *ever* satisfy the topology is it a permanent 404.
let feasible_but_unhealthy = nodes.values().any(|node| {
!node.healthy
&& node
.discovery
.as_ref()
.is_some_and(|disc| profile.is_feasible_on(&node.name, &disc.devices))
});
if feasible_but_unhealthy {
Err(RouteError::FeasibleNodeUnhealthy {
model_id: profile.id.clone(),
})
} else {
Err(RouteError::NoFeasibleNeuron {
model_id: profile.id.clone(),
})
}
}
/// Issue `POST {endpoint}/models/load` for this profile on this neuron,

View File

@@ -37,6 +37,8 @@ impl CortexState {
last_poll: None,
discovery: None,
activation: None,
model_load: HashMap::new(),
consecutive_poll_failures: 0,
},
);
}

View File

@@ -0,0 +1,272 @@
//! Integration tests for API-key auth + principal resolution (#49).
//!
//! Verifies the #63 rejection contract (401 invalid_api_key via the #60
//! envelope) and that an authenticated request reaches neuron carrying the
//! internal principal headers — while a client-supplied principal header is
//! stripped (anti-spoofing).
use axum::Json;
use axum::extract::Path;
use axum::http::HeaderMap;
use axum::routing::{get, post};
use cortex_core::config::{
ApiKeyConfig, EntitlementsConfig, EvictionSettings, EvictionStrategy, GatewayConfig,
GatewaySettings, NeuronEndpoint,
};
use cortex_core::entitlements::{CapWindow, HEADER_ACCOUNT_ID, HEADER_KEY_ID};
use cortex_core::node::{ModelEntry, ModelStatus};
use cortex_gateway::state::CortexState;
use serde_json::{Value, json};
use std::sync::{Arc, Mutex};
use tokio::net::TcpListener;
/// What the mock neuron observed on the inbound `/v1/chat/completions`
/// request: the principal headers cortex stamped (or didn't).
#[derive(Default)]
struct Seen {
account_id: Option<String>,
key_id: Option<String>,
}
/// Spawn a mock neuron that records the principal headers it receives and
/// returns a trivial chat completion. Returns (base_url, observed).
async fn spawn_capturing_neuron() -> (String, Arc<Mutex<Seen>>) {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let base_url = format!("http://{addr}");
let inference_url = base_url.clone();
let seen: Arc<Mutex<Seen>> = Arc::new(Mutex::new(Seen::default()));
let sink = Arc::clone(&seen);
let app = axum::Router::new()
.route(
"/models/{model_id}/endpoint",
get(move |Path(_): Path<String>| {
let url = inference_url.clone();
async move { Json(json!({ "url": url })) }
}),
)
.route(
"/v1/chat/completions",
post(move |headers: HeaderMap, Json(body): Json<Value>| {
let sink = Arc::clone(&sink);
async move {
{
let mut s = sink.lock().unwrap();
s.account_id = headers
.get(HEADER_ACCOUNT_ID)
.and_then(|v| v.to_str().ok())
.map(str::to_string);
s.key_id = headers
.get(HEADER_KEY_ID)
.and_then(|v| v.to_str().ok())
.map(str::to_string);
}
let model = body.get("model").and_then(Value::as_str).unwrap_or("m");
Json(json!({
"id": "chatcmpl-auth-001",
"object": "chat.completion",
"created": 1700000000_u64,
"model": model,
"choices": [{
"index": 0,
"message": {"role": "assistant", "content": "ok"},
"finish_reason": "stop"
}],
"usage": {"prompt_tokens": 3, "completion_tokens": 1, "total_tokens": 4}
}))
}
}),
)
.with_state(());
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
(base_url, seen)
}
/// Spawn a gateway with the given entitlements config, a single neuron, and
/// `test-model` seeded as loaded (build_app spawns no poller).
async fn spawn_gateway(neuron_url: &str, entitlements: EntitlementsConfig) -> String {
let config = GatewayConfig {
gateway: GatewaySettings {
listen: "127.0.0.1:0".into(),
metrics_listen: "127.0.0.1:0".into(),
},
eviction: EvictionSettings {
strategy: EvictionStrategy::Lru,
defrag_after_cycles: 0,
},
neurons: vec![NeuronEndpoint {
name: "mock-node".into(),
endpoint: neuron_url.to_string(),
}],
models_config: "/dev/null".into(),
entitlements,
};
let fleet = Arc::new(CortexState::from_config(&config));
{
let mut nodes = fleet.nodes.write().await;
let node = nodes.get_mut("mock-node").unwrap();
node.healthy = true;
node.models.insert(
"test-model".into(),
ModelEntry {
id: "test-model".into(),
status: ModelStatus::Loaded,
last_accessed: None,
vram_estimate_mb: Some(8000),
capabilities: Vec::new(),
tool_call: false,
reasoning: false,
limit: None,
},
);
}
let app = cortex_gateway::build_app(Arc::clone(&fleet));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
format!("http://{addr}")
}
fn one_key_config(require_auth: bool) -> EntitlementsConfig {
EntitlementsConfig {
require_auth,
keys: vec![ApiKeyConfig {
key: "sk-good".into(),
account_id: "acct-1".into(),
key_id: Some("key-1".into()),
hard_cap: None,
window: CapWindow::Balance,
}],
}
}
fn chat_body() -> Value {
json!({
"model": "test-model",
"messages": [{"role": "user", "content": "hi"}]
})
}
#[tokio::test]
async fn missing_key_when_required_is_401_invalid_api_key() {
let (neuron, _seen) = spawn_capturing_neuron().await;
let gateway = spawn_gateway(&neuron, one_key_config(true)).await;
let resp = reqwest::Client::new()
.post(format!("{gateway}/v1/chat/completions"))
.json(&chat_body())
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::UNAUTHORIZED);
let body: Value = resp.json().await.unwrap();
assert_eq!(body["error"]["code"], "invalid_api_key");
assert_eq!(body["error"]["type"], "invalid_request_error");
}
#[tokio::test]
async fn unrecognized_key_is_ignored_when_auth_not_required() {
let (neuron, seen) = spawn_capturing_neuron().await;
// allow-anonymous mode: a placeholder/unknown bearer (as opencode,
// Open WebUI, Agent Zero, litellm all send by default) must NOT be
// rejected — it's ignored and the request is served anonymously.
let gateway = spawn_gateway(&neuron, one_key_config(false)).await;
let resp = reqwest::Client::new()
.post(format!("{gateway}/v1/chat/completions"))
.bearer_auth("sk-dummy-placeholder")
.json(&chat_body())
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
let _ = resp.bytes().await.unwrap();
// Served, but anonymous — no principal stamped from the bogus key.
assert!(seen.lock().unwrap().account_id.is_none());
}
#[tokio::test]
async fn invalid_key_is_401_when_auth_required() {
let (neuron, seen) = spawn_capturing_neuron().await;
// With auth required, a present-but-wrong credential is rejected.
let gateway = spawn_gateway(&neuron, one_key_config(true)).await;
let resp = reqwest::Client::new()
.post(format!("{gateway}/v1/chat/completions"))
.bearer_auth("sk-wrong")
.json(&chat_body())
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::UNAUTHORIZED);
let body: Value = resp.json().await.unwrap();
assert_eq!(body["error"]["code"], "invalid_api_key");
// Rejected before dispatch — neuron never saw the request.
assert!(seen.lock().unwrap().account_id.is_none());
}
#[tokio::test]
async fn valid_key_reaches_neuron_with_principal_headers() {
let (neuron, seen) = spawn_capturing_neuron().await;
let gateway = spawn_gateway(&neuron, one_key_config(true)).await;
let resp = reqwest::Client::new()
.post(format!("{gateway}/v1/chat/completions"))
.bearer_auth("sk-good")
// A spoofed principal header must be stripped, not forwarded.
.header(HEADER_ACCOUNT_ID, "attacker")
.json(&chat_body())
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
let s = seen.lock().unwrap();
assert_eq!(s.account_id.as_deref(), Some("acct-1"));
assert_eq!(s.key_id.as_deref(), Some("key-1"));
}
#[tokio::test]
async fn anonymous_allowed_when_auth_not_required() {
let (neuron, seen) = spawn_capturing_neuron().await;
let gateway = spawn_gateway(&neuron, EntitlementsConfig::default()).await;
let resp = reqwest::Client::new()
.post(format!("{gateway}/v1/chat/completions"))
.json(&chat_body())
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
// No principal resolved → no principal headers stamped.
let s = seen.lock().unwrap();
assert!(s.account_id.is_none());
assert!(s.key_id.is_none());
}
#[tokio::test]
async fn health_is_public_even_when_auth_required() {
let (neuron, _seen) = spawn_capturing_neuron().await;
let gateway = spawn_gateway(&neuron, one_key_config(true)).await;
let resp = reqwest::Client::new()
.get(format!("{gateway}/health"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
}

View File

@@ -0,0 +1,253 @@
//! Integration tests for budget enforcement (#52) — the A0 seatbelt.
//!
//! A reservation over the key's hard cap is refused *before* neuron is hit,
//! with the #63 code matching the cap-window semantics (rate_limit_exceeded
//! + Retry-After for a resetting window, insufficient_quota for a hard
//! balance). Spend never exceeds the cap. No 402, ever.
use axum::Json;
use axum::extract::Path;
use axum::routing::{get, post};
use cortex_core::config::{
ApiKeyConfig, EntitlementsConfig, EvictionSettings, EvictionStrategy, GatewayConfig,
GatewaySettings, NeuronEndpoint,
};
use cortex_core::entitlements::{CapWindow, Principal};
use cortex_core::node::{ModelEntry, ModelStatus};
use cortex_gateway::state::CortexState;
use serde_json::{Value, json};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::net::TcpListener;
/// Mock neuron with a hit counter on the inference path, so a test can prove
/// a request was (or wasn't) dispatched.
async fn spawn_counting_neuron() -> (String, Arc<AtomicU64>) {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let base_url = format!("http://{addr}");
let inference_url = base_url.clone();
let hits = Arc::new(AtomicU64::new(0));
let sink = Arc::clone(&hits);
let app = axum::Router::new()
.route(
"/models/{model_id}/endpoint",
get(move |Path(_): Path<String>| {
let url = inference_url.clone();
async move { Json(json!({ "url": url })) }
}),
)
.route(
"/v1/chat/completions",
post(move |Json(body): Json<Value>| {
let sink = Arc::clone(&sink);
async move {
sink.fetch_add(1, Ordering::SeqCst);
let model = body.get("model").and_then(Value::as_str).unwrap_or("m");
Json(json!({
"id": "chatcmpl-budget",
"object": "chat.completion",
"created": 1700000000_u64,
"model": model,
"choices": [{"index": 0, "message": {"role": "assistant", "content": "ok"}, "finish_reason": "stop"}],
"usage": {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}
}))
}
}),
);
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
(base_url, hits)
}
async fn spawn_gateway(neuron_url: &str, key: ApiKeyConfig) -> (Arc<CortexState>, String) {
let config = GatewayConfig {
gateway: GatewaySettings {
listen: "127.0.0.1:0".into(),
metrics_listen: "127.0.0.1:0".into(),
},
eviction: EvictionSettings {
strategy: EvictionStrategy::Lru,
defrag_after_cycles: 0,
},
neurons: vec![NeuronEndpoint {
name: "mock-node".into(),
endpoint: neuron_url.to_string(),
}],
models_config: "/dev/null".into(),
entitlements: EntitlementsConfig {
require_auth: true,
keys: vec![key],
},
};
let fleet = Arc::new(CortexState::from_config(&config));
{
let mut nodes = fleet.nodes.write().await;
let node = nodes.get_mut("mock-node").unwrap();
node.healthy = true;
node.models.insert(
"test-model".into(),
ModelEntry {
id: "test-model".into(),
status: ModelStatus::Loaded,
last_accessed: None,
vram_estimate_mb: Some(8000),
capabilities: Vec::new(),
tool_call: false,
reasoning: false,
limit: None,
},
);
}
let app = cortex_gateway::build_app(Arc::clone(&fleet));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
(fleet, format!("http://{addr}"))
}
fn key(window: CapWindow, hard_cap: u64) -> ApiKeyConfig {
ApiKeyConfig {
key: "sk-cap".into(),
account_id: "acct-cap".into(),
key_id: Some("key-cap".into()),
hard_cap: Some(hard_cap),
window,
}
}
fn chat(max_tokens: u64) -> Value {
json!({
"model": "test-model",
"max_tokens": max_tokens,
"messages": [{"role": "user", "content": "hi"}]
})
}
#[tokio::test]
async fn balance_over_cap_is_429_insufficient_quota_before_dispatch() {
let (neuron, hits) = spawn_counting_neuron().await;
// Cap far below a single request's reservation (max_tokens 1000).
let (_fleet, gateway) = spawn_gateway(&neuron, key(CapWindow::Balance, 10)).await;
let resp = reqwest::Client::new()
.post(format!("{gateway}/v1/chat/completions"))
.bearer_auth("sk-cap")
.json(&chat(1000))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::TOO_MANY_REQUESTS);
// Hard balance → no Retry-After.
assert!(resp.headers().get(reqwest::header::RETRY_AFTER).is_none());
let body: Value = resp.json().await.unwrap();
assert_eq!(body["error"]["code"], "insufficient_quota");
// Refused before dispatch — neuron never saw it.
assert_eq!(hits.load(Ordering::SeqCst), 0);
}
#[tokio::test]
async fn rolling_over_cap_is_429_rate_limited_with_retry_after() {
let (neuron, hits) = spawn_counting_neuron().await;
let (_fleet, gateway) =
spawn_gateway(&neuron, key(CapWindow::Rolling { seconds: 3600 }, 10)).await;
let resp = reqwest::Client::new()
.post(format!("{gateway}/v1/chat/completions"))
.bearer_auth("sk-cap")
.json(&chat(1000))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::TOO_MANY_REQUESTS);
let retry = resp
.headers()
.get(reqwest::header::RETRY_AFTER)
.expect("rolling-window rejection must carry Retry-After");
assert!(retry.to_str().unwrap().parse::<u64>().unwrap() >= 1);
let body: Value = resp.json().await.unwrap();
assert_eq!(body["error"]["code"], "rate_limit_exceeded");
assert_eq!(hits.load(Ordering::SeqCst), 0);
}
#[tokio::test]
async fn within_cap_is_served() {
let (neuron, hits) = spawn_counting_neuron().await;
let (_fleet, gateway) = spawn_gateway(&neuron, key(CapWindow::Balance, 1_000_000)).await;
let resp = reqwest::Client::new()
.post(format!("{gateway}/v1/chat/completions"))
.bearer_auth("sk-cap")
.json(&chat(50))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
let _ = resp.bytes().await.unwrap();
assert_eq!(hits.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn a0_seatbelt_caps_a_runaway_fan_out() {
// An Agent-Zero-style key with a modest cap: a burst of requests drains
// it, then further requests are refused — the account stops draining and
// spend never exceeds the cap.
let (neuron, hits) = spawn_counting_neuron().await;
let (fleet, gateway) = spawn_gateway(&neuron, key(CapWindow::Balance, 100)).await;
let client = reqwest::Client::new();
let mut ok = 0;
let mut refused = 0;
for _ in 0..20 {
let resp = client
.post(format!("{gateway}/v1/chat/completions"))
.bearer_auth("sk-cap")
.json(&chat(20))
.send()
.await
.unwrap();
match resp.status() {
reqwest::StatusCode::OK => {
ok += 1;
let _ = resp.bytes().await.unwrap();
}
reqwest::StatusCode::TOO_MANY_REQUESTS => {
refused += 1;
let body: Value = resp.json().await.unwrap();
assert_eq!(body["error"]["code"], "insufficient_quota");
}
other => panic!("unexpected status {other}"),
}
}
assert!(ok >= 1, "some requests should be served");
assert!(refused >= 1, "the cap must eventually refuse the fan-out");
assert_eq!(
hits.load(Ordering::SeqCst),
ok,
"refused requests never dispatched"
);
// Spend never exceeded the hard cap (reservation prevents overshoot).
// Poll briefly for in-flight settles to land.
let principal = Principal {
account_id: "acct-cap".into(),
key_id: "key-cap".into(),
};
for _ in 0..50 {
let snap = fleet.entitlements.snapshot(&principal).await.unwrap();
if snap.reserved == 0 {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
}
let snap = fleet.entitlements.snapshot(&principal).await.unwrap();
assert!(snap.spent <= 100, "spent {} exceeded cap", snap.spent);
}

View File

@@ -0,0 +1,124 @@
//! Router: a catalogued model whose only topologically-feasible neuron is
//! currently unhealthy is a *transient* condition (retryable 503), not a
//! permanent 404. This is the exact shape of the beast incident: benjy/
//! quadbrat (1 GPU, healthy) can't host the 27B, and beast (2 GPU) — the
//! sole feasible node — briefly drops out → clients must back off and retry,
//! not hard-fail.
use cortex_core::config::{
EvictionSettings, EvictionStrategy, GatewayConfig, GatewaySettings, NeuronEndpoint,
};
use cortex_core::discovery::{DeviceInfo, DiscoveryResponse};
use cortex_gateway::router::{self, RouteError};
use cortex_gateway::state::CortexState;
use std::sync::Arc;
fn devices(n: usize) -> Vec<DeviceInfo> {
(0..n)
.map(|i| DeviceInfo {
index: i as u32,
name: "RTX 5090".into(),
vram_total_mb: 32_768,
compute_capability: "9.0".into(),
})
.collect()
}
fn discovery(host: &str, n_devices: usize) -> DiscoveryResponse {
DiscoveryResponse {
hostname: host.into(),
os: "Linux".into(),
kernel: "7.0".into(),
cuda_version: Some("13.0".into()),
driver_version: Some("999".into()),
devices: devices(n_devices),
harnesses: vec!["candle".into()],
cuda_unavailable_reason: None,
max_prompt_tokens: 49_152,
}
}
/// Catalogue with one model needing 2 devices. Returns a temp path.
fn write_catalogue() -> std::path::PathBuf {
let toml = r#"
[[models]]
id = "big-model"
harness = "candle"
min_devices = 2
"#;
let path = std::env::temp_dir().join("cortex_test_feasibility_models.toml");
std::fs::write(&path, toml).unwrap();
path
}
async fn fleet_with(big_healthy: bool, big_devices: usize) -> Arc<CortexState> {
let cat = write_catalogue();
let config = GatewayConfig {
gateway: GatewaySettings {
listen: "127.0.0.1:0".into(),
metrics_listen: "127.0.0.1:0".into(),
},
eviction: EvictionSettings {
strategy: EvictionStrategy::Lru,
defrag_after_cycles: 0,
},
neurons: vec![
NeuronEndpoint {
name: "small".into(),
endpoint: "http://127.0.0.1:1".into(),
},
NeuronEndpoint {
name: "big".into(),
endpoint: "http://127.0.0.1:2".into(),
},
],
models_config: cat.to_string_lossy().into_owned(),
entitlements: Default::default(),
};
let fleet = Arc::new(CortexState::from_config(&config));
{
let mut nodes = fleet.nodes.write().await;
// "small" is healthy but only has 1 GPU → not feasible for the model.
let small = nodes.get_mut("small").unwrap();
small.healthy = true;
small.discovery = Some(discovery("small", 1));
// "big" has enough GPUs but its health is the variable under test.
let big = nodes.get_mut("big").unwrap();
big.healthy = big_healthy;
big.discovery = Some(discovery("big", big_devices));
}
fleet
}
#[tokio::test]
async fn feasible_node_unhealthy_is_transient_503() {
// big (2 GPU, the only feasible node) is unhealthy; small (1 GPU) is
// healthy but can't host the model → retryable, not a permanent 404.
let fleet = fleet_with(false, 2).await;
let err = router::resolve(&fleet, "big-model")
.await
.expect_err("model can't be served right now");
assert!(
matches!(err, RouteError::FeasibleNodeUnhealthy { .. }),
"expected FeasibleNodeUnhealthy, got {err:?}"
);
assert_eq!(err.http_status(), 503);
assert_eq!(err.retry_after_secs(), Some(3));
assert_eq!(err.code(), "service_unavailable");
}
#[tokio::test]
async fn no_node_can_ever_satisfy_is_permanent_404() {
// big is healthy but only has 1 GPU now (e.g. topology genuinely can't
// satisfy min_devices=2 anywhere) → permanent, non-retryable 404.
let fleet = fleet_with(true, 1).await;
let err = router::resolve(&fleet, "big-model")
.await
.expect_err("no feasible topology");
assert!(
matches!(err, RouteError::NoFeasibleNeuron { .. }),
"expected NoFeasibleNeuron, got {err:?}"
);
assert_eq!(err.http_status(), 404);
assert_eq!(err.retry_after_secs(), None);
}

View File

@@ -0,0 +1,189 @@
//! Load-aware routing across replicas (#55).
//!
//! When a model is loaded on more than one healthy neuron, the router picks
//! the least-busy replica using the per-model admission load each neuron
//! reports on `GET /health` (#53), rather than always taking the first.
mod common;
use axum::Json;
use axum::extract::Path;
use axum::http::{StatusCode, header};
use axum::response::IntoResponse;
use axum::routing::{get, post};
use cortex_core::config::{
EvictionSettings, EvictionStrategy, GatewayConfig, GatewaySettings, NeuronEndpoint,
};
use cortex_core::discovery::ModelLoad;
use cortex_core::node::{ModelEntry, ModelStatus};
use cortex_gateway::state::CortexState;
use serde_json::{Value, json};
use std::sync::Arc;
use tokio::net::TcpListener;
/// Seed a node as healthy with `test-model` loaded and a given admission load.
async fn seed_loaded(fleet: &CortexState, node: &str, in_flight: usize, queue_depth: usize) {
let mut nodes = fleet.nodes.write().await;
let n = nodes.get_mut(node).expect("node exists");
n.healthy = true;
n.models.insert(
"test-model".into(),
ModelEntry {
id: "test-model".into(),
status: ModelStatus::Loaded,
last_accessed: None,
vram_estimate_mb: Some(8000),
capabilities: Vec::new(),
tool_call: false,
reasoning: false,
limit: None,
},
);
n.model_load.insert(
"test-model".into(),
ModelLoad {
id: "test-model".into(),
in_flight,
queue_depth,
},
);
}
/// Build a gateway state over two mock neurons (no poller; we seed state).
async fn two_neuron_fleet(endpoint_a: &str, endpoint_b: &str) -> Arc<CortexState> {
let config = GatewayConfig {
gateway: GatewaySettings {
listen: "127.0.0.1:0".into(),
metrics_listen: "127.0.0.1:0".into(),
},
eviction: EvictionSettings {
strategy: EvictionStrategy::Lru,
defrag_after_cycles: 0,
},
neurons: vec![
NeuronEndpoint {
name: "node-a".into(),
endpoint: endpoint_a.to_string(),
},
NeuronEndpoint {
name: "node-b".into(),
endpoint: endpoint_b.to_string(),
},
],
models_config: "/dev/null".into(),
entitlements: Default::default(),
};
Arc::new(CortexState::from_config(&config))
}
#[tokio::test]
async fn routes_to_least_busy_replica() {
let neuron_a = common::spawn_mock_neuron().await;
let neuron_b = common::spawn_mock_neuron().await;
let fleet = two_neuron_fleet(&neuron_a, &neuron_b).await;
// A is busy (1 running + 3 queued), B is idle.
seed_loaded(&fleet, "node-a", 1, 3).await;
seed_loaded(&fleet, "node-b", 0, 0).await;
let route = cortex_gateway::router::resolve(&fleet, "test-model")
.await
.expect("model is loaded on both nodes");
assert_eq!(route.node_name, "node-b", "should pick the idle replica");
// Flip the load: now B is the busy one.
seed_loaded(&fleet, "node-a", 0, 0).await;
seed_loaded(&fleet, "node-b", 1, 5).await;
let route = cortex_gateway::router::resolve(&fleet, "test-model")
.await
.expect("still loaded");
assert_eq!(route.node_name, "node-a", "should follow the lighter load");
}
/// Mock neuron whose inference endpoint always returns a #63 backpressure
/// envelope (503 + Retry-After) — simulating a saturated neuron.
async fn spawn_busy_neuron() -> String {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let base_url = format!("http://{addr}");
let inference_url = base_url.clone();
let app = axum::Router::new()
.route(
"/models/{model_id}/endpoint",
get(move |Path(_): Path<String>| {
let url = inference_url.clone();
async move { Json(json!({ "url": url })) }
}),
)
.route(
"/v1/chat/completions",
post(|| async {
let body = json!({"error": {
"message": "model is busy (admission queue full); retry shortly",
"type": "rate_limit_error",
"code": "rate_limit_exceeded",
"param": null
}});
(
StatusCode::SERVICE_UNAVAILABLE,
[(header::RETRY_AFTER, "6")],
Json(body),
)
.into_response()
}),
);
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
base_url
}
#[tokio::test]
async fn neuron_backpressure_is_propagated_intact() {
// A saturated neuron's 503 + Retry-After + envelope must reach the client
// verbatim — not unwrapped, remapped, or stripped (#55 / #63).
let neuron = spawn_busy_neuron().await;
let fleet = two_neuron_fleet(&neuron, &neuron).await;
seed_loaded(&fleet, "node-a", 1, 8).await;
let app = cortex_gateway::build_app(Arc::clone(&fleet));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
let resp = reqwest::Client::new()
.post(format!("http://{addr}/v1/chat/completions"))
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": "hi"}]}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::SERVICE_UNAVAILABLE);
assert_eq!(
resp.headers()
.get(reqwest::header::RETRY_AFTER)
.and_then(|v| v.to_str().ok()),
Some("6"),
"Retry-After must survive the proxy"
);
let body: Value = resp.json().await.unwrap();
assert_eq!(body["error"]["code"], "rate_limit_exceeded");
}
#[tokio::test]
async fn ties_break_deterministically_by_name() {
let neuron_a = common::spawn_mock_neuron().await;
let neuron_b = common::spawn_mock_neuron().await;
let fleet = two_neuron_fleet(&neuron_a, &neuron_b).await;
// Equal load on both → stable pick (lowest node name).
seed_loaded(&fleet, "node-a", 0, 0).await;
seed_loaded(&fleet, "node-b", 0, 0).await;
let route = cortex_gateway::router::resolve(&fleet, "test-model")
.await
.expect("loaded");
assert_eq!(route.node_name, "node-a", "ties break by name");
}

View File

@@ -0,0 +1,207 @@
//! Integration tests for per-request token metering (#51).
//!
//! Drives authenticated requests through the gateway to a mock neuron that
//! reports a fixed `usage` object, then asserts the EntitlementProvider's
//! spend ledger reflects cumulative per-key spend and that reservations
//! settle to actual (no outstanding reserved tokens once requests complete).
mod common;
use cortex_core::config::{
ApiKeyConfig, EntitlementsConfig, EvictionSettings, EvictionStrategy, GatewayConfig,
GatewaySettings, NeuronEndpoint,
};
use cortex_core::entitlements::{CapWindow, Principal};
use cortex_core::node::{ModelEntry, ModelStatus};
use cortex_gateway::state::CortexState;
use serde_json::json;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
const ACCOUNT: &str = "acct-meter";
const KEY_ID: &str = "key-meter";
const BEARER: &str = "sk-meter";
/// The mock neuron (common::spawn_mock_neuron) reports this fixed usage on
/// every chat completion.
const PROMPT_PER_REQ: u64 = 10;
const COMPLETION_PER_REQ: u64 = 5;
async fn spawn_metered_gateway(neuron_url: &str) -> (Arc<CortexState>, String) {
let config = GatewayConfig {
gateway: GatewaySettings {
listen: "127.0.0.1:0".into(),
metrics_listen: "127.0.0.1:0".into(),
},
eviction: EvictionSettings {
strategy: EvictionStrategy::Lru,
defrag_after_cycles: 0,
},
neurons: vec![NeuronEndpoint {
name: "mock-node".into(),
endpoint: neuron_url.to_string(),
}],
models_config: "/dev/null".into(),
entitlements: EntitlementsConfig {
require_auth: true,
keys: vec![ApiKeyConfig {
key: BEARER.into(),
account_id: ACCOUNT.into(),
key_id: Some(KEY_ID.into()),
hard_cap: Some(1_000_000),
window: CapWindow::Balance,
}],
},
};
let fleet = Arc::new(CortexState::from_config(&config));
{
let mut nodes = fleet.nodes.write().await;
let node = nodes.get_mut("mock-node").unwrap();
node.healthy = true;
node.models.insert(
"test-model".into(),
ModelEntry {
id: "test-model".into(),
status: ModelStatus::Loaded,
last_accessed: None,
vram_estimate_mb: Some(8000),
capabilities: Vec::new(),
tool_call: false,
reasoning: false,
limit: None,
},
);
}
let app = cortex_gateway::build_app(Arc::clone(&fleet));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
(fleet, format!("http://{addr}"))
}
fn principal() -> Principal {
Principal {
account_id: ACCOUNT.into(),
key_id: KEY_ID.into(),
}
}
/// Poll the provider ledger until settled spend reaches `expected` (settle
/// runs in a spawned task after the response stream finishes) or time out.
async fn await_spent(fleet: &CortexState, expected: u64) -> u64 {
let principal = principal();
for _ in 0..100 {
let snap = fleet.entitlements.snapshot(&principal).await.unwrap();
if snap.spent >= expected {
return snap.spent;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
fleet.entitlements.snapshot(&principal).await.unwrap().spent
}
#[tokio::test]
async fn cumulative_spend_is_metered_per_key() {
let neuron = common::spawn_mock_neuron().await;
let (fleet, gateway) = spawn_metered_gateway(&neuron).await;
let client = reqwest::Client::new();
const N: u64 = 3;
for _ in 0..N {
let resp = client
.post(format!("{gateway}/v1/chat/completions"))
.bearer_auth(BEARER)
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": "hi"}]}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
// Drain the body so the response stream finishes and metering settles.
let _ = resp.bytes().await.unwrap();
}
let expected = N * (PROMPT_PER_REQ + COMPLETION_PER_REQ);
let spent = await_spent(&fleet, expected).await;
assert_eq!(
spent, expected,
"ledger must reflect cumulative per-key spend"
);
// Reservations settled to actual — nothing left outstanding.
let snap = fleet.entitlements.snapshot(&principal()).await.unwrap();
assert_eq!(snap.reserved, 0, "all reservations must settle/release");
assert_eq!(snap.hard_cap, Some(1_000_000));
}
#[tokio::test]
async fn anonymous_request_records_no_spend() {
// require_auth=false so the unauthenticated request is served, but with
// no principal it must not touch any ledger.
let neuron = common::spawn_mock_neuron().await;
let config = GatewayConfig {
gateway: GatewaySettings {
listen: "127.0.0.1:0".into(),
metrics_listen: "127.0.0.1:0".into(),
},
eviction: EvictionSettings {
strategy: EvictionStrategy::Lru,
defrag_after_cycles: 0,
},
neurons: vec![NeuronEndpoint {
name: "mock-node".into(),
endpoint: neuron.clone(),
}],
models_config: "/dev/null".into(),
entitlements: EntitlementsConfig::default(),
};
let fleet = Arc::new(CortexState::from_config(&config));
{
let mut nodes = fleet.nodes.write().await;
let node = nodes.get_mut("mock-node").unwrap();
node.healthy = true;
node.models.insert(
"test-model".into(),
ModelEntry {
id: "test-model".into(),
status: ModelStatus::Loaded,
last_accessed: None,
vram_estimate_mb: Some(8000),
capabilities: Vec::new(),
tool_call: false,
reasoning: false,
limit: None,
},
);
}
let app = cortex_gateway::build_app(Arc::clone(&fleet));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
let resp = reqwest::Client::new()
.post(format!("http://{addr}/v1/chat/completions"))
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": "hi"}]}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
let _ = resp.bytes().await.unwrap();
// An unconfigured principal has a zeroed snapshot — nothing was metered.
let snap = fleet
.entitlements
.snapshot(&Principal {
account_id: "nobody".into(),
key_id: "nobody".into(),
})
.await
.unwrap();
assert_eq!(snap.spent, 0);
}

View File

@@ -228,10 +228,26 @@ async fn test_poller_marks_unreachable_node_unhealthy() {
nodes.get_mut("dead-node").unwrap().healthy = true;
}
// Debounce (#53 follow-up): a single missed poll must NOT evict a
// previously-healthy node — a busy neuron briefly slow to answer
// shouldn't yank its models out of routing.
cortex_gateway::poller::poll_once(&fleet).await;
assert!(
fleet.nodes.read().await.get("dead-node").unwrap().healthy,
"one failed poll should not mark a healthy node unhealthy"
);
let nodes = fleet.nodes.read().await;
assert!(!nodes.get("dead-node").unwrap().healthy);
// It flips unhealthy only after POLL_FAILURE_THRESHOLD (3) consecutive
// failures.
cortex_gateway::poller::poll_once(&fleet).await;
cortex_gateway::poller::poll_once(&fleet).await;
assert!(
!fleet.nodes.read().await.get("dead-node").unwrap().healthy,
"three consecutive failed polls should mark the node unhealthy"
);
// A subsequent successful poll would reset the counter and restore
// health; covered implicitly by the discovery tests above.
}
#[tokio::test]

View File

@@ -0,0 +1,174 @@
//! Fail-fast prompt pre-validation + advisory client hints (#56).
//!
//! cortex refuses a prompt that already exceeds the model's advertised
//! context window before dispatching to neuron — the same #60
//! `context_length_exceeded` envelope neuron would emit, just earlier — and
//! attaches an advisory `X-Helexa-Advice` header for fingerprinted clients.
use axum::Json;
use axum::extract::Path;
use axum::routing::{get, post};
use cortex_core::config::{
EvictionSettings, EvictionStrategy, GatewayConfig, GatewaySettings, NeuronEndpoint,
};
use cortex_core::harness::ModelLimit;
use cortex_core::node::{ModelEntry, ModelStatus};
use cortex_gateway::state::CortexState;
use serde_json::{Value, json};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::net::TcpListener;
/// Mock neuron with a hit counter, so a test can prove a request was (or
/// wasn't) dispatched past the gateway's pre-validation.
async fn spawn_counting_neuron() -> (String, Arc<AtomicU64>) {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let base_url = format!("http://{addr}");
let inference_url = base_url.clone();
let hits = Arc::new(AtomicU64::new(0));
let sink = Arc::clone(&hits);
let app = axum::Router::new()
.route(
"/models/{model_id}/endpoint",
get(move |Path(_): Path<String>| {
let url = inference_url.clone();
async move { Json(json!({ "url": url })) }
}),
)
.route(
"/v1/chat/completions",
post(move || {
let sink = Arc::clone(&sink);
async move {
sink.fetch_add(1, Ordering::SeqCst);
Json(json!({
"id": "c", "object": "chat.completion", "created": 1_700_000_000_u64,
"model": "test-model",
"choices": [{"index": 0, "message": {"role": "assistant", "content": "ok"}, "finish_reason": "stop"}],
"usage": {"prompt_tokens": 3, "completion_tokens": 1, "total_tokens": 4}
}))
}
}),
);
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
(base_url, hits)
}
/// Gateway over one neuron with `test-model` loaded and a tiny advertised
/// context window (so a modest prompt overflows it).
async fn spawn_gateway(neuron: &str, context: usize) -> String {
let config = GatewayConfig {
gateway: GatewaySettings {
listen: "127.0.0.1:0".into(),
metrics_listen: "127.0.0.1:0".into(),
},
eviction: EvictionSettings {
strategy: EvictionStrategy::Lru,
defrag_after_cycles: 0,
},
neurons: vec![NeuronEndpoint {
name: "mock-node".into(),
endpoint: neuron.to_string(),
}],
models_config: "/dev/null".into(),
entitlements: Default::default(),
};
let fleet = Arc::new(CortexState::from_config(&config));
{
let mut nodes = fleet.nodes.write().await;
let n = nodes.get_mut("mock-node").unwrap();
n.healthy = true;
n.models.insert(
"test-model".into(),
ModelEntry {
id: "test-model".into(),
status: ModelStatus::Loaded,
last_accessed: None,
vram_estimate_mb: Some(8000),
capabilities: Vec::new(),
tool_call: false,
reasoning: false,
limit: Some(ModelLimit {
context,
input: None,
output: 16,
}),
},
);
}
let app = cortex_gateway::build_app(Arc::clone(&fleet));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
format!("http://{addr}")
}
#[tokio::test]
async fn over_long_prompt_is_rejected_before_dispatch() {
let (neuron, hits) = spawn_counting_neuron().await;
let gateway = spawn_gateway(&neuron, 50).await; // tiny 50-token window
// ~1200 chars → ~300 est tokens, well over 50.
let big = "word ".repeat(240);
let resp = reqwest::Client::new()
.post(format!("{gateway}/v1/chat/completions"))
.header("user-agent", "litellm/1.0")
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": big}]}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
// Advisory hint for the fingerprinted client (header only, never body).
assert!(
resp.headers().get("x-helexa-advice").is_some(),
"litellm should get advice"
);
let body: Value = resp.json().await.unwrap();
assert_eq!(body["error"]["code"], "context_length_exceeded");
assert_eq!(body["error"]["max"], 50);
// Refused at the edge — neuron never saw it.
assert_eq!(hits.load(Ordering::SeqCst), 0);
}
#[tokio::test]
async fn within_context_passes_through() {
let (neuron, hits) = spawn_counting_neuron().await;
let gateway = spawn_gateway(&neuron, 4096).await;
let resp = reqwest::Client::new()
.post(format!("{gateway}/v1/chat/completions"))
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": "hi"}]}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
let _ = resp.bytes().await.unwrap();
assert_eq!(hits.load(Ordering::SeqCst), 1, "served by neuron");
}
#[tokio::test]
async fn unknown_client_gets_no_advice_header() {
let (neuron, _hits) = spawn_counting_neuron().await;
let gateway = spawn_gateway(&neuron, 50).await;
let big = "word ".repeat(240);
let resp = reqwest::Client::new()
.post(format!("{gateway}/v1/chat/completions"))
// no/unknown User-Agent → no advice, but still a clean 400
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": big}]}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
assert!(resp.headers().get("x-helexa-advice").is_none());
let body: Value = resp.json().await.unwrap();
assert_eq!(body["error"]["code"], "context_length_exceeded");
}

View File

@@ -13,6 +13,7 @@ use axum::response::sse::{Event, KeepAlive, Sse};
use axum::response::{IntoResponse, Json};
use axum::routing::{get, post};
use cortex_core::discovery::{DiscoveryResponse, HealthResponse};
use cortex_core::entitlements::{HEADER_ACCOUNT_ID, HEADER_KEY_ID};
use cortex_core::harness::ModelSpec;
use cortex_core::openai::{ChatCompletionRequest, MessageContent};
use cortex_core::responses::{ResponsesRequest, ResponsesUsage};
@@ -71,6 +72,12 @@ async fn health_handler(State(state): State<Arc<NeuronState>>) -> Json<HealthRes
// know about activation lifecycle.
let mut snapshot = state.health_cache.snapshot().await;
snapshot.activation = state.activation.snapshot().await;
// Per-model admission load (#53) — read live from the candle harness so
// cortex's load-aware router (#55) can spread traffic and propagate
// backpressure. Absent when no candle harness is present.
if let Some(candle) = &state.candle {
snapshot.models = candle.load_snapshot().await;
}
Json(snapshot)
}
@@ -228,6 +235,17 @@ fn default_enable_thinking(req: &mut ChatCompletionRequest, include_thinking: bo
}
}
/// The request's principal for fair-share admission (#54), reconstructed
/// from the internal headers cortex stamps (#49). cortex strips any
/// client-supplied copy and asserts the authoritative value, so over the
/// trusted WireGuard link these are safe to key fair-share on. `None` for an
/// unauthenticated/direct request — exempt from the per-principal cap.
fn principal_key(headers: &axum::http::HeaderMap) -> Option<String> {
let account = headers.get(HEADER_ACCOUNT_ID)?.to_str().ok()?;
let key = headers.get(HEADER_KEY_ID)?.to_str().ok()?;
Some(format!("{account}/{key}"))
}
/// OpenAI-compatible chat completions. Dispatches to streaming SSE when
/// `stream: true` is set on the request; otherwise returns a single
/// `ChatCompletionResponse`.
@@ -271,8 +289,14 @@ async fn chat_completions(
// true`) keep reasoning on.
default_enable_thinking(&mut req, include_thinking);
// Fair-share admission principal (#54), from cortex's stamped headers.
let principal = principal_key(&headers);
if req.stream.unwrap_or(false) {
match candle.chat_completion_stream_with(req, chat_config).await {
match candle
.chat_completion_stream_with(req, chat_config, principal)
.await
{
Ok(rx) => {
// Each chunk → one SSE `data: {json}` line. After the
// channel closes, append the OpenAI [DONE] terminator.
@@ -289,7 +313,7 @@ async fn chat_completions(
Err(e) => inference_error_response(e),
}
} else {
match candle.chat_completion(req).await {
match candle.chat_completion(req, principal).await {
Ok(resp) => Json(resp).into_response(),
Err(e) => inference_error_response(e),
}
@@ -302,6 +326,7 @@ async fn chat_completions(
/// event stream into the Responses event family.
async fn responses(
State(state): State<Arc<NeuronState>>,
headers: axum::http::HeaderMap,
Json(req): Json<ResponsesRequest>,
) -> impl IntoResponse {
let Some(candle) = state.candle.as_ref().map(Arc::clone) else {
@@ -336,9 +361,12 @@ async fn responses(
};
chat_req.stream = Some(stream_requested);
// Fair-share admission principal (#54), from cortex's stamped headers.
let principal = principal_key(&headers);
if stream_requested {
match candle
.responses_stream(chat_req, response_id, message_item_id)
.responses_stream(chat_req, response_id, message_item_id, principal)
.await
{
Ok(rx) => {
@@ -362,7 +390,7 @@ async fn responses(
// and translate the result. We don't currently re-tokenise
// to compute usage; the harness returns it via the chat
// response and we pass it through.
match candle.chat_completion(chat_req).await {
match candle.chat_completion(chat_req, principal).await {
Ok(chat_resp) => {
// Extract the assistant text (chat completions
// always emits one choice on the candle path).
@@ -486,6 +514,24 @@ fn inference_error_response(err: InferenceError) -> axum::response::Response {
"template_render_failed",
format!("chat template could not render this request: {detail}"),
),
// Admission control refused on load (#53): a fast, retryable "busy"
// signal. 503 (service busy) + Retry-After; opencode/AI SDK back off.
InferenceError::Overloaded { retry_after_secs } => OpenAiError::new(
503,
"rate_limit_error",
"rate_limit_exceeded",
"model is busy (admission queue full); retry shortly",
)
.with_retry_after(retry_after_secs),
// Per-principal fair-share cap (#54): 429 rate_limit_exceeded +
// Retry-After — the caller is sending too many concurrent requests.
InferenceError::PerPrincipalLimit { retry_after_secs } => OpenAiError::new(
429,
"rate_limit_error",
"rate_limit_exceeded",
"too many concurrent requests for this key; retry shortly",
)
.with_retry_after(retry_after_secs),
InferenceError::Other(e) => OpenAiError::without_code(500, "api_error", format!("{e:#}")),
};
envelope_response(env)
@@ -660,6 +706,26 @@ mod error_envelope_tests {
assert_eq!(error["required_mb"], 8_192);
}
#[tokio::test]
async fn overloaded_is_503_rate_limited_with_retry_after() {
// Admission rejection (#53) → fast, retryable backpressure.
let resp = inference_error_response(InferenceError::Overloaded {
retry_after_secs: 7,
});
assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
let retry = resp
.headers()
.get(axum::http::header::RETRY_AFTER)
.expect("admission rejection must advertise Retry-After");
assert_eq!(retry.to_str().unwrap(), "7");
let bytes = axum::body::to_bytes(resp.into_body(), usize::MAX)
.await
.unwrap();
let body: Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(body["error"]["code"], "rate_limit_exceeded");
}
#[tokio::test]
async fn insufficient_vram_carries_retry_after() {
// Transient 503 — VRAM frees as in-flight requests finish, so the

View File

@@ -85,6 +85,68 @@ pub struct CandleHarnessConfig {
/// `/models`, and enforces it. These knobs tune that derivation.
#[serde(default)]
pub context_limit: ContextLimitConfig,
/// Admission control (#53): bounds the per-model wait queue so a busy
/// model returns a fast, retryable `429`/`503` instead of stalling new
/// requests until their client times out.
#[serde(default)]
pub admission: AdmissionConfig,
}
/// `[harness.candle.admission]` settings (#53).
///
/// Inference is batch-1, so `max_in_flight` is 1 in practice; the queue
/// (`max_queue_depth`) absorbs short bursts, and `max_wait_secs` caps how
/// long a queued request waits before it's refused with backpressure.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdmissionConfig {
/// Concurrent running requests per model. Batch-1 inference → 1.
#[serde(default = "default_admission_max_in_flight")]
pub max_in_flight: usize,
/// Queued (waiting) requests allowed beyond the in-flight one. The
/// `(max_in_flight + max_queue_depth + 1)`-th request is refused
/// immediately with `429`/`503` + `Retry-After`.
#[serde(default = "default_admission_max_queue_depth")]
pub max_queue_depth: usize,
/// Maximum seconds a queued request waits for the in-flight slot before
/// it is refused (turns the old ~300s client-side hang into a fast,
/// honest signal).
#[serde(default = "default_admission_max_wait_secs")]
pub max_wait_secs: u64,
/// Per-principal fair-share cap (#54): max in-flight + queued requests
/// for any single principal (resolved from the `x-helexa-*` headers
/// cortex stamps), so one client can't monopolize the queue while others
/// wait. Over-cap → `429 rate_limit_exceeded` + `Retry-After`. `0`
/// disables the cap; anonymous requests are always exempt.
#[serde(default = "default_admission_max_per_principal")]
pub max_per_principal: usize,
}
impl Default for AdmissionConfig {
fn default() -> Self {
Self {
max_in_flight: default_admission_max_in_flight(),
max_queue_depth: default_admission_max_queue_depth(),
max_wait_secs: default_admission_max_wait_secs(),
max_per_principal: default_admission_max_per_principal(),
}
}
}
fn default_admission_max_in_flight() -> usize {
1
}
fn default_admission_max_queue_depth() -> usize {
8
}
fn default_admission_max_wait_secs() -> u64 {
30
}
fn default_admission_max_per_principal() -> usize {
2
}
/// `[harness.candle.prefix_cache]` settings.

View File

@@ -0,0 +1,298 @@
//! Per-model admission control (#53).
//!
//! Inference against a loaded model is batch-1: one request runs at a time,
//! serialized by the model's `inference_lock` (single-GPU) / `pool` mutex
//! (TP). Before this, the wait for that lock was an **unbounded FIFO of
//! mutex waiters with no timeout** — a busy model made every new request
//! hang until its client gave up (~300s) with an opaque error.
//!
//! [`AdmissionController`] replaces that implicit unbounded wait with an
//! explicit bounded scheduler: at most `max_in_flight` running (1, batch-1)
//! plus a bounded queue of `max_queue_depth` waiters, each waiting at most
//! `max_wait`. When the queue is full or the wait elapses, the request is
//! rejected *immediately* — an honest, fast, retryable "busy" signal
//! (`429`/`503` + `Retry-After` per #63) instead of a silent stall.
//!
//! The controller is pure async (no CUDA), so the inference paths just call
//! [`AdmissionController::enter`] before taking the inference lock and hold
//! the returned [`AdmissionPermit`] for the request's lifetime. Its counters
//! ([`in_flight`](AdmissionController::in_flight) /
//! [`queue_depth`](AdmissionController::queue_depth)) are lock-free, so
//! `/health` can read live load without contending with inference.
use crate::config::AdmissionConfig;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
/// Why admission was refused. All map to the #63 backpressure envelope
/// (`rate_limit_exceeded` + `Retry-After`); they differ in cause (and HTTP
/// status — load → `503`, per-principal → `429`).
#[derive(Debug, Clone, Copy)]
pub enum AdmissionRejection {
/// The bounded wait queue was already full (server-side load).
QueueFull { retry_after_secs: u64 },
/// A queue slot was taken but the in-flight slot didn't free within
/// `max_wait` (server-side load).
Timeout { retry_after_secs: u64 },
/// This principal already has `max_per_principal` requests in flight or
/// queued (#54 fair-share) — one principal can't monopolize the model.
PrincipalCap { retry_after_secs: u64 },
}
impl AdmissionRejection {
pub fn retry_after_secs(&self) -> u64 {
match self {
AdmissionRejection::QueueFull { retry_after_secs }
| AdmissionRejection::Timeout { retry_after_secs }
| AdmissionRejection::PrincipalCap { retry_after_secs } => *retry_after_secs,
}
}
}
/// Admission accounting, mutated under a brief lock (never held across an
/// await). `pending` is queued + in-flight overall; `per_principal` is the
/// same count keyed by principal for fair-share (#54).
#[derive(Default, Debug)]
struct AdmissionState {
pending: usize,
per_principal: HashMap<String, usize>,
}
/// Bounded batch-1 scheduler for one loaded model, with per-principal
/// fair-share.
pub struct AdmissionController {
/// In-flight slots — `max_in_flight` permits (1 for batch-1).
slots: Arc<Semaphore>,
/// Queued + in-flight accounting (overall + per principal).
state: Arc<Mutex<AdmissionState>>,
/// `max_in_flight + max_queue_depth` — the overall rejection threshold.
max_pending: usize,
/// Max in-flight + queued for any single principal (#54). `0` disables.
max_per_principal: usize,
max_in_flight: usize,
max_wait: Duration,
}
impl AdmissionController {
pub fn new(cfg: &AdmissionConfig) -> Self {
// A controller with zero in-flight slots would deadlock; clamp.
let max_in_flight = cfg.max_in_flight.max(1);
Self {
slots: Arc::new(Semaphore::new(max_in_flight)),
state: Arc::new(Mutex::new(AdmissionState::default())),
max_pending: max_in_flight + cfg.max_queue_depth,
max_per_principal: cfg.max_per_principal,
max_in_flight,
max_wait: Duration::from_secs(cfg.max_wait_secs),
}
}
/// Admit a request for `principal` (`None` = anonymous, exempt from the
/// per-principal cap). Reserves a queue slot — fast-rejecting if the
/// overall queue is full or the principal is over its fair-share cap —
/// then waits up to `max_wait` for an in-flight slot. The returned permit
/// must be held for the request's lifetime; dropping it frees the slots.
pub async fn enter(
&self,
principal: Option<&str>,
) -> Result<AdmissionPermit, AdmissionRejection> {
// Decision + reservation under one brief lock so concurrent callers
// can't both slip past the thresholds. No await is held here.
{
let mut st = self.state.lock().expect("admission state poisoned");
if st.pending >= self.max_pending {
return Err(AdmissionRejection::QueueFull {
retry_after_secs: self.retry_hint(st.pending),
});
}
if let Some(p) = principal
&& self.max_per_principal > 0
&& st.per_principal.get(p).copied().unwrap_or(0) >= self.max_per_principal
{
return Err(AdmissionRejection::PrincipalCap {
retry_after_secs: self.retry_hint(st.pending),
});
}
st.pending += 1;
if let Some(p) = principal {
*st.per_principal.entry(p.to_string()).or_insert(0) += 1;
}
}
match tokio::time::timeout(self.max_wait, Arc::clone(&self.slots).acquire_owned()).await {
Ok(Ok(permit)) => Ok(AdmissionPermit {
_permit: permit,
state: Arc::clone(&self.state),
principal: principal.map(str::to_string),
}),
// Semaphore is never closed; treat a closed/elapsed wait the same.
Ok(Err(_)) | Err(_) => {
self.release(principal);
Err(AdmissionRejection::Timeout {
retry_after_secs: self.retry_hint(self.max_pending),
})
}
}
}
/// Roll back a reserved-but-not-admitted slot (wait timed out).
fn release(&self, principal: Option<&str>) {
let mut st = self.state.lock().expect("admission state poisoned");
st.pending = st.pending.saturating_sub(1);
decrement_principal(&mut st.per_principal, principal);
}
/// Requests currently running (holding an in-flight slot).
pub fn in_flight(&self) -> usize {
self.max_in_flight
.saturating_sub(self.slots.available_permits())
}
/// Requests waiting for an in-flight slot.
pub fn queue_depth(&self) -> usize {
let pending = self.state.lock().expect("admission state poisoned").pending;
pending.saturating_sub(self.in_flight())
}
/// Rough `Retry-After`: scale with how backed-up the model is, clamped to
/// a sane band. Without per-request timing this is a heuristic, but it
/// gives well-behaved clients (opencode/AI SDK) a sensible backoff.
fn retry_hint(&self, pending: usize) -> u64 {
let queued = pending.saturating_sub(self.max_in_flight) as u64;
((queued + 1) * 2).clamp(1, 120)
}
}
/// Decrement (and prune at zero) a principal's outstanding count.
fn decrement_principal(map: &mut HashMap<String, usize>, principal: Option<&str>) {
if let Some(p) = principal
&& let Some(count) = map.get_mut(p)
{
*count -= 1;
if *count == 0 {
map.remove(p);
}
}
}
/// Held for a request's lifetime; frees the in-flight + queue slot (and the
/// principal's fair-share slot) on drop.
#[derive(Debug)]
pub struct AdmissionPermit {
_permit: OwnedSemaphorePermit,
state: Arc<Mutex<AdmissionState>>,
principal: Option<String>,
}
impl Drop for AdmissionPermit {
fn drop(&mut self) {
let mut st = self.state.lock().expect("admission state poisoned");
st.pending = st.pending.saturating_sub(1);
decrement_principal(&mut st.per_principal, self.principal.as_deref());
}
}
#[cfg(test)]
mod tests {
use super::*;
/// Config with the per-principal cap disabled (0) — most tests exercise
/// the overall queue with anonymous (`None`) callers.
fn cfg(max_in_flight: usize, max_queue_depth: usize, max_wait_secs: u64) -> AdmissionConfig {
AdmissionConfig {
max_in_flight,
max_queue_depth,
max_wait_secs,
max_per_principal: 0,
}
}
#[tokio::test]
async fn admits_up_to_in_flight_and_reports_load() {
let ctrl = AdmissionController::new(&cfg(1, 4, 30));
assert_eq!(ctrl.in_flight(), 0);
let p = ctrl.enter(None).await.expect("first admits");
assert_eq!(ctrl.in_flight(), 1);
assert_eq!(ctrl.queue_depth(), 0);
drop(p);
assert_eq!(ctrl.in_flight(), 0);
}
#[tokio::test]
async fn rejects_when_queue_full() {
// 1 in-flight + 1 queue slot = capacity 2; the 3rd is refused fast.
let ctrl = Arc::new(AdmissionController::new(&cfg(1, 1, 30)));
let _running = ctrl.enter(None).await.expect("admit running");
// Fill the single queue slot with a waiter that parks on the semaphore.
let ctrl2 = Arc::clone(&ctrl);
let waiter = tokio::spawn(async move { ctrl2.enter(None).await.map(|p| drop(p)) });
// Give the waiter a moment to occupy the queue slot.
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(ctrl.queue_depth(), 1);
// Queue full → immediate QueueFull with a Retry-After hint.
match ctrl.enter(None).await {
Err(AdmissionRejection::QueueFull { retry_after_secs }) => {
assert!(retry_after_secs >= 1)
}
other => panic!("expected QueueFull, got {other:?}"),
}
// Release the runner so the parked waiter can proceed and finish.
drop(_running);
waiter.await.unwrap().unwrap();
}
#[tokio::test]
async fn rejects_on_wait_timeout() {
// Zero queue depth + a runner holding the only slot → a second
// request can't even queue, so it's QueueFull, not Timeout. Use a
// queue of 1 and a tiny max_wait to exercise the timeout path.
let ctrl = Arc::new(AdmissionController::new(&cfg(1, 1, 0)));
let _running = ctrl.enter(None).await.expect("admit running");
// max_wait 0 → the queued request times out almost immediately.
match ctrl.enter(None).await {
Err(AdmissionRejection::Timeout { .. }) => {}
other => panic!("expected Timeout, got {other:?}"),
}
// The timed-out request released its queue slot.
assert_eq!(ctrl.queue_depth(), 0);
}
#[tokio::test]
async fn per_principal_cap_protects_other_principals() {
// Generous overall queue, but each principal capped at 1 in-flight+
// queued. Principal A holds the running slot; A's second request is
// refused (PrincipalCap) rather than occupying the queue, so B's
// single request still gets a queue slot and proceeds.
let cfg = AdmissionConfig {
max_in_flight: 1,
max_queue_depth: 8,
max_wait_secs: 30,
max_per_principal: 1,
};
let ctrl = Arc::new(AdmissionController::new(&cfg));
let _a1 = ctrl.enter(Some("acct-a/key-a")).await.expect("A admits");
// A is over its fair-share cap → fast PrincipalCap, no queue slot taken.
match ctrl.enter(Some("acct-a/key-a")).await {
Err(AdmissionRejection::PrincipalCap { retry_after_secs }) => {
assert!(retry_after_secs >= 1)
}
other => panic!("expected PrincipalCap, got {other:?}"),
}
// B (a different principal) is admitted to the queue and proceeds
// once A releases — it was never stuck behind A's backlog.
let ctrl2 = Arc::clone(&ctrl);
let b = tokio::spawn(async move { ctrl2.enter(Some("acct-b/key-b")).await.map(drop) });
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(ctrl.queue_depth(), 1, "B is queued, not rejected");
drop(_a1);
b.await.unwrap().expect("B is served after A releases");
}
}

View File

@@ -33,7 +33,7 @@ use crate::wire::{
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
#[cfg(feature = "cuda")]
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
@@ -81,6 +81,9 @@ pub struct CandleHarness {
/// Context-limit derivation settings (#67), read in `list_models`
/// to compute each model's advertised `limit{context,input,output}`.
context_limit_cfg: crate::config::ContextLimitConfig,
/// Admission-control settings (#53), used to build each loaded model's
/// [`super::admission::AdmissionController`] at load time.
admission_cfg: crate::config::AdmissionConfig,
}
/// Devices/capabilities snapshot of a model entering auto-recovery
@@ -146,6 +149,16 @@ impl LoadedHandle {
}
}
/// Current admission load (#53): `(in_flight, queue_depth)`. Lock-free,
/// so `/health` can read it without contending with inference.
pub fn load(&self) -> (usize, usize) {
match self {
LoadedHandle::Single(m) => (m.admission.in_flight(), m.admission.queue_depth()),
#[cfg(feature = "cuda")]
LoadedHandle::Tp(m) => (m.admission.in_flight(), m.admission.queue_depth()),
}
}
/// Modalities the loaded model supports. Stage B7 (single-GPU) +
/// TP-vision (#12) — both single-GPU and TP loads advertise
/// `"vision"` when a replicated vision tower materialised.
@@ -192,23 +205,50 @@ impl LoadedHandle {
/// `NEURON_MAX_PROMPT_TOKENS`, when explicitly set, is applied as a
/// clamp-only upper bound on the derived `context` — a backstop, not
/// the authority. Unset → no clamp; the derivation stands alone.
pub async fn derived_limit(
/// Refresh the cached free-VRAM reading used by [`Self::derived_limit`]
/// (#53). Queries the device worker — so it MUST run off the request
/// path (background refresher / load-time seed), never from a control
/// endpoint, since the query queues behind inference on the worker.
/// Single-GPU caches the device's free VRAM; TP caches the tightest
/// free across ranks (the same value `derived_limit` used pre-cache).
pub async fn refresh_free_mb(&self) {
let free = match self {
LoadedHandle::Single(m) => m.query_vram().await.0,
#[cfg(feature = "cuda")]
LoadedHandle::Tp(m) => m.query_vram_tightest_free_mb().await,
};
// Don't clobber a good cached value with a transient `0`
// (worker gone/poisoned sentinel).
if free > 0 {
match self {
LoadedHandle::Single(m) => m.last_free_mb.store(free, Ordering::Release),
#[cfg(feature = "cuda")]
LoadedHandle::Tp(m) => m.last_free_mb.store(free, Ordering::Release),
}
}
}
pub fn derived_limit(
&self,
cfg: &crate::config::ContextLimitConfig,
) -> Option<cortex_core::harness::ModelLimit> {
if !cfg.enabled {
return None;
}
// Read the *cached* free VRAM — never query the device worker here.
// This runs on `GET /models`; a live query would queue behind
// inference on the worker thread and stall the control plane (#53).
// The cache is refreshed off the request path (load + background task).
let (profile, free_mb, rate) = match self {
LoadedHandle::Single(m) => (
m.context_profile?,
m.query_vram().await.0,
m.last_free_mb.load(Ordering::Acquire),
m.prefill_rate.get(),
),
#[cfg(feature = "cuda")]
LoadedHandle::Tp(m) => (
m.context_profile?,
m.query_vram_tightest_free_mb().await,
m.last_free_mb.load(Ordering::Acquire),
m.prefill_rate.get(),
),
};
@@ -305,6 +345,10 @@ pub struct LoadedModel {
/// for the TP path (which already had this invariant by accident
/// because the pool lock covered the same window).
pub inference_lock: tokio::sync::Mutex<()>,
/// Bounded admission scheduler (#53). Gated *before* `inference_lock`
/// so a busy model refuses overflow fast instead of growing an
/// unbounded, untimed queue of lock waiters.
pub admission: super::admission::AdmissionController,
/// Open/close token IDs for the reasoning marker this model
/// emits, populated once at load time by probing the tokenizer's
/// added-tokens table. `None` for non-reasoning models or
@@ -374,6 +418,13 @@ pub struct LoadedModel {
/// request-path enforcement reads this — `0` means "not derived yet"
/// → fall back to the static `NEURON_MAX_PROMPT_TOKENS`.
pub derived_input_cap: AtomicUsize,
/// Cached free VRAM (MiB) for the control plane (#53). `derived_limit`
/// (served by `GET /models`) reads this instead of querying the device
/// worker, which during inference is saturated processing forward jobs —
/// a live query would queue behind them and stall `/models`, tripping
/// cortex's health poller into marking the node unhealthy. Refreshed off
/// the request path: seeded at load, then by a background task.
pub last_free_mb: AtomicU64,
}
impl LoadedModel {
@@ -422,6 +473,10 @@ pub struct TpLoadedModel {
/// serialises subprocess RPC traffic on the pool's
/// `Vec<Worker>` channels.
pub pool: tokio::sync::Mutex<super::tp::WorkerPool>,
/// Bounded admission scheduler (#53), mirroring the single-GPU path.
/// Gated before the pool lock so an overloaded TP model returns fast
/// backpressure instead of an unbounded, untimed wait.
pub admission: super::admission::AdmissionController,
/// Handle into the leader device worker's TP slab. The boxed
/// `TpLeaderModel` (with its embedded `Arc<Comm>` clones and
/// per-rank CUDA tensors) lives on the worker thread; we hold an
@@ -482,6 +537,10 @@ pub struct TpLoadedModel {
/// Mint for pool-wide snapshot ids. Plain counter; uniqueness only
/// needs to hold per model lifetime (snapshots die with the model).
pub next_snapshot_id: std::sync::atomic::AtomicU64,
/// Cached tightest free VRAM (MiB) for the control plane (#53) — see
/// [`LoadedModel::last_free_mb`]. Read by `derived_limit` so `GET /models`
/// never fans a VRAM query out to the (inference-saturated) TP workers.
pub last_free_mb: AtomicU64,
}
#[cfg(feature = "cuda")]
@@ -1088,6 +1147,32 @@ fn debug_poison_armed(model_id: &str) -> bool {
armed && !FIRED.swap(true, Ordering::Relaxed)
}
/// Background control-plane VRAM cache refresher (#53). Every few seconds,
/// refreshes each loaded model's `last_free_mb` so `derived_limit` (served
/// by `GET /models`) reads a cached value and never queries the device
/// worker on the request path — a live query would queue behind inference
/// forward jobs on the worker thread, stalling `/models` for seconds and
/// tripping cortex's health poller into evicting the node from routing.
/// Holds a `Weak` so a shutting-down harness lets the task exit. The query
/// itself may queue behind inference, but that only delays this background
/// refresh — no request-path caller is ever blocked.
async fn vram_cache_refresh_loop(weak: std::sync::Weak<CandleHarness>) {
const REFRESH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
loop {
tokio::time::sleep(REFRESH_INTERVAL).await;
let Some(this) = weak.upgrade() else {
return; // harness dropped — exit
};
// Snapshot handles, then release the read lock before awaiting the
// (possibly slow) worker queries so we never hold it across an await.
let handles: Vec<LoadedHandle> = this.models.read().await.values().cloned().collect();
drop(this);
for handle in handles {
handle.refresh_free_mb().await;
}
}
}
/// Background auto-recovery task (#17). Drains poisoned model ids and
/// rebuilds each via [`CandleHarness::recover_one`]. Holds a `Weak` so a
/// shutting-down harness lets the task exit; processes one id at a time,
@@ -1565,6 +1650,7 @@ impl CandleHarness {
recovery_tx,
prefix_cache_cfg: config.prefix_cache.clone(),
context_limit_cfg: config.context_limit.clone(),
admission_cfg: config.admission.clone(),
});
// Background auto-recovery task (#17). Holds a `Weak` so it can't
// keep the harness alive. Spawned only when a tokio runtime is
@@ -1573,6 +1659,11 @@ impl CandleHarness {
if tokio::runtime::Handle::try_current().is_ok() {
let weak = Arc::downgrade(&this);
tokio::spawn(recovery_loop(weak, recovery_rx));
// Control-plane VRAM cache refresher (#53): keeps each loaded
// model's `last_free_mb` current off the request path, so
// `derived_limit` / `GET /models` never query the device worker
// (which is saturated during inference) and never stall.
tokio::spawn(vram_cache_refresh_loop(Arc::downgrade(&this)));
}
this
}
@@ -2006,6 +2097,7 @@ impl CandleHarness {
pub async fn chat_completion(
&self,
request: ChatCompletionRequest,
principal: Option<String>,
) -> Result<ChatCompletionResponse, InferenceError> {
let handle = {
let models = self.models.read().await;
@@ -2030,7 +2122,7 @@ impl CandleHarness {
LoadedHandle::Single(m) => m,
#[cfg(feature = "cuda")]
LoadedHandle::Tp(m) => {
return self.chat_completion_tp(m, request).await;
return self.chat_completion_tp(m, request, principal).await;
}
};
@@ -2059,6 +2151,15 @@ impl CandleHarness {
return Err(self.trigger_recovery(&model_id).await);
}
// Admission control (#53): refuse fast if the bounded queue is full
// or the wait elapses, rather than joining an unbounded lock-wait.
// The permit is held for the whole request (released on drop).
let _admit = loaded
.admission
.enter(principal.as_deref())
.await
.map_err(InferenceError::from)?;
// Serialise concurrent requests against this model. Holds for
// the duration of clear_kv_cache → prefill → decode so two
// requests' chunked-prefill sequences can't interleave on the
@@ -2378,9 +2479,14 @@ impl CandleHarness {
pub async fn chat_completion_stream(
&self,
request: ChatCompletionRequest,
principal: Option<String>,
) -> Result<mpsc::Receiver<ChatCompletionChunk>, InferenceError> {
self.chat_completion_stream_with(request, wire_chat::ChatProjectionConfig::default())
.await
self.chat_completion_stream_with(
request,
wire_chat::ChatProjectionConfig::default(),
principal,
)
.await
}
/// Same as [`Self::chat_completion_stream`] but lets the caller
@@ -2391,8 +2497,9 @@ impl CandleHarness {
&self,
request: ChatCompletionRequest,
mut config: wire_chat::ChatProjectionConfig,
principal: Option<String>,
) -> Result<mpsc::Receiver<ChatCompletionChunk>, InferenceError> {
let stream = self.inference_stream(request).await?;
let stream = self.inference_stream(request, principal).await?;
// Fill in the model's reasoning markers if the caller
// didn't pre-populate them — they're a property of the
// loaded model (which the HTTP handler doesn't reach into
@@ -2419,9 +2526,10 @@ impl CandleHarness {
request: ChatCompletionRequest,
response_id: String,
message_item_id: String,
principal: Option<String>,
) -> Result<mpsc::Receiver<crate::wire::openai_responses::ResponseStreamFrame>, InferenceError>
{
let stream = self.inference_stream(request).await?;
let stream = self.inference_stream(request, principal).await?;
let meta = crate::wire::openai_responses::ResponseMeta {
response_id,
created_at: stream.created,
@@ -2442,6 +2550,7 @@ impl CandleHarness {
async fn inference_stream(
&self,
request: ChatCompletionRequest,
principal: Option<String>,
) -> Result<InferenceStream, InferenceError> {
let handle = {
let models = self.models.read().await;
@@ -2466,7 +2575,7 @@ impl CandleHarness {
LoadedHandle::Single(m) => m,
#[cfg(feature = "cuda")]
LoadedHandle::Tp(m) => {
return self.inference_tp_stream(m, request).await;
return self.inference_tp_stream(m, request, principal).await;
}
};
@@ -2610,6 +2719,15 @@ impl CandleHarness {
// role chunk was already sent above, so the client sees
// immediate "stream open" feedback even when this request
// queues behind another for the lock.
// Admission control (#53): refuse before opening the stream if the
// model's bounded queue is full / the wait elapses. The permit moves
// into the inference task and is held until it completes.
let admit = loaded
.admission
.enter(principal.as_deref())
.await
.map_err(InferenceError::from)?;
let tool_schemas = build_tool_schemas(&request);
if let (Some(worker), Some(handle)) = (loaded.worker.clone(), loaded.arch_handle) {
#[cfg(feature = "cuda")]
@@ -2620,6 +2738,7 @@ impl CandleHarness {
let tool_schemas_inner = tool_schemas.clone();
tokio::spawn(
async move {
let _admit = admit;
let _inference_guard = loaded_for_task.inference_lock.lock().await;
match stream_inference_via_worker(
worker,
@@ -2680,6 +2799,7 @@ impl CandleHarness {
let tool_call_tokens_inner = loaded.tool_call_tokens.clone();
let tool_schemas_inner = tool_schemas.clone();
tokio::task::spawn_blocking(move || {
let _admit = admit;
let _g = span_for_task.enter();
// `blocking_lock` is safe here: spawn_blocking runs on
// a dedicated thread, not on the async runtime, so
@@ -2779,6 +2899,24 @@ pub struct InferenceStream {
/// Auto-recovery (#17) — rebuild a poisoned model's device context
/// automatically instead of leaving it bricked until a human reloads.
impl CandleHarness {
/// Per-model admission load for `GET /health` (#53): in-flight + queued
/// counts for every resident model. Lock-free per-model reads, so this
/// only briefly holds the registry read lock to enumerate handles.
pub async fn load_snapshot(&self) -> Vec<cortex_core::discovery::ModelLoad> {
let models = self.models.read().await;
models
.values()
.map(|handle| {
let (in_flight, queue_depth) = handle.load();
cortex_core::discovery::ModelLoad {
id: handle.model_id().to_string(),
in_flight,
queue_depth,
}
})
.collect()
}
/// True while `model_id` is being auto-recovered (its slot is briefly
/// absent from the registry during the reload).
pub async fn is_recovering(&self, model_id: &str) -> bool {
@@ -2890,7 +3028,7 @@ impl Harness for CandleHarness {
// physics + live free VRAM + measured prefill rate. `None`
// for arches without a context profile. `cost` stays
// operator-set in the catalogue, filled by the gateway.
let limit = h.derived_limit(&self.context_limit_cfg).await;
let limit = h.derived_limit(&self.context_limit_cfg);
out.push(ModelInfo {
id: h.model_id().into(),
harness: "candle".into(),
@@ -3128,6 +3266,7 @@ impl Harness for CandleHarness {
worker,
arch_handle,
inference_lock: tokio::sync::Mutex::new(()),
admission: super::admission::AdmissionController::new(&self.admission_cfg),
reasoning_tokens,
tool_call_tokens,
chat_template,
@@ -3139,6 +3278,7 @@ impl Harness for CandleHarness {
context_profile,
prefill_rate: super::context_limit::PrefillRateEma::new(),
derived_input_cap: AtomicUsize::new(0),
last_free_mb: AtomicU64::new(0),
});
if loaded.prefix_cache.is_some() {
tracing::info!(
@@ -3149,6 +3289,14 @@ impl Harness for CandleHarness {
);
}
// Seed the control-plane VRAM cache (#53) while the worker is idle
// (load just finished), so `/models` has a value before the
// background refresher's first tick and never queries the worker.
let (free_mb, _) = loaded.query_vram().await;
if free_mb > 0 {
loaded.last_free_mb.store(free_mb, Ordering::Release);
}
let mut models = self.models.write().await;
models.insert(spec.model_id.clone(), LoadedHandle::Single(loaded));
tracing::info!(model = %spec.model_id, "model loaded");
@@ -3372,6 +3520,7 @@ impl CandleHarness {
tokenizer,
devices: devices.clone(),
pool: TMutex::new(pool),
admission: super::admission::AdmissionController::new(&self.admission_cfg),
leader_handle,
leader_device: leader_device.clone(),
poisoned: AtomicBool::new(false),
@@ -3398,6 +3547,7 @@ impl CandleHarness {
),
prefill_rate: super::context_limit::PrefillRateEma::new(),
derived_input_cap: AtomicUsize::new(0),
last_free_mb: AtomicU64::new(0),
next_snapshot_id: std::sync::atomic::AtomicU64::new(1),
});
if tp_loaded.prefix_cache.is_some() {
@@ -3409,6 +3559,14 @@ impl CandleHarness {
);
}
// Seed the control-plane VRAM cache (#53) — tightest free across
// ranks, while the workers are idle post-load — so `/models` never
// fans a query out to the inference-busy TP workers.
let free_mb = tp_loaded.query_vram_tightest_free_mb().await;
if free_mb > 0 {
tp_loaded.last_free_mb.store(free_mb, Ordering::Release);
}
let mut models = self.models.write().await;
models.insert(spec.model_id.clone(), LoadedHandle::Tp(tp_loaded));
tracing::info!(
@@ -3438,6 +3596,7 @@ impl CandleHarness {
&self,
tp: Arc<TpLoadedModel>,
request: ChatCompletionRequest,
principal: Option<String>,
) -> Result<ChatCompletionResponse, InferenceError> {
// Tag every line of this request with a short req_id so a
// grep over journalctl reconstructs one request even when
@@ -3474,7 +3633,8 @@ impl CandleHarness {
}
let tp_for_marker = Arc::clone(&tp);
let handle = tokio::spawn(chat_completion_tp_inner(tp, request).instrument(span.clone()));
let handle =
tokio::spawn(chat_completion_tp_inner(tp, request, principal).instrument(span.clone()));
match handle.await {
Ok(Ok(resp)) => Ok(resp),
Ok(Err(e)) => {
@@ -3545,6 +3705,7 @@ impl CandleHarness {
&self,
tp: Arc<TpLoadedModel>,
request: ChatCompletionRequest,
principal: Option<String>,
) -> Result<InferenceStream, InferenceError> {
if tp.poisoned.load(Ordering::Acquire) {
return Err(self.trigger_recovery(&request.model).await);
@@ -3690,10 +3851,19 @@ impl CandleHarness {
validate_vision_prefill(prompt_len, vram_free_mb)?;
}
// Admission control (#53): refuse before opening the stream; the
// permit moves into the orchestration task and is held for its life.
let admit = tp
.admission
.enter(principal.as_deref())
.await
.map_err(InferenceError::from)?;
let tool_schemas = build_tool_schemas(&request);
let tp_for_task = Arc::clone(&tp);
tokio::spawn(
async move {
let _admit = admit;
let mut failure: Option<String> = None;
let mut pool = acquire_pool_lock(&tp_for_task.pool, &model_id).await;
let leader_handle = tp_for_task.leader_handle;
@@ -4196,6 +4366,7 @@ impl CandleHarness {
async fn chat_completion_tp_inner(
tp: Arc<TpLoadedModel>,
request: ChatCompletionRequest,
principal: Option<String>,
) -> Result<ChatCompletionResponse, InferenceError> {
let req_start = std::time::Instant::now();
let model_id = request.model.clone();
@@ -4284,6 +4455,14 @@ async fn chat_completion_tp_inner(
validate_vision_prefill(prompt_len, vram_free_mb)?;
}
// Admission control (#53): bounded queue + fast reject before joining
// the pool-lock wait. Held for the whole request (released on drop).
let _admit = tp
.admission
.enter(principal.as_deref())
.await
.map_err(InferenceError::from)?;
// Acquire the pool lock for the duration of the request. After
// Phase 3 the leader's TpLeaderModel lives in the device worker
// thread, so the pool lock now serialises only subprocess RPC
@@ -4826,10 +5005,35 @@ pub enum InferenceError {
/// failure mode that hid several client-compat bugs. Maps to 422.
#[error("chat template could not render this request: {detail}")]
TemplateRenderFailed { detail: String },
/// Admission control (#53) refused on load: the model's bounded queue is
/// full or the wait elapsed. Maps to `503 rate_limit_exceeded` +
/// `Retry-After` — a fast, retryable "busy" signal, not a stall.
#[error("model is busy; retry after {retry_after_secs}s")]
Overloaded { retry_after_secs: u64 },
/// Per-principal fair-share cap (#54) exceeded: this principal already
/// has its max requests in flight/queued. Maps to `429
/// rate_limit_exceeded` + `Retry-After`; a well-behaved client self-paces.
#[error("per-principal in-flight limit reached; retry after {retry_after_secs}s")]
PerPrincipalLimit { retry_after_secs: u64 },
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl From<super::admission::AdmissionRejection> for InferenceError {
fn from(rejection: super::admission::AdmissionRejection) -> Self {
use super::admission::AdmissionRejection;
match rejection {
AdmissionRejection::QueueFull { retry_after_secs }
| AdmissionRejection::Timeout { retry_after_secs } => {
InferenceError::Overloaded { retry_after_secs }
}
AdmissionRejection::PrincipalCap { retry_after_secs } => {
InferenceError::PerPrincipalLimit { retry_after_secs }
}
}
}
}
/// Build the model's prompt from a [`ChatCompletionRequest`].
///
/// Prefers the model's own `chat_template` when one was loaded

View File

@@ -1,5 +1,6 @@
//! Harness registry — maps harness names to trait implementations.
pub mod admission;
pub mod arch;
pub mod candle;
pub mod chat_template;

View File

@@ -30,6 +30,9 @@ impl HealthCache {
// direct read from the cache stays a well-typed
// HealthResponse on the wire.
activation: Default::default(),
// Per-model admission load is overlaid by the api handler
// from the candle harness (#53); the cache doesn't own it.
models: Vec::new(),
}),
has_gpus: RwLock::new(false),
}

View File

@@ -114,6 +114,12 @@ async fn test_health_endpoint() {
let body: serde_json::Value = resp.json().await.unwrap();
assert_eq!(body["uptime_secs"], 0);
// Per-model admission load (#53) is always present, even with no models
// loaded (empty array) — cortex's load-aware router (#55) relies on it.
assert!(
body["models"].is_array(),
"/health must expose a models load array"
);
}
#[tokio::test]