Compare commits
2 Commits
feat/47-ph
...
feat/47-ph
| Author | SHA1 | Date | |
|---|---|---|---|
|
cdf87284af
|
|||
|
4f16b8c541
|
@@ -33,6 +33,7 @@ pub async fn stream_translated(
|
||||
model_id: &str,
|
||||
node_name: &str,
|
||||
inbound_headers: &axum::http::HeaderMap,
|
||||
usage_sink: Option<crate::metering::UsageSink>,
|
||||
) -> Response {
|
||||
let url = format!("{endpoint}/v1/chat/completions");
|
||||
tracing::info!(
|
||||
@@ -96,6 +97,10 @@ pub async fn stream_translated(
|
||||
let mut saw_tool_call = false;
|
||||
let mut last_finish: Option<String> = None;
|
||||
let mut frames = 0u64;
|
||||
// Engine-truth usage for metering (#51), scanned from the upstream
|
||||
// frames (neuron emits a final `usage` object on the stream, #48).
|
||||
let mut usage_prompt = 0u64;
|
||||
let mut usage_completion = 0u64;
|
||||
|
||||
'outer: while let Some(block) = upstream.next().await {
|
||||
let block = match block {
|
||||
@@ -123,6 +128,15 @@ pub async fn stream_translated(
|
||||
continue;
|
||||
}
|
||||
tracing::trace!(node = %node, frame = %data, "anthropic stream: upstream frame");
|
||||
// Capture usage for metering before translation — the
|
||||
// usage object rides on a late frame (often after the
|
||||
// last content delta).
|
||||
if let Some(p) = crate::proxy::last_count_for(data, "prompt_tokens") {
|
||||
usage_prompt = p;
|
||||
}
|
||||
if let Some(c) = crate::proxy::last_count_for(data, "completion_tokens") {
|
||||
usage_completion = c;
|
||||
}
|
||||
let Ok(chunk) = serde_json::from_str::<ChatCompletionChunk>(data) else {
|
||||
tracing::debug!(node = %node, "anthropic stream: unparsable upstream frame skipped");
|
||||
continue;
|
||||
@@ -164,6 +178,14 @@ pub async fn stream_translated(
|
||||
terminated = done,
|
||||
"anthropic stream complete"
|
||||
);
|
||||
|
||||
// Settle metering with the observed usage (#51). Runs on every exit
|
||||
// path of the pump — clean end, early break, or upstream error — so
|
||||
// the reservation is always resolved. `(0, 0)` when no usage frame
|
||||
// was seen, which releases without recording spend.
|
||||
if let Some(sink) = usage_sink {
|
||||
sink(usage_prompt, usage_completion);
|
||||
}
|
||||
});
|
||||
|
||||
Response::builder()
|
||||
|
||||
@@ -306,6 +306,29 @@ async fn anthropic_messages(
|
||||
}
|
||||
let start = Instant::now();
|
||||
|
||||
// Per-request metering + budget enforcement (#51/#52), same lifecycle as
|
||||
// the OpenAI paths. Estimate from the translated OpenAI body (what neuron
|
||||
// sees). Refuse over-cap before dispatch via the #63 envelope; otherwise
|
||||
// build the sink consumed by whichever branch runs below.
|
||||
let usage_sink = match crate::metering::principal_from_headers(&headers) {
|
||||
Some(principal) => {
|
||||
let advertised =
|
||||
advertised_output_limit(&fleet, &route.node_name, &route.resolved_model_id).await;
|
||||
let max_tokens = crate::metering::reservation_estimate(&openai_body, advertised);
|
||||
match crate::metering::reserve_or_reject(
|
||||
Arc::clone(&fleet.entitlements),
|
||||
&principal,
|
||||
max_tokens,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(guard) => Some(crate::metering::usage_sink(principal, guard)),
|
||||
Err(env) => return crate::error::envelope_response(env),
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
if is_streaming {
|
||||
// Anthropic SSE translation (#24): upstream speaks OpenAI SSE;
|
||||
// re-frame it event-by-event into Anthropic's message_start /
|
||||
@@ -317,6 +340,7 @@ async fn anthropic_messages(
|
||||
&model_id,
|
||||
&route.node_name,
|
||||
&headers,
|
||||
usage_sink,
|
||||
)
|
||||
.await;
|
||||
metrics::histogram!("cortex_request_duration_seconds", &labels)
|
||||
@@ -441,6 +465,15 @@ async fn anthropic_messages(
|
||||
|
||||
metrics::histogram!("cortex_request_duration_seconds", &labels)
|
||||
.record(start.elapsed().as_secs_f64());
|
||||
// Settle metering with the upstream usage (#51). Scanned from the
|
||||
// raw body — same engine-truth source as the streaming path — so we
|
||||
// don't depend on the typed usage struct's optionality.
|
||||
if let Some(sink) = usage_sink {
|
||||
let tail = String::from_utf8_lossy(&body_bytes);
|
||||
let prompt = proxy::last_count_for(&tail, "prompt_tokens").unwrap_or(0);
|
||||
let completion = proxy::last_count_for(&tail, "completion_tokens").unwrap_or(0);
|
||||
sink(prompt, completion);
|
||||
}
|
||||
// Did the model actually produce a structured tool call, or just
|
||||
// text? This is the single most useful signal for "is tool
|
||||
// calling working end-to-end" — a `false` here alongside a
|
||||
@@ -738,9 +771,42 @@ async fn proxy_with_metrics(
|
||||
metrics::counter!("cortex_cold_starts_total", &labels).increment(1);
|
||||
}
|
||||
|
||||
// Per-request metering + budget enforcement (#51/#52): reconstruct the
|
||||
// principal from the middleware-stamped headers, reserve the request's
|
||||
// upper-bound cost (prompt estimate + max output), and build the
|
||||
// completion sink that settles actual spend when the response finishes.
|
||||
// A reservation over the hard cap is refused *before* dispatch with the
|
||||
// #63 envelope. Anonymous requests skip all of this. Must happen before
|
||||
// `headers`/`body` are moved into the proxy.
|
||||
let usage_sink = match crate::metering::principal_from_headers(&headers) {
|
||||
Some(principal) => {
|
||||
let advertised = advertised_output_limit(fleet, &route.node_name, model_id).await;
|
||||
let max_tokens = crate::metering::reservation_estimate(&body, advertised);
|
||||
match crate::metering::reserve_or_reject(
|
||||
Arc::clone(&fleet.entitlements),
|
||||
&principal,
|
||||
max_tokens,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(guard) => Some(crate::metering::usage_sink(principal, guard)),
|
||||
Err(env) => return crate::error::envelope_response(env),
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
let start = Instant::now();
|
||||
let result =
|
||||
proxy::forward_request(&fleet.http_client, route, path, headers, body, model_id).await;
|
||||
let result = proxy::forward_request(
|
||||
&fleet.http_client,
|
||||
route,
|
||||
path,
|
||||
headers,
|
||||
body,
|
||||
model_id,
|
||||
usage_sink,
|
||||
)
|
||||
.await;
|
||||
let duration = start.elapsed();
|
||||
|
||||
match result {
|
||||
@@ -759,6 +825,25 @@ async fn proxy_with_metrics(
|
||||
}
|
||||
}
|
||||
|
||||
/// The model's advertised `limit.output` (#62) on a given node, used as the
|
||||
/// default output budget for budget reservations (#52) when the request
|
||||
/// omits `max_(completion_)tokens`. `None` when the node/model/limit is
|
||||
/// unknown — callers fall back to [`crate::metering::FALLBACK_MAX_OUTPUT`].
|
||||
async fn advertised_output_limit(
|
||||
fleet: &CortexState,
|
||||
node_name: &str,
|
||||
model_id: &str,
|
||||
) -> Option<u64> {
|
||||
let nodes = fleet.nodes.read().await;
|
||||
nodes
|
||||
.get(node_name)?
|
||||
.models
|
||||
.get(model_id)?
|
||||
.limit
|
||||
.as_ref()
|
||||
.map(|l| l.output as u64)
|
||||
}
|
||||
|
||||
/// Update `last_accessed` timestamp for a model on a node (drives LRU eviction).
|
||||
async fn touch_model(fleet: &CortexState, node_name: &str, model_id: &str) {
|
||||
let mut nodes = fleet.nodes.write().await;
|
||||
|
||||
@@ -4,6 +4,7 @@ pub mod entitlements_local;
|
||||
pub mod error;
|
||||
pub mod evictor;
|
||||
pub mod handlers;
|
||||
pub mod metering;
|
||||
pub mod metrics;
|
||||
pub mod poller;
|
||||
pub mod proxy;
|
||||
|
||||
219
crates/cortex-gateway/src/metering.rs
Normal file
219
crates/cortex-gateway/src/metering.rs
Normal file
@@ -0,0 +1,219 @@
|
||||
//! Per-request token metering (#51).
|
||||
//!
|
||||
//! Captures the real `(prompt, completion)` usage of every request and feeds
|
||||
//! it to two places: the [`EntitlementProvider`] spend ledger (via
|
||||
//! reserve→settle) and per-principal Prometheus counters. The principal is
|
||||
//! reconstructed from the internal headers the auth middleware stamped (#49),
|
||||
//! so this works uniformly across every proxy path without threading the
|
||||
//! typed principal through each handler.
|
||||
//!
|
||||
//! The reserve→settle lifecycle is established here. With budget enforcement
//! (#52) the reserved amount is the request's upper bound — prompt estimate
//! plus maximum output, see [`reservation_estimate`] — and a [`BudgetError`]
//! from the provider becomes a pre-dispatch rejection in
//! [`reserve_or_reject`]. Settle then corrects spend to the actual usage.
|
||||
//!
|
||||
//! [`ReservationGuard`] makes leaks impossible: settling records actual
|
||||
//! spend and releases the unused remainder; dropping a guard that was never
|
||||
//! settled releases the whole reservation. So an early return, error path,
|
||||
//! or dropped stream can't strand a reservation.
|
||||
|
||||
use axum::http::HeaderMap;
|
||||
use cortex_core::entitlements::{
|
||||
BudgetError, EntitlementProvider, HEADER_ACCOUNT_ID, HEADER_KEY_ID, Principal,
|
||||
};
|
||||
use cortex_core::error_envelope::OpenAiError;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Fallback output-token budget when neither the request nor the model's
|
||||
/// advertised limit gives one. Bounds the reservation so a capped key is
|
||||
/// still gated even on under-specified requests (#52).
|
||||
pub const FALLBACK_MAX_OUTPUT: u64 = 4096;
|
||||
|
||||
/// Invoked at most once, at request completion, with best-effort
/// `(prompt_tokens, completion_tokens)` — `(0, 0)` when the response carried
/// no usage. If the request never reaches completion (e.g. a pre-dispatch
/// failure or a dropped stream) the sink is dropped uncalled, which releases
/// the held reservation via [`ReservationGuard`]'s `Drop`.
|
||||
pub type UsageSink = Box<dyn FnOnce(u64, u64) + Send>;
|
||||
|
||||
/// Reconstruct the principal from the cortex-stamped internal headers. The
|
||||
/// auth middleware strips any client copy and stamps the authoritative value,
|
||||
/// so these headers are trustworthy within cortex. `None` for anonymous
|
||||
/// (unauthenticated) requests.
|
||||
pub fn principal_from_headers(headers: &HeaderMap) -> Option<Principal> {
|
||||
let account_id = headers.get(HEADER_ACCOUNT_ID)?.to_str().ok()?.to_string();
|
||||
let key_id = headers.get(HEADER_KEY_ID)?.to_str().ok()?.to_string();
|
||||
Some(Principal { account_id, key_id })
|
||||
}
|
||||
|
||||
/// Emit per-principal spend counters (#51). Labelled by account/key only —
|
||||
/// both are operator-bounded, so cardinality is controlled.
|
||||
pub fn record_spend(principal: &Principal, prompt: u64, completion: u64) {
|
||||
let labels = [
|
||||
("account", principal.account_id.clone()),
|
||||
("key", principal.key_id.clone()),
|
||||
];
|
||||
metrics::counter!("cortex_spend_tokens_total", &labels).increment(prompt + completion);
|
||||
metrics::counter!("cortex_spend_prompt_tokens_total", &labels).increment(prompt);
|
||||
metrics::counter!("cortex_spend_completion_tokens_total", &labels).increment(completion);
|
||||
}
|
||||
|
||||
/// Holds a budget reservation for the life of a request. [`settle`] records
|
||||
/// actual spend and releases the remainder; an un-settled guard releases the
|
||||
/// whole reservation when dropped. Anonymous requests carry an empty guard,
|
||||
/// where every operation is a no-op.
|
||||
///
|
||||
/// [`settle`]: ReservationGuard::settle
|
||||
pub struct ReservationGuard {
|
||||
provider: Arc<dyn EntitlementProvider>,
|
||||
reservation: Option<cortex_core::entitlements::Reservation>,
|
||||
}
|
||||
|
||||
impl ReservationGuard {
|
||||
/// An empty guard for an anonymous request — no reservation to resolve.
|
||||
pub fn anonymous(provider: Arc<dyn EntitlementProvider>) -> Self {
|
||||
Self {
|
||||
provider,
|
||||
reservation: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap an already-acquired reservation.
|
||||
fn held(
|
||||
provider: Arc<dyn EntitlementProvider>,
|
||||
reservation: cortex_core::entitlements::Reservation,
|
||||
) -> Self {
|
||||
Self {
|
||||
provider,
|
||||
reservation: Some(reservation),
|
||||
}
|
||||
}
|
||||
|
||||
/// Settle with the tokens actually consumed, disarming the drop-release.
|
||||
/// Spawns the (fast, in-process for the local provider) settle so the
|
||||
/// caller — which may be a sync stream-completion callback — needn't
|
||||
/// await.
|
||||
pub fn settle(mut self, actual_tokens: u64) {
|
||||
if let Some(reservation) = self.reservation.take() {
|
||||
let provider = Arc::clone(&self.provider);
|
||||
tokio::spawn(async move {
|
||||
provider.settle(reservation, actual_tokens).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ReservationGuard {
|
||||
fn drop(&mut self) {
|
||||
if let Some(reservation) = self.reservation.take() {
|
||||
let provider = Arc::clone(&self.provider);
|
||||
tokio::spawn(async move {
|
||||
provider.release(reservation).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Build the completion sink for an authenticated request: record spend and
|
||||
/// settle the reservation with the observed total. Dropping it unused (no
|
||||
/// usage observed) releases the reservation via the guard.
|
||||
pub fn usage_sink(principal: Principal, guard: ReservationGuard) -> UsageSink {
|
||||
Box::new(move |prompt, completion| {
|
||||
record_spend(&principal, prompt, completion);
|
||||
guard.settle(prompt + completion);
|
||||
})
|
||||
}
|
||||
|
||||
/// Reserve the request's upper-bound token cost for the principal, refusing
|
||||
/// *before* dispatch if it would exceed the hard cap (#52). On success
|
||||
/// returns a guard the caller settles with actual usage; on refusal returns
|
||||
/// the #63 envelope (`rate_limit_exceeded` + `Retry-After` for a resetting
|
||||
/// window, `insufficient_quota` for a hard balance — never `402`).
|
||||
pub async fn reserve_or_reject(
|
||||
provider: Arc<dyn EntitlementProvider>,
|
||||
principal: &Principal,
|
||||
max_tokens: u64,
|
||||
) -> Result<ReservationGuard, OpenAiError> {
|
||||
match provider.reserve(principal, max_tokens).await {
|
||||
Ok(reservation) => Ok(ReservationGuard::held(provider, reservation)),
|
||||
Err(err) => Err(budget_error_to_envelope(err)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Map a [`BudgetError`] to the #63 envelope. The provider chose the window
|
||||
/// semantics; this only translates them to HTTP.
|
||||
fn budget_error_to_envelope(err: BudgetError) -> OpenAiError {
|
||||
match err {
|
||||
BudgetError::RateLimited {
|
||||
retry_after_secs, ..
|
||||
} => OpenAiError::rate_limit_exceeded(err.to_string(), retry_after_secs),
|
||||
BudgetError::InsufficientQuota { .. } => OpenAiError::insufficient_quota(err.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Upper-bound tokens to reserve for a request (#52): an over-estimate of
|
||||
/// the prompt plus the maximum output. `advertised_output` is the model's
|
||||
/// `limit.output` (#62), used when the request omits `max_(completion_)tokens`.
|
||||
/// Over-reserving is safe — settle corrects spend to the actual usage.
|
||||
pub fn reservation_estimate(body: &[u8], advertised_output: Option<u64>) -> u64 {
|
||||
let max_output = requested_max_output(body)
|
||||
.or(advertised_output)
|
||||
.unwrap_or(FALLBACK_MAX_OUTPUT);
|
||||
estimate_prompt_tokens(body).saturating_add(max_output)
|
||||
}
|
||||
|
||||
/// The client's requested output cap, from `max_completion_tokens` (or the
|
||||
/// legacy `max_tokens`). `None` when unspecified.
|
||||
fn requested_max_output(body: &[u8]) -> Option<u64> {
|
||||
let v: serde_json::Value = serde_json::from_slice(body).ok()?;
|
||||
v.get("max_completion_tokens")
|
||||
.or_else(|| v.get("max_tokens"))
|
||||
.and_then(serde_json::Value::as_u64)
|
||||
}
|
||||
|
||||
/// Approximate the prompt size in tokens as one token per four bytes of the
/// raw request body (integer division), never less than one.
///
/// cortex has no tokenizer; the whole body — JSON framing included — is
/// counted, and neuron remains the exact context wall (#56/#60). Settle
/// reconciles to the real usage afterward.
fn estimate_prompt_tokens(body: &[u8]) -> u64 {
    let quarter = body.len() as u64 / 4;
    if quarter == 0 {
        1
    } else {
        quarter
    }
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// When both fields are present, the newer `max_completion_tokens` wins.
    #[test]
    fn requested_max_output_prefers_max_completion_tokens() {
        let parsed =
            requested_max_output(br#"{"model":"m","max_completion_tokens":256,"max_tokens":99}"#);
        assert_eq!(parsed, Some(256));
    }

    /// With only the legacy field present, it is used.
    #[test]
    fn requested_max_output_falls_back_to_legacy_max_tokens() {
        let parsed = requested_max_output(br#"{"model":"m","max_tokens":128}"#);
        assert_eq!(parsed, Some(128));
    }

    /// The request's own cap beats the advertised limit; the prompt part of
    /// the estimate stays small for a tiny body.
    #[test]
    fn estimate_uses_requested_output_when_present() {
        let est = reservation_estimate(br#"{"model":"m","max_tokens":1000}"#, Some(8192));
        assert!((1000..1100).contains(&est), "est was {est}");
    }

    /// No cap in the request: the advertised limit sets the floor.
    #[test]
    fn estimate_uses_advertised_output_when_request_omits_it() {
        let advertised = Some(8192);
        let est = reservation_estimate(br#"{"model":"m","messages":[]}"#, advertised);
        assert!(est >= 8192, "est was {est}");
    }

    /// Neither the request nor the model gives a cap: the constant applies.
    #[test]
    fn estimate_falls_back_when_nothing_advertised() {
        let est = reservation_estimate(br#"{"model":"m"}"#, None);
        assert!(est >= FALLBACK_MAX_OUTPUT, "est was {est}");
    }
}
|
||||
@@ -63,4 +63,16 @@ fn describe_metrics() {
|
||||
"cortex_cold_starts_total",
|
||||
"Total number of cold-start model loads"
|
||||
);
|
||||
metrics::describe_counter!(
|
||||
"cortex_spend_tokens_total",
|
||||
"Total metered tokens (prompt + completion) per principal, labelled by account/key (#51)"
|
||||
);
|
||||
metrics::describe_counter!(
|
||||
"cortex_spend_prompt_tokens_total",
|
||||
"Metered prompt tokens per principal, labelled by account/key (#51)"
|
||||
);
|
||||
metrics::describe_counter!(
|
||||
"cortex_spend_completion_tokens_total",
|
||||
"Metered completion tokens per principal, labelled by account/key (#51)"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ pub async fn forward_request(
|
||||
headers: HeaderMap,
|
||||
body: bytes::Bytes,
|
||||
model_id: &str,
|
||||
usage_sink: Option<crate::metering::UsageSink>,
|
||||
) -> Result<Response, ProxyError> {
|
||||
let request_start = Instant::now();
|
||||
let url = format!("{}{}", route.endpoint, path);
|
||||
@@ -82,7 +83,7 @@ pub async fn forward_request(
|
||||
let resp_headers = upstream_resp.headers().clone();
|
||||
let stream = TokenMetricsStream::new(
|
||||
Box::pin(upstream_resp.bytes_stream()),
|
||||
TokenMetrics::new(model_id, &route.node_name, request_start),
|
||||
TokenMetrics::new(model_id, &route.node_name, request_start, usage_sink),
|
||||
);
|
||||
|
||||
let body = Body::from_stream(stream);
|
||||
@@ -186,10 +187,19 @@ struct TokenMetrics {
|
||||
last_chunk: Option<Instant>,
|
||||
tail: String,
|
||||
finished: bool,
|
||||
/// Per-principal metering hook (#51). Invoked exactly once in `finish`
|
||||
/// with the observed `(prompt, completion)` so the reservation can be
|
||||
/// settled and spend recorded. `None` for anonymous requests.
|
||||
usage_sink: Option<crate::metering::UsageSink>,
|
||||
}
|
||||
|
||||
impl TokenMetrics {
|
||||
fn new(model_id: &str, node_name: &str, request_start: Instant) -> Self {
|
||||
fn new(
|
||||
model_id: &str,
|
||||
node_name: &str,
|
||||
request_start: Instant,
|
||||
usage_sink: Option<crate::metering::UsageSink>,
|
||||
) -> Self {
|
||||
Self {
|
||||
labels: [
|
||||
("model", model_id.to_string()),
|
||||
@@ -200,6 +210,7 @@ impl TokenMetrics {
|
||||
last_chunk: None,
|
||||
tail: String::new(),
|
||||
finished: false,
|
||||
usage_sink,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -227,36 +238,45 @@ impl TokenMetrics {
|
||||
return;
|
||||
}
|
||||
self.finished = true;
|
||||
let Some(first) = self.first_chunk else {
|
||||
return; // no body ever arrived — nothing to record
|
||||
};
|
||||
let ttft = first.duration_since(self.request_start).as_secs_f64();
|
||||
metrics::histogram!("cortex_time_to_first_token_seconds", &self.labels).record(ttft);
|
||||
|
||||
if let Some(prompt) = last_count_for(&self.tail, "prompt_tokens") {
|
||||
metrics::counter!("cortex_prompt_tokens_total", &self.labels).increment(prompt);
|
||||
}
|
||||
let Some(completion) = last_count_for(&self.tail, "completion_tokens") else {
|
||||
return;
|
||||
};
|
||||
if completion == 0 {
|
||||
return;
|
||||
}
|
||||
metrics::counter!("cortex_completion_tokens_total", &self.labels).increment(completion);
|
||||
let prompt = last_count_for(&self.tail, "prompt_tokens");
|
||||
let completion = last_count_for(&self.tail, "completion_tokens");
|
||||
|
||||
let last = self.last_chunk.unwrap_or(first);
|
||||
let decode_window = last.duration_since(first).as_secs_f64();
|
||||
// Streaming: rate over the decode window (first→last chunk).
|
||||
// Non-streaming bodies arrive as ~one chunk (window ≈ 0), where
|
||||
// the only honest denominator is the full request duration.
|
||||
let secs = if decode_window >= 0.1 {
|
||||
decode_window
|
||||
} else {
|
||||
last.duration_since(self.request_start).as_secs_f64()
|
||||
};
|
||||
if secs > 0.0 {
|
||||
metrics::histogram!("cortex_tokens_per_second", &self.labels)
|
||||
.record(completion as f64 / secs);
|
||||
// Per-model metrics — only when body chunks actually arrived.
|
||||
if let Some(first) = self.first_chunk {
|
||||
let ttft = first.duration_since(self.request_start).as_secs_f64();
|
||||
metrics::histogram!("cortex_time_to_first_token_seconds", &self.labels).record(ttft);
|
||||
|
||||
if let Some(prompt) = prompt {
|
||||
metrics::counter!("cortex_prompt_tokens_total", &self.labels).increment(prompt);
|
||||
}
|
||||
if let Some(completion) = completion.filter(|c| *c > 0) {
|
||||
metrics::counter!("cortex_completion_tokens_total", &self.labels)
|
||||
.increment(completion);
|
||||
|
||||
let last = self.last_chunk.unwrap_or(first);
|
||||
let decode_window = last.duration_since(first).as_secs_f64();
|
||||
// Streaming: rate over the decode window (first→last chunk).
|
||||
// Non-streaming bodies arrive as ~one chunk (window ≈ 0),
|
||||
// where the only honest denominator is the full request
|
||||
// duration.
|
||||
let secs = if decode_window >= 0.1 {
|
||||
decode_window
|
||||
} else {
|
||||
last.duration_since(self.request_start).as_secs_f64()
|
||||
};
|
||||
if secs > 0.0 {
|
||||
metrics::histogram!("cortex_tokens_per_second", &self.labels)
|
||||
.record(completion as f64 / secs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Per-principal metering + reservation settle (#51). Always runs so
|
||||
// the reservation is resolved even when no usage/body was observed
|
||||
// (sink with (0, 0) → settle 0 → release).
|
||||
if let Some(sink) = self.usage_sink.take() {
|
||||
sink(prompt.unwrap_or(0), completion.unwrap_or(0));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
253
crates/cortex-gateway/tests/budget_enforcement.rs
Normal file
253
crates/cortex-gateway/tests/budget_enforcement.rs
Normal file
@@ -0,0 +1,253 @@
|
||||
//! Integration tests for budget enforcement (#52) — the A0 seatbelt.
|
||||
//!
|
||||
//! A reservation over the key's hard cap is refused *before* neuron is hit,
|
||||
//! with the #63 code matching the cap-window semantics (rate_limit_exceeded
|
||||
//! + Retry-After for a resetting window, insufficient_quota for a hard
|
||||
//! balance). Spend never exceeds the cap. No 402, ever.
|
||||
|
||||
use axum::Json;
|
||||
use axum::extract::Path;
|
||||
use axum::routing::{get, post};
|
||||
use cortex_core::config::{
|
||||
ApiKeyConfig, EntitlementsConfig, EvictionSettings, EvictionStrategy, GatewayConfig,
|
||||
GatewaySettings, NeuronEndpoint,
|
||||
};
|
||||
use cortex_core::entitlements::{CapWindow, Principal};
|
||||
use cortex_core::node::{ModelEntry, ModelStatus};
|
||||
use cortex_gateway::state::CortexState;
|
||||
use serde_json::{Value, json};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
/// Mock neuron with a hit counter on the inference path, so a test can prove
|
||||
/// a request was (or wasn't) dispatched.
|
||||
async fn spawn_counting_neuron() -> (String, Arc<AtomicU64>) {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let base_url = format!("http://{addr}");
|
||||
let inference_url = base_url.clone();
|
||||
let hits = Arc::new(AtomicU64::new(0));
|
||||
let sink = Arc::clone(&hits);
|
||||
|
||||
let app = axum::Router::new()
|
||||
.route(
|
||||
"/models/{model_id}/endpoint",
|
||||
get(move |Path(_): Path<String>| {
|
||||
let url = inference_url.clone();
|
||||
async move { Json(json!({ "url": url })) }
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/v1/chat/completions",
|
||||
post(move |Json(body): Json<Value>| {
|
||||
let sink = Arc::clone(&sink);
|
||||
async move {
|
||||
sink.fetch_add(1, Ordering::SeqCst);
|
||||
let model = body.get("model").and_then(Value::as_str).unwrap_or("m");
|
||||
Json(json!({
|
||||
"id": "chatcmpl-budget",
|
||||
"object": "chat.completion",
|
||||
"created": 1700000000_u64,
|
||||
"model": model,
|
||||
"choices": [{"index": 0, "message": {"role": "assistant", "content": "ok"}, "finish_reason": "stop"}],
|
||||
"usage": {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}
|
||||
}))
|
||||
}
|
||||
}),
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
});
|
||||
(base_url, hits)
|
||||
}
|
||||
|
||||
async fn spawn_gateway(neuron_url: &str, key: ApiKeyConfig) -> (Arc<CortexState>, String) {
|
||||
let config = GatewayConfig {
|
||||
gateway: GatewaySettings {
|
||||
listen: "127.0.0.1:0".into(),
|
||||
metrics_listen: "127.0.0.1:0".into(),
|
||||
},
|
||||
eviction: EvictionSettings {
|
||||
strategy: EvictionStrategy::Lru,
|
||||
defrag_after_cycles: 0,
|
||||
},
|
||||
neurons: vec![NeuronEndpoint {
|
||||
name: "mock-node".into(),
|
||||
endpoint: neuron_url.to_string(),
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: EntitlementsConfig {
|
||||
require_auth: true,
|
||||
keys: vec![key],
|
||||
},
|
||||
};
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
{
|
||||
let mut nodes = fleet.nodes.write().await;
|
||||
let node = nodes.get_mut("mock-node").unwrap();
|
||||
node.healthy = true;
|
||||
node.models.insert(
|
||||
"test-model".into(),
|
||||
ModelEntry {
|
||||
id: "test-model".into(),
|
||||
status: ModelStatus::Loaded,
|
||||
last_accessed: None,
|
||||
vram_estimate_mb: Some(8000),
|
||||
capabilities: Vec::new(),
|
||||
tool_call: false,
|
||||
reasoning: false,
|
||||
limit: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
let app = cortex_gateway::build_app(Arc::clone(&fleet));
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn(async move {
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
});
|
||||
(fleet, format!("http://{addr}"))
|
||||
}
|
||||
|
||||
fn key(window: CapWindow, hard_cap: u64) -> ApiKeyConfig {
|
||||
ApiKeyConfig {
|
||||
key: "sk-cap".into(),
|
||||
account_id: "acct-cap".into(),
|
||||
key_id: Some("key-cap".into()),
|
||||
hard_cap: Some(hard_cap),
|
||||
window,
|
||||
}
|
||||
}
|
||||
|
||||
fn chat(max_tokens: u64) -> Value {
|
||||
json!({
|
||||
"model": "test-model",
|
||||
"max_tokens": max_tokens,
|
||||
"messages": [{"role": "user", "content": "hi"}]
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn balance_over_cap_is_429_insufficient_quota_before_dispatch() {
|
||||
let (neuron, hits) = spawn_counting_neuron().await;
|
||||
// Cap far below a single request's reservation (max_tokens 1000).
|
||||
let (_fleet, gateway) = spawn_gateway(&neuron, key(CapWindow::Balance, 10)).await;
|
||||
|
||||
let resp = reqwest::Client::new()
|
||||
.post(format!("{gateway}/v1/chat/completions"))
|
||||
.bearer_auth("sk-cap")
|
||||
.json(&chat(1000))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(resp.status(), reqwest::StatusCode::TOO_MANY_REQUESTS);
|
||||
// Hard balance → no Retry-After.
|
||||
assert!(resp.headers().get(reqwest::header::RETRY_AFTER).is_none());
|
||||
let body: Value = resp.json().await.unwrap();
|
||||
assert_eq!(body["error"]["code"], "insufficient_quota");
|
||||
// Refused before dispatch — neuron never saw it.
|
||||
assert_eq!(hits.load(Ordering::SeqCst), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rolling_over_cap_is_429_rate_limited_with_retry_after() {
|
||||
let (neuron, hits) = spawn_counting_neuron().await;
|
||||
let (_fleet, gateway) =
|
||||
spawn_gateway(&neuron, key(CapWindow::Rolling { seconds: 3600 }, 10)).await;
|
||||
|
||||
let resp = reqwest::Client::new()
|
||||
.post(format!("{gateway}/v1/chat/completions"))
|
||||
.bearer_auth("sk-cap")
|
||||
.json(&chat(1000))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(resp.status(), reqwest::StatusCode::TOO_MANY_REQUESTS);
|
||||
let retry = resp
|
||||
.headers()
|
||||
.get(reqwest::header::RETRY_AFTER)
|
||||
.expect("rolling-window rejection must carry Retry-After");
|
||||
assert!(retry.to_str().unwrap().parse::<u64>().unwrap() >= 1);
|
||||
let body: Value = resp.json().await.unwrap();
|
||||
assert_eq!(body["error"]["code"], "rate_limit_exceeded");
|
||||
assert_eq!(hits.load(Ordering::SeqCst), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn within_cap_is_served() {
|
||||
let (neuron, hits) = spawn_counting_neuron().await;
|
||||
let (_fleet, gateway) = spawn_gateway(&neuron, key(CapWindow::Balance, 1_000_000)).await;
|
||||
|
||||
let resp = reqwest::Client::new()
|
||||
.post(format!("{gateway}/v1/chat/completions"))
|
||||
.bearer_auth("sk-cap")
|
||||
.json(&chat(50))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(resp.status(), reqwest::StatusCode::OK);
|
||||
let _ = resp.bytes().await.unwrap();
|
||||
assert_eq!(hits.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn a0_seatbelt_caps_a_runaway_fan_out() {
|
||||
// An Agent-Zero-style key with a modest cap: a burst of requests drains
|
||||
// it, then further requests are refused — the account stops draining and
|
||||
// spend never exceeds the cap.
|
||||
let (neuron, hits) = spawn_counting_neuron().await;
|
||||
let (fleet, gateway) = spawn_gateway(&neuron, key(CapWindow::Balance, 100)).await;
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
let mut ok = 0;
|
||||
let mut refused = 0;
|
||||
for _ in 0..20 {
|
||||
let resp = client
|
||||
.post(format!("{gateway}/v1/chat/completions"))
|
||||
.bearer_auth("sk-cap")
|
||||
.json(&chat(20))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
match resp.status() {
|
||||
reqwest::StatusCode::OK => {
|
||||
ok += 1;
|
||||
let _ = resp.bytes().await.unwrap();
|
||||
}
|
||||
reqwest::StatusCode::TOO_MANY_REQUESTS => {
|
||||
refused += 1;
|
||||
let body: Value = resp.json().await.unwrap();
|
||||
assert_eq!(body["error"]["code"], "insufficient_quota");
|
||||
}
|
||||
other => panic!("unexpected status {other}"),
|
||||
}
|
||||
}
|
||||
|
||||
assert!(ok >= 1, "some requests should be served");
|
||||
assert!(refused >= 1, "the cap must eventually refuse the fan-out");
|
||||
assert_eq!(
|
||||
hits.load(Ordering::SeqCst),
|
||||
ok,
|
||||
"refused requests never dispatched"
|
||||
);
|
||||
|
||||
// Spend never exceeded the hard cap (reservation prevents overshoot).
|
||||
// Poll briefly for in-flight settles to land.
|
||||
let principal = Principal {
|
||||
account_id: "acct-cap".into(),
|
||||
key_id: "key-cap".into(),
|
||||
};
|
||||
for _ in 0..50 {
|
||||
let snap = fleet.entitlements.snapshot(&principal).await.unwrap();
|
||||
if snap.reserved == 0 {
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
|
||||
}
|
||||
let snap = fleet.entitlements.snapshot(&principal).await.unwrap();
|
||||
assert!(snap.spent <= 100, "spent {} exceeded cap", snap.spent);
|
||||
}
|
||||
207
crates/cortex-gateway/tests/metering.rs
Normal file
207
crates/cortex-gateway/tests/metering.rs
Normal file
@@ -0,0 +1,207 @@
|
||||
//! Integration tests for per-request token metering (#51).
|
||||
//!
|
||||
//! Drives authenticated requests through the gateway to a mock neuron that
|
||||
//! reports a fixed `usage` object, then asserts the EntitlementProvider's
|
||||
//! spend ledger reflects cumulative per-key spend and that reservations
|
||||
//! settle to actual (no outstanding reserved tokens once requests complete).
|
||||
|
||||
mod common;
|
||||
|
||||
use cortex_core::config::{
|
||||
ApiKeyConfig, EntitlementsConfig, EvictionSettings, EvictionStrategy, GatewayConfig,
|
||||
GatewaySettings, NeuronEndpoint,
|
||||
};
|
||||
use cortex_core::entitlements::{CapWindow, Principal};
|
||||
use cortex_core::node::{ModelEntry, ModelStatus};
|
||||
use cortex_gateway::state::CortexState;
|
||||
use serde_json::json;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
/// Account id of the metered test principal.
const ACCOUNT: &str = "acct-meter";
/// Key id of the metered test principal.
const KEY_ID: &str = "key-meter";
/// Bearer token configured for the metered key and presented by the tests.
const BEARER: &str = "sk-meter";

/// Prompt tokens in the fixed usage the mock neuron
/// (common::spawn_mock_neuron) reports on every chat completion.
const PROMPT_PER_REQ: u64 = 10;
/// Completion tokens in that same fixed per-request usage.
const COMPLETION_PER_REQ: u64 = 5;
|
||||
|
||||
async fn spawn_metered_gateway(neuron_url: &str) -> (Arc<CortexState>, String) {
|
||||
let config = GatewayConfig {
|
||||
gateway: GatewaySettings {
|
||||
listen: "127.0.0.1:0".into(),
|
||||
metrics_listen: "127.0.0.1:0".into(),
|
||||
},
|
||||
eviction: EvictionSettings {
|
||||
strategy: EvictionStrategy::Lru,
|
||||
defrag_after_cycles: 0,
|
||||
},
|
||||
neurons: vec![NeuronEndpoint {
|
||||
name: "mock-node".into(),
|
||||
endpoint: neuron_url.to_string(),
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: EntitlementsConfig {
|
||||
require_auth: true,
|
||||
keys: vec![ApiKeyConfig {
|
||||
key: BEARER.into(),
|
||||
account_id: ACCOUNT.into(),
|
||||
key_id: Some(KEY_ID.into()),
|
||||
hard_cap: Some(1_000_000),
|
||||
window: CapWindow::Balance,
|
||||
}],
|
||||
},
|
||||
};
|
||||
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
{
|
||||
let mut nodes = fleet.nodes.write().await;
|
||||
let node = nodes.get_mut("mock-node").unwrap();
|
||||
node.healthy = true;
|
||||
node.models.insert(
|
||||
"test-model".into(),
|
||||
ModelEntry {
|
||||
id: "test-model".into(),
|
||||
status: ModelStatus::Loaded,
|
||||
last_accessed: None,
|
||||
vram_estimate_mb: Some(8000),
|
||||
capabilities: Vec::new(),
|
||||
tool_call: false,
|
||||
reasoning: false,
|
||||
limit: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
let app = cortex_gateway::build_app(Arc::clone(&fleet));
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn(async move {
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
});
|
||||
(fleet, format!("http://{addr}"))
|
||||
}
|
||||
|
||||
fn principal() -> Principal {
|
||||
Principal {
|
||||
account_id: ACCOUNT.into(),
|
||||
key_id: KEY_ID.into(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Poll the provider ledger until settled spend reaches `expected` (settle
|
||||
/// runs in a spawned task after the response stream finishes) or time out.
|
||||
async fn await_spent(fleet: &CortexState, expected: u64) -> u64 {
|
||||
let principal = principal();
|
||||
for _ in 0..100 {
|
||||
let snap = fleet.entitlements.snapshot(&principal).await.unwrap();
|
||||
if snap.spent >= expected {
|
||||
return snap.spent;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(20)).await;
|
||||
}
|
||||
fleet.entitlements.snapshot(&principal).await.unwrap().spent
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cumulative_spend_is_metered_per_key() {
|
||||
let neuron = common::spawn_mock_neuron().await;
|
||||
let (fleet, gateway) = spawn_metered_gateway(&neuron).await;
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
const N: u64 = 3;
|
||||
for _ in 0..N {
|
||||
let resp = client
|
||||
.post(format!("{gateway}/v1/chat/completions"))
|
||||
.bearer_auth(BEARER)
|
||||
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": "hi"}]}))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), reqwest::StatusCode::OK);
|
||||
// Drain the body so the response stream finishes and metering settles.
|
||||
let _ = resp.bytes().await.unwrap();
|
||||
}
|
||||
|
||||
let expected = N * (PROMPT_PER_REQ + COMPLETION_PER_REQ);
|
||||
let spent = await_spent(&fleet, expected).await;
|
||||
assert_eq!(
|
||||
spent, expected,
|
||||
"ledger must reflect cumulative per-key spend"
|
||||
);
|
||||
|
||||
// Reservations settled to actual — nothing left outstanding.
|
||||
let snap = fleet.entitlements.snapshot(&principal()).await.unwrap();
|
||||
assert_eq!(snap.reserved, 0, "all reservations must settle/release");
|
||||
assert_eq!(snap.hard_cap, Some(1_000_000));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn anonymous_request_records_no_spend() {
|
||||
// require_auth=false so the unauthenticated request is served, but with
|
||||
// no principal it must not touch any ledger.
|
||||
let neuron = common::spawn_mock_neuron().await;
|
||||
let config = GatewayConfig {
|
||||
gateway: GatewaySettings {
|
||||
listen: "127.0.0.1:0".into(),
|
||||
metrics_listen: "127.0.0.1:0".into(),
|
||||
},
|
||||
eviction: EvictionSettings {
|
||||
strategy: EvictionStrategy::Lru,
|
||||
defrag_after_cycles: 0,
|
||||
},
|
||||
neurons: vec![NeuronEndpoint {
|
||||
name: "mock-node".into(),
|
||||
endpoint: neuron.clone(),
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: EntitlementsConfig::default(),
|
||||
};
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
{
|
||||
let mut nodes = fleet.nodes.write().await;
|
||||
let node = nodes.get_mut("mock-node").unwrap();
|
||||
node.healthy = true;
|
||||
node.models.insert(
|
||||
"test-model".into(),
|
||||
ModelEntry {
|
||||
id: "test-model".into(),
|
||||
status: ModelStatus::Loaded,
|
||||
last_accessed: None,
|
||||
vram_estimate_mb: Some(8000),
|
||||
capabilities: Vec::new(),
|
||||
tool_call: false,
|
||||
reasoning: false,
|
||||
limit: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
let app = cortex_gateway::build_app(Arc::clone(&fleet));
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn(async move {
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
});
|
||||
|
||||
let resp = reqwest::Client::new()
|
||||
.post(format!("http://{addr}/v1/chat/completions"))
|
||||
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": "hi"}]}))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), reqwest::StatusCode::OK);
|
||||
let _ = resp.bytes().await.unwrap();
|
||||
|
||||
// An unconfigured principal has a zeroed snapshot — nothing was metered.
|
||||
let snap = fleet
|
||||
.entitlements
|
||||
.snapshot(&Principal {
|
||||
account_id: "nobody".into(),
|
||||
key_id: "nobody".into(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(snap.spent, 0);
|
||||
}
|
||||
Reference in New Issue
Block a user