feat(#47 phase 1c): per-request token metering + spend ledger
All checks were successful
CI / Format (push) Successful in 40s
CI / CUDA type-check (push) Successful in 1m41s
CI / Clippy (push) Successful in 2m15s
CI / Test (push) Successful in 4m28s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Resolve version stamps + change detection (push) Successful in 32s
build-prerelease / Build neuron-blackwell (push) Has been skipped
build-prerelease / Build neuron-ampere (push) Has been skipped
build-prerelease / Build neuron-ada (push) Has been skipped
build-prerelease / Package helexa-neuron-ada RPM (push) Has been skipped
build-prerelease / Package helexa-neuron-ampere RPM (push) Has been skipped
build-prerelease / Package helexa-neuron-blackwell RPM (push) Has been skipped
build-prerelease / Lint (fmt + clippy) (push) Successful in 2m30s
build-prerelease / Build cortex binary (push) Successful in 2m49s
build-prerelease / Package cortex RPM (push) Successful in 1m24s
build-prerelease / Test (push) Successful in 5m59s
build-prerelease / Build helexa-bench binary (push) Has been skipped
build-prerelease / Package helexa-bench RPM (push) Has been skipped
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 49s

Stage 1 accounting (#51): capture real per-request usage and feed it to
the spend ledger + per-principal metrics. Establishes the reserve→settle
lifecycle that budget enforcement (#52) will tighten.

- cortex-gateway::metering: ReservationGuard makes reservation leaks
  impossible — settle() records actual spend + releases the remainder;
  dropping an un-settled guard releases the whole reservation, so any
  early return / error / dropped stream resolves it. UsageSink is the
  completion hook; principal_from_headers reconstructs the principal from
  the middleware-stamped headers (uniform across all proxy paths, no
  handler-signature churn); record_spend emits per-principal counters.
- proxy::TokenMetrics gains an optional usage_sink, invoked exactly once
  in finish() with the observed (prompt, completion) — restructured so it
  always runs (even when no body/usage arrived → settle 0 → release),
  while preserving the existing per-model metric emissions unchanged.
- All proxy paths metered: chat/completions/responses via
  proxy_with_metrics (reserve 0 → forward_request → settle in finish);
  Anthropic non-streaming settles from the buffered body; Anthropic
  streaming (anthropic_sse) now scans the upstream frames for the usage
  object (#48) — it captured none before — and settles at pump end.
- This phase reserves 0 tokens (metering only, no enforcement); #52 flips
  the reserved amount to prompt+max_output and surfaces BudgetError. The
  settle/release plumbing is identical, so that change is localized.
- New Prometheus counters: cortex_spend_tokens_total (+ prompt/completion
  splits), labelled by account/key.

2 integration tests: cumulative per-key spend after N requests with
reservations settled to zero outstanding; anonymous requests record no
spend. Local fmt/clippy/test all green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-17 19:29:51 +03:00
parent 486d7e9a8f
commit 4f16b8c541
7 changed files with 468 additions and 32 deletions

View File

@@ -33,6 +33,7 @@ pub async fn stream_translated(
model_id: &str,
node_name: &str,
inbound_headers: &axum::http::HeaderMap,
usage_sink: Option<crate::metering::UsageSink>,
) -> Response {
let url = format!("{endpoint}/v1/chat/completions");
tracing::info!(
@@ -96,6 +97,10 @@ pub async fn stream_translated(
let mut saw_tool_call = false;
let mut last_finish: Option<String> = None;
let mut frames = 0u64;
// Engine-truth usage for metering (#51), scanned from the upstream
// frames (neuron emits a final `usage` object on the stream, #48).
let mut usage_prompt = 0u64;
let mut usage_completion = 0u64;
'outer: while let Some(block) = upstream.next().await {
let block = match block {
@@ -123,6 +128,15 @@ pub async fn stream_translated(
continue;
}
tracing::trace!(node = %node, frame = %data, "anthropic stream: upstream frame");
// Capture usage for metering before translation — the
// usage object rides on a late frame (often after the
// last content delta).
if let Some(p) = crate::proxy::last_count_for(data, "prompt_tokens") {
usage_prompt = p;
}
if let Some(c) = crate::proxy::last_count_for(data, "completion_tokens") {
usage_completion = c;
}
let Ok(chunk) = serde_json::from_str::<ChatCompletionChunk>(data) else {
tracing::debug!(node = %node, "anthropic stream: unparsable upstream frame skipped");
continue;
@@ -164,6 +178,14 @@ pub async fn stream_translated(
terminated = done,
"anthropic stream complete"
);
// Settle metering with the observed usage (#51). Runs on every exit
// path of the pump — clean end, early break, or upstream error — so
// the reservation is always resolved. `(0, 0)` when no usage frame
// was seen, which releases without recording spend.
if let Some(sink) = usage_sink {
sink(usage_prompt, usage_completion);
}
});
Response::builder()

View File

@@ -306,6 +306,23 @@ async fn anthropic_messages(
}
let start = Instant::now();
// Per-request metering (#51), same lifecycle as the OpenAI paths:
// reserve (0 tokens this phase) and build the completion sink. Consumed
// by whichever branch runs below; dropping it unused releases the
// reservation.
let usage_sink = match crate::metering::principal_from_headers(&headers) {
Some(principal) => {
let guard = crate::metering::ReservationGuard::reserve(
Arc::clone(&fleet.entitlements),
&principal,
0,
)
.await;
Some(crate::metering::usage_sink(principal, guard))
}
None => None,
};
if is_streaming {
// Anthropic SSE translation (#24): upstream speaks OpenAI SSE;
// re-frame it event-by-event into Anthropic's message_start /
@@ -317,6 +334,7 @@ async fn anthropic_messages(
&model_id,
&route.node_name,
&headers,
usage_sink,
)
.await;
metrics::histogram!("cortex_request_duration_seconds", &labels)
@@ -441,6 +459,15 @@ async fn anthropic_messages(
metrics::histogram!("cortex_request_duration_seconds", &labels)
.record(start.elapsed().as_secs_f64());
// Settle metering with the upstream usage (#51). Scanned from the
// raw body — same engine-truth source as the streaming path — so we
// don't depend on the typed usage struct's optionality.
if let Some(sink) = usage_sink {
let tail = String::from_utf8_lossy(&body_bytes);
let prompt = proxy::last_count_for(&tail, "prompt_tokens").unwrap_or(0);
let completion = proxy::last_count_for(&tail, "completion_tokens").unwrap_or(0);
sink(prompt, completion);
}
// Did the model actually produce a structured tool call, or just
// text? This is the single most useful signal for "is tool
// calling working end-to-end" — a `false` here alongside a
@@ -738,9 +765,35 @@ async fn proxy_with_metrics(
metrics::counter!("cortex_cold_starts_total", &labels).increment(1);
}
// Per-request metering (#51): reconstruct the principal from the
// middleware-stamped headers, reserve (0 tokens this phase — metering
// only; #52 makes it the real cap), and build the completion sink that
// settles spend when the response finishes. Anonymous requests get no
// sink. Must happen before `headers`/`body` are moved into the proxy.
let usage_sink = match crate::metering::principal_from_headers(&headers) {
Some(principal) => {
let guard = crate::metering::ReservationGuard::reserve(
Arc::clone(&fleet.entitlements),
&principal,
0,
)
.await;
Some(crate::metering::usage_sink(principal, guard))
}
None => None,
};
let start = Instant::now();
let result =
proxy::forward_request(&fleet.http_client, route, path, headers, body, model_id).await;
let result = proxy::forward_request(
&fleet.http_client,
route,
path,
headers,
body,
model_id,
usage_sink,
)
.await;
let duration = start.elapsed();
match result {

View File

@@ -4,6 +4,7 @@ pub mod entitlements_local;
pub mod error;
pub mod evictor;
pub mod handlers;
pub mod metering;
pub mod metrics;
pub mod poller;
pub mod proxy;

View File

@@ -0,0 +1,121 @@
//! Per-request token metering (#51).
//!
//! Captures the real `(prompt, completion)` usage of every request and feeds
//! it to two places: the [`EntitlementProvider`] spend ledger (via
//! reserve→settle) and per-principal Prometheus counters. The principal is
//! reconstructed from the internal headers the auth middleware stamped (#49),
//! so this works uniformly across every proxy path without threading the
//! typed principal through each handler.
//!
//! The reserve→settle lifecycle is established here but, in this phase,
//! reserves **zero** tokens — metering only, no enforcement. Budget
//! enforcement (#52) flips the reserved amount to the real
//! `prompt + max_output` and handles the [`BudgetError`] rejection; the
//! settle/release plumbing is identical, so that change is localized.
//!
//! [`ReservationGuard`] makes leaks impossible: settling records actual
//! spend and releases the unused remainder; dropping a guard that was never
//! settled releases the whole reservation. So an early return, error path,
//! or dropped stream can't strand a reservation.
use axum::http::HeaderMap;
use cortex_core::entitlements::{EntitlementProvider, HEADER_ACCOUNT_ID, HEADER_KEY_ID, Principal};
use std::sync::Arc;
/// Invoked exactly once at request completion with best-effort
/// `(prompt_tokens, completion_tokens)`. When no usage could be observed
/// (e.g. a pre-dispatch failure or a dropped stream) it is dropped unused —
/// which releases the held reservation via [`ReservationGuard`]'s `Drop`.
pub type UsageSink = Box<dyn FnOnce(u64, u64) + Send>;
/// Reconstruct the principal from the cortex-stamped internal headers. The
/// auth middleware strips any client copy and stamps the authoritative value,
/// so these headers are trustworthy within cortex. `None` for anonymous
/// (unauthenticated) requests.
pub fn principal_from_headers(headers: &HeaderMap) -> Option<Principal> {
let account_id = headers.get(HEADER_ACCOUNT_ID)?.to_str().ok()?.to_string();
let key_id = headers.get(HEADER_KEY_ID)?.to_str().ok()?.to_string();
Some(Principal { account_id, key_id })
}
/// Emit per-principal spend counters (#51). Labelled by account/key only —
/// both are operator-bounded, so cardinality is controlled.
pub fn record_spend(principal: &Principal, prompt: u64, completion: u64) {
let labels = [
("account", principal.account_id.clone()),
("key", principal.key_id.clone()),
];
metrics::counter!("cortex_spend_tokens_total", &labels).increment(prompt + completion);
metrics::counter!("cortex_spend_prompt_tokens_total", &labels).increment(prompt);
metrics::counter!("cortex_spend_completion_tokens_total", &labels).increment(completion);
}
/// Holds a budget reservation for the life of a request. [`settle`] records
/// actual spend and releases the remainder; an un-settled guard releases the
/// whole reservation when dropped. Anonymous requests carry an empty guard,
/// where every operation is a no-op.
///
/// [`settle`]: ReservationGuard::settle
pub struct ReservationGuard {
provider: Arc<dyn EntitlementProvider>,
reservation: Option<cortex_core::entitlements::Reservation>,
}
impl ReservationGuard {
/// An empty guard for an anonymous request — no reservation to resolve.
pub fn anonymous(provider: Arc<dyn EntitlementProvider>) -> Self {
Self {
provider,
reservation: None,
}
}
/// Reserve `max_tokens` for the principal, returning a guard. In this
/// phase callers pass `0` (metering only); #52 passes the real cap and
/// surfaces the [`cortex_core::entitlements::BudgetError`] instead.
pub async fn reserve(
provider: Arc<dyn EntitlementProvider>,
principal: &Principal,
max_tokens: u64,
) -> Self {
let reservation = provider.reserve(principal, max_tokens).await.ok();
Self {
provider,
reservation,
}
}
/// Settle with the tokens actually consumed, disarming the drop-release.
/// Spawns the (fast, in-process for the local provider) settle so the
/// caller — which may be a sync stream-completion callback — needn't
/// await.
pub fn settle(mut self, actual_tokens: u64) {
if let Some(reservation) = self.reservation.take() {
let provider = Arc::clone(&self.provider);
tokio::spawn(async move {
provider.settle(reservation, actual_tokens).await;
});
}
}
}
impl Drop for ReservationGuard {
fn drop(&mut self) {
if let Some(reservation) = self.reservation.take() {
let provider = Arc::clone(&self.provider);
tokio::spawn(async move {
provider.release(reservation).await;
});
}
}
}
/// Build the completion sink for an authenticated request: record spend and
/// settle the reservation with the observed total. Dropping it unused (no
/// usage observed) releases the reservation via the guard.
pub fn usage_sink(principal: Principal, guard: ReservationGuard) -> UsageSink {
Box::new(move |prompt, completion| {
record_spend(&principal, prompt, completion);
guard.settle(prompt + completion);
})
}

View File

@@ -63,4 +63,16 @@ fn describe_metrics() {
"cortex_cold_starts_total",
"Total number of cold-start model loads"
);
metrics::describe_counter!(
"cortex_spend_tokens_total",
"Total metered tokens (prompt + completion) per principal, labelled by account/key (#51)"
);
metrics::describe_counter!(
"cortex_spend_prompt_tokens_total",
"Metered prompt tokens per principal, labelled by account/key (#51)"
);
metrics::describe_counter!(
"cortex_spend_completion_tokens_total",
"Metered completion tokens per principal, labelled by account/key (#51)"
);
}

View File

@@ -31,6 +31,7 @@ pub async fn forward_request(
headers: HeaderMap,
body: bytes::Bytes,
model_id: &str,
usage_sink: Option<crate::metering::UsageSink>,
) -> Result<Response, ProxyError> {
let request_start = Instant::now();
let url = format!("{}{}", route.endpoint, path);
@@ -82,7 +83,7 @@ pub async fn forward_request(
let resp_headers = upstream_resp.headers().clone();
let stream = TokenMetricsStream::new(
Box::pin(upstream_resp.bytes_stream()),
TokenMetrics::new(model_id, &route.node_name, request_start),
TokenMetrics::new(model_id, &route.node_name, request_start, usage_sink),
);
let body = Body::from_stream(stream);
@@ -186,10 +187,19 @@ struct TokenMetrics {
last_chunk: Option<Instant>,
tail: String,
finished: bool,
/// Per-principal metering hook (#51). Invoked exactly once in `finish`
/// with the observed `(prompt, completion)` so the reservation can be
/// settled and spend recorded. `None` for anonymous requests.
usage_sink: Option<crate::metering::UsageSink>,
}
impl TokenMetrics {
fn new(model_id: &str, node_name: &str, request_start: Instant) -> Self {
fn new(
model_id: &str,
node_name: &str,
request_start: Instant,
usage_sink: Option<crate::metering::UsageSink>,
) -> Self {
Self {
labels: [
("model", model_id.to_string()),
@@ -200,6 +210,7 @@ impl TokenMetrics {
last_chunk: None,
tail: String::new(),
finished: false,
usage_sink,
}
}
@@ -227,36 +238,45 @@ impl TokenMetrics {
return;
}
self.finished = true;
let Some(first) = self.first_chunk else {
return; // no body ever arrived — nothing to record
};
let ttft = first.duration_since(self.request_start).as_secs_f64();
metrics::histogram!("cortex_time_to_first_token_seconds", &self.labels).record(ttft);
if let Some(prompt) = last_count_for(&self.tail, "prompt_tokens") {
metrics::counter!("cortex_prompt_tokens_total", &self.labels).increment(prompt);
}
let Some(completion) = last_count_for(&self.tail, "completion_tokens") else {
return;
};
if completion == 0 {
return;
}
metrics::counter!("cortex_completion_tokens_total", &self.labels).increment(completion);
let prompt = last_count_for(&self.tail, "prompt_tokens");
let completion = last_count_for(&self.tail, "completion_tokens");
let last = self.last_chunk.unwrap_or(first);
let decode_window = last.duration_since(first).as_secs_f64();
// Streaming: rate over the decode window (first→last chunk).
// Non-streaming bodies arrive as ~one chunk (window ≈ 0), where
// the only honest denominator is the full request duration.
let secs = if decode_window >= 0.1 {
decode_window
} else {
last.duration_since(self.request_start).as_secs_f64()
};
if secs > 0.0 {
metrics::histogram!("cortex_tokens_per_second", &self.labels)
.record(completion as f64 / secs);
// Per-model metrics — only when body chunks actually arrived.
if let Some(first) = self.first_chunk {
let ttft = first.duration_since(self.request_start).as_secs_f64();
metrics::histogram!("cortex_time_to_first_token_seconds", &self.labels).record(ttft);
if let Some(prompt) = prompt {
metrics::counter!("cortex_prompt_tokens_total", &self.labels).increment(prompt);
}
if let Some(completion) = completion.filter(|c| *c > 0) {
metrics::counter!("cortex_completion_tokens_total", &self.labels)
.increment(completion);
let last = self.last_chunk.unwrap_or(first);
let decode_window = last.duration_since(first).as_secs_f64();
// Streaming: rate over the decode window (first→last chunk).
// Non-streaming bodies arrive as ~one chunk (window ≈ 0),
// where the only honest denominator is the full request
// duration.
let secs = if decode_window >= 0.1 {
decode_window
} else {
last.duration_since(self.request_start).as_secs_f64()
};
if secs > 0.0 {
metrics::histogram!("cortex_tokens_per_second", &self.labels)
.record(completion as f64 / secs);
}
}
}
// Per-principal metering + reservation settle (#51). Always runs so
// the reservation is resolved even when no usage/body was observed
// (sink with (0, 0) → settle 0 → release).
if let Some(sink) = self.usage_sink.take() {
sink(prompt.unwrap_or(0), completion.unwrap_or(0));
}
}
}

View File

@@ -0,0 +1,207 @@
//! Integration tests for per-request token metering (#51).
//!
//! Drives authenticated requests through the gateway to a mock neuron that
//! reports a fixed `usage` object, then asserts the EntitlementProvider's
//! spend ledger reflects cumulative per-key spend and that reservations
//! settle to actual (no outstanding reserved tokens once requests complete).
mod common;
use cortex_core::config::{
ApiKeyConfig, EntitlementsConfig, EvictionSettings, EvictionStrategy, GatewayConfig,
GatewaySettings, NeuronEndpoint,
};
use cortex_core::entitlements::{CapWindow, Principal};
use cortex_core::node::{ModelEntry, ModelStatus};
use cortex_gateway::state::CortexState;
use serde_json::json;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
const ACCOUNT: &str = "acct-meter";
const KEY_ID: &str = "key-meter";
const BEARER: &str = "sk-meter";
/// The mock neuron (common::spawn_mock_neuron) reports this fixed usage on
/// every chat completion.
const PROMPT_PER_REQ: u64 = 10;
const COMPLETION_PER_REQ: u64 = 5;
async fn spawn_metered_gateway(neuron_url: &str) -> (Arc<CortexState>, String) {
let config = GatewayConfig {
gateway: GatewaySettings {
listen: "127.0.0.1:0".into(),
metrics_listen: "127.0.0.1:0".into(),
},
eviction: EvictionSettings {
strategy: EvictionStrategy::Lru,
defrag_after_cycles: 0,
},
neurons: vec![NeuronEndpoint {
name: "mock-node".into(),
endpoint: neuron_url.to_string(),
}],
models_config: "/dev/null".into(),
entitlements: EntitlementsConfig {
require_auth: true,
keys: vec![ApiKeyConfig {
key: BEARER.into(),
account_id: ACCOUNT.into(),
key_id: Some(KEY_ID.into()),
hard_cap: Some(1_000_000),
window: CapWindow::Balance,
}],
},
};
let fleet = Arc::new(CortexState::from_config(&config));
{
let mut nodes = fleet.nodes.write().await;
let node = nodes.get_mut("mock-node").unwrap();
node.healthy = true;
node.models.insert(
"test-model".into(),
ModelEntry {
id: "test-model".into(),
status: ModelStatus::Loaded,
last_accessed: None,
vram_estimate_mb: Some(8000),
capabilities: Vec::new(),
tool_call: false,
reasoning: false,
limit: None,
},
);
}
let app = cortex_gateway::build_app(Arc::clone(&fleet));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
(fleet, format!("http://{addr}"))
}
fn principal() -> Principal {
Principal {
account_id: ACCOUNT.into(),
key_id: KEY_ID.into(),
}
}
/// Poll the provider ledger until settled spend reaches `expected` (settle
/// runs in a spawned task after the response stream finishes) or time out.
async fn await_spent(fleet: &CortexState, expected: u64) -> u64 {
let principal = principal();
for _ in 0..100 {
let snap = fleet.entitlements.snapshot(&principal).await.unwrap();
if snap.spent >= expected {
return snap.spent;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
fleet.entitlements.snapshot(&principal).await.unwrap().spent
}
#[tokio::test]
async fn cumulative_spend_is_metered_per_key() {
let neuron = common::spawn_mock_neuron().await;
let (fleet, gateway) = spawn_metered_gateway(&neuron).await;
let client = reqwest::Client::new();
const N: u64 = 3;
for _ in 0..N {
let resp = client
.post(format!("{gateway}/v1/chat/completions"))
.bearer_auth(BEARER)
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": "hi"}]}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
// Drain the body so the response stream finishes and metering settles.
let _ = resp.bytes().await.unwrap();
}
let expected = N * (PROMPT_PER_REQ + COMPLETION_PER_REQ);
let spent = await_spent(&fleet, expected).await;
assert_eq!(
spent, expected,
"ledger must reflect cumulative per-key spend"
);
// Reservations settled to actual — nothing left outstanding.
let snap = fleet.entitlements.snapshot(&principal()).await.unwrap();
assert_eq!(snap.reserved, 0, "all reservations must settle/release");
assert_eq!(snap.hard_cap, Some(1_000_000));
}
#[tokio::test]
async fn anonymous_request_records_no_spend() {
// require_auth=false so the unauthenticated request is served, but with
// no principal it must not touch any ledger.
let neuron = common::spawn_mock_neuron().await;
let config = GatewayConfig {
gateway: GatewaySettings {
listen: "127.0.0.1:0".into(),
metrics_listen: "127.0.0.1:0".into(),
},
eviction: EvictionSettings {
strategy: EvictionStrategy::Lru,
defrag_after_cycles: 0,
},
neurons: vec![NeuronEndpoint {
name: "mock-node".into(),
endpoint: neuron.clone(),
}],
models_config: "/dev/null".into(),
entitlements: EntitlementsConfig::default(),
};
let fleet = Arc::new(CortexState::from_config(&config));
{
let mut nodes = fleet.nodes.write().await;
let node = nodes.get_mut("mock-node").unwrap();
node.healthy = true;
node.models.insert(
"test-model".into(),
ModelEntry {
id: "test-model".into(),
status: ModelStatus::Loaded,
last_accessed: None,
vram_estimate_mb: Some(8000),
capabilities: Vec::new(),
tool_call: false,
reasoning: false,
limit: None,
},
);
}
let app = cortex_gateway::build_app(Arc::clone(&fleet));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
let resp = reqwest::Client::new()
.post(format!("http://{addr}/v1/chat/completions"))
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": "hi"}]}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
let _ = resp.bytes().await.unwrap();
// An unconfigured principal has a zeroed snapshot — nothing was metered.
let snap = fleet
.entitlements
.snapshot(&Principal {
account_id: "nobody".into(),
key_id: "nobody".into(),
})
.await
.unwrap();
assert_eq!(snap.spent, 0);
}