helexa/crates/cortex-gateway/src/lib.rs
rob thijssen 4f16b8c541
feat(#47 phase 1c): per-request token metering + spend ledger
Stage 1 accounting (#51): capture real per-request usage and feed it to
the spend ledger + per-principal metrics. Establishes the reserve→settle
lifecycle that budget enforcement (#52) will tighten.
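
The reserve→settle lifecycle can be sketched with an RAII guard. This is a minimal std-only illustration. It assumes a hypothetical in-memory `Ledger` that tracks outstanding reservations and settled spend. `Ledger`, `LedgerState`, and the exact `settle` signature are illustrative and are not the `cortex-gateway::metering` API. Calling `settle` consumes the guard, releases the reservation, and records the actual spend. Dropping an un-settled guard releases the whole reservation and records nothing.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical ledger state: tokens currently reserved and tokens spent.
#[derive(Default, Debug)]
struct LedgerState {
    outstanding: u64,
    spent: u64,
}

/// Hypothetical shared ledger; cloning shares the same state.
#[derive(Clone, Default)]
struct Ledger {
    state: Arc<Mutex<LedgerState>>,
}

impl Ledger {
    /// Reserve `amount` tokens and return a guard that resolves the
    /// reservation either by `settle` or by being dropped.
    fn reserve(&self, amount: u64) -> ReservationGuard {
        self.state.lock().unwrap().outstanding += amount;
        ReservationGuard { ledger: self.clone(), reserved: amount, settled: false }
    }

    /// (outstanding, spent)
    fn snapshot(&self) -> (u64, u64) {
        let s = self.state.lock().unwrap();
        (s.outstanding, s.spent)
    }
}

struct ReservationGuard {
    ledger: Ledger,
    reserved: u64,
    settled: bool,
}

impl ReservationGuard {
    /// Record actual spend and release the reservation. Taking `self` by
    /// value means a guard can be settled at most once.
    fn settle(mut self, actual: u64) {
        {
            let mut s = self.ledger.state.lock().unwrap();
            s.outstanding -= self.reserved;
            s.spent += actual;
        } // lock released before Drop runs
        self.settled = true;
    }
}

impl Drop for ReservationGuard {
    fn drop(&mut self) {
        if !self.settled {
            // Early return, error, or dropped stream: release everything.
            self.ledger.state.lock().unwrap().outstanding -= self.reserved;
        }
    }
}

fn main() {
    let ledger = Ledger::default();
    ledger.reserve(100).settle(42);
    {
        let _abandoned = ledger.reserve(50); // never settled
    }
    assert_eq!(ledger.snapshot(), (0, 42));
}
```

In this phase the reserved amount is 0, so only the spend side moves. Reserving a larger amount, as #52 plans with prompt+max_output, would exercise the release path shown here.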

- cortex-gateway::metering: ReservationGuard makes reservation leaks
  impossible — settle() records actual spend + releases the remainder;
  dropping an un-settled guard releases the whole reservation, so any
  early return / error / dropped stream resolves it. UsageSink is the
  completion hook; principal_from_headers reconstructs the principal from
  the middleware-stamped headers (uniform across all proxy paths, no
  handler-signature churn); record_spend emits per-principal counters.
- proxy::TokenMetrics gains an optional usage_sink, invoked exactly once
  in finish() with the observed (prompt, completion) — restructured so it
  always runs (even when no body/usage arrived → settle 0 → release),
  while preserving the existing per-model metric emissions unchanged.
- All proxy paths metered: chat/completions/responses via
  proxy_with_metrics (reserve 0 → forward_request → settle in finish);
  Anthropic non-streaming settles from the buffered body; Anthropic
  streaming (anthropic_sse) now scans the upstream frames for the usage
  object (#48) — it captured none before — and settles at pump end.
- This phase reserves 0 tokens (metering only, no enforcement); #52 flips
  the reserved amount to prompt+max_output and surfaces BudgetError. The
  settle/release plumbing is identical, so that change is localized.
- New Prometheus counters: cortex_spend_tokens_total (+ prompt/completion
  splits), labelled by account/key.
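
`Option::take` can illustrate the "exactly once, always runs" property of `finish()`. In this sketch, `TokenMetrics`, `observe_usage`, `metrics_with_log`, and the `UsageSink` closure type are simplified, hypothetical stand-ins for the proxy's types. `finish` falls back to (0, 0) when no usage object arrived. Because the sink is taken out of its `Option`, any later call does nothing.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical completion hook: receives (prompt_tokens, completion_tokens).
type UsageSink = Box<dyn FnOnce(u64, u64)>;

/// Simplified stand-in for the proxy's per-request metrics state.
struct TokenMetrics {
    usage: Option<(u64, u64)>,
    usage_sink: Option<UsageSink>,
}

impl TokenMetrics {
    fn observe_usage(&mut self, prompt: u64, completion: u64) {
        self.usage = Some((prompt, completion));
    }

    /// Invoke the sink regardless of whether usage arrived; a missing
    /// usage object settles as (0, 0). `take()` makes repeat calls no-ops.
    fn finish(&mut self) {
        let (prompt, completion) = self.usage.unwrap_or((0, 0));
        if let Some(sink) = self.usage_sink.take() {
            sink(prompt, completion);
        }
    }
}

/// Build metrics whose sink appends each settlement to `log`.
fn metrics_with_log(log: &Rc<RefCell<Vec<(u64, u64)>>>) -> TokenMetrics {
    let log = Rc::clone(log);
    TokenMetrics {
        usage: None,
        usage_sink: Some(Box::new(move |p, c| log.borrow_mut().push((p, c)))),
    }
}

fn main() {
    let log = Rc::new(RefCell::new(Vec::new()));

    // Response that reported usage.
    let mut ok = metrics_with_log(&log);
    ok.observe_usage(12, 30);
    ok.finish();

    // Upstream error: no body, no usage. The sink still runs, with (0, 0).
    let mut failed = metrics_with_log(&log);
    failed.finish();
    failed.finish(); // sink already taken: no second settlement

    assert_eq!(*log.borrow(), vec![(12, 30), (0, 0)]);
}
```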

2 integration tests: cumulative per-key spend after N requests with
reservations settled to zero outstanding; anonymous requests record no
spend. Local fmt/clippy/test all green.
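
A plain map keyed by principal can model the invariants that the two integration tests check. This sketch assumes a hypothetical `Principal { account, key }` and an in-memory `SpendLedger`. The gateway itself exports Prometheus counters, but the accounting rule is the same. Spend accumulates per (account, key), and an anonymous request (no principal) is a no-op.

```rust
use std::collections::HashMap;

/// Hypothetical principal, as reconstructed from the middleware headers.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Principal {
    account: String,
    key: String,
}

/// Per-principal totals, mirroring the total and the prompt/completion
/// splits of cortex_spend_tokens_total.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
struct Spend {
    prompt: u64,
    completion: u64,
    total: u64,
}

#[derive(Default)]
struct SpendLedger {
    by_principal: HashMap<Principal, Spend>,
}

impl SpendLedger {
    /// Accumulate spend for a principal; anonymous requests record nothing.
    fn record_spend(&mut self, principal: Option<&Principal>, prompt: u64, completion: u64) {
        let Some(p) = principal else { return };
        let entry = self.by_principal.entry(p.clone()).or_default();
        entry.prompt += prompt;
        entry.completion += completion;
        entry.total += prompt + completion;
    }

    fn spend_for(&self, p: &Principal) -> Spend {
        self.by_principal.get(p).copied().unwrap_or_default()
    }
}

fn main() {
    let key = Principal { account: "acct-1".into(), key: "key-a".into() };
    let mut ledger = SpendLedger::default();
    for _ in 0..3 {
        ledger.record_spend(Some(&key), 10, 5);
    }
    ledger.record_spend(None, 100, 100); // anonymous: ignored
    assert_eq!(ledger.spend_for(&key), Spend { prompt: 30, completion: 15, total: 45 });
    assert_eq!(ledger.by_principal.len(), 1);
}
```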

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 19:29:51 +03:00

pub mod anthropic_sse;
pub mod auth;
pub mod entitlements_local;
pub mod error;
pub mod evictor;
pub mod handlers;
pub mod metering;
pub mod metrics;
pub mod poller;
pub mod proxy;
pub mod router;
pub mod state;

use anyhow::Result;
use axum::Router;
use axum::middleware::from_fn_with_state;
use cortex_core::config::GatewayConfig;
use std::sync::Arc;
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;

/// Build the Axum application router with all routes wired up.
///
/// Layer order (outermost first): trace → CORS → auth → handlers. CORS is
/// outer to auth so preflight `OPTIONS` short-circuits before resolution;
/// auth (`require_principal`) resolves the bearer key, attaches the
/// principal, and stamps the internal principal headers before any handler
/// runs.
pub fn build_app(fleet: Arc<state::CortexState>) -> Router {
    Router::new()
        .merge(handlers::api_routes())
        .layer(from_fn_with_state(
            Arc::clone(&fleet),
            auth::require_principal,
        ))
        .layer(CorsLayer::permissive())
        .layer(TraceLayer::new_for_http())
        .with_state(fleet)
}

/// Start the gateway: build state from config, spawn background tasks,
/// bind the HTTP server.
pub async fn run(config: GatewayConfig) -> Result<()> {
    let fleet = Arc::new(state::CortexState::from_config(&config));

    // Spawn the background poller that refreshes node/model status.
    let poller_fleet = Arc::clone(&fleet);
    tokio::spawn(async move {
        poller::poll_loop(poller_fleet).await;
    });

    // Spawn the evictor (reacts to VRAM pressure events from the router).
    let evictor_fleet = Arc::clone(&fleet);
    tokio::spawn(async move {
        evictor::eviction_loop(evictor_fleet).await;
    });

    let app = build_app(Arc::clone(&fleet));
    let listen_addr = config.gateway.listen.parse::<std::net::SocketAddr>()?;
    tracing::info!("cortex listening on {listen_addr}");
    let listener = tokio::net::TcpListener::bind(listen_addr).await?;
    axum::serve(listener, app).await?;
    Ok(())
}
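
The layer order in the `build_app` doc comment follows from how `Router::layer` composes. Each call wraps everything added before it, so the last layer added (`TraceLayer`) is outermost and sees the request first. The std-only model below uses hypothetical `Service` and `layer` definitions rather than axum or tower types. It logs each layer on the way in and on the way out.

```rust
/// A "service" here is just a function that appends to a trace log.
type Service = Box<dyn Fn(&mut Vec<&'static str>)>;

/// Wrap `inner` so `name` is logged before and after it runs, like a
/// middleware layer around the rest of the stack.
fn layer(name: &'static str, inner: Service) -> Service {
    Box::new(move |log: &mut Vec<&'static str>| {
        log.push(name);
        inner(&mut *log);
        log.push(name);
    })
}

fn main() {
    // Same sequence of calls as build_app: handlers, then auth, CORS, trace.
    let mut svc: Service = Box::new(|log: &mut Vec<&'static str>| log.push("handler"));
    for name in ["auth", "cors", "trace"] {
        svc = layer(name, svc);
    }

    let mut log = Vec::new();
    svc(&mut log);
    assert_eq!(log, ["trace", "cors", "auth", "handler", "auth", "cors", "trace"]);
}
```

`tower::ServiceBuilder` composes in the opposite direction: the first layer added is outermost. Moving these layers into a `ServiceBuilder` would therefore mean listing them in reverse (trace, CORS, auth).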