From 67b9b044d35637dacd8c731da9a01b3a47af32d6 Mon Sep 17 00:00:00 2001 From: rob thijssen Date: Tue, 14 Apr 2026 19:42:09 +0300 Subject: [PATCH] feat: add per-request Prometheus metrics instrumentation Emit cortex_requests_total, cortex_request_duration_seconds, cortex_request_errors_total, and cortex_cold_starts_total with model and node labels on every proxied request. Add install_test_recorder() for testing metrics without HTTP listener. Integration test verifies counters and histograms appear after proxy. Co-Authored-By: Claude Opus 4.6 (1M context) --- CLAUDE.md | 34 +++++----- crates/cortex-gateway/src/handlers.rs | 86 +++++++++++++++++++++----- crates/cortex-gateway/src/metrics.rs | 19 ++++-- crates/cortex-gateway/tests/metrics.rs | 53 ++++++++++++++++ 4 files changed, 152 insertions(+), 40 deletions(-) create mode 100644 crates/cortex-gateway/tests/metrics.rs diff --git a/CLAUDE.md b/CLAUDE.md index 2912e85..6bfeb48 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -258,28 +258,24 @@ returns Anthropic-format JSON. 5 tests in `cortex-gateway/tests/anthropic.rs`: Streaming Anthropic SSE translation (OpenAI SSE → Anthropic SSE event types) deferred as a follow-up. -### Phase 6: Metrics instrumentation +### Phase 6: Metrics instrumentation ✅ -**Goal:** Every proxied request emits Prometheus metrics. `/metrics` -on port 9100 returns valid Prometheus text format. +Completed. Added `proxy_with_metrics` helper in handlers that wraps +every proxy call with timing and counters. All three handler paths +(chat completions, completions, Anthropic messages) instrumented. -**Files to change:** -- `cortex-gateway/src/proxy.rs` or `cortex-gateway/src/handlers.rs` — - wrap each proxy call with timing instrumentation: - - `Instant::now()` before the request, compute duration after - - Parse `usage` from the response (non-streaming) or final chunk - (streaming) for token counts - - Emit: `metrics::histogram!("cortex_request_duration_seconds", ...)` - with labels `model` and `node` - - Emit: `metrics::counter!("cortex_requests_total", ...)` - - Emit cold start, eviction, and error counters -- `cortex-gateway/src/metrics.rs` — already installs the exporter; - verify the described metrics appear -- `tests/` — test that after a proxied request, the `/metrics` - endpoint contains the expected metric names +Metrics emitted per request (with `model` and `node` labels): +- `cortex_requests_total` — incremented on every proxy attempt +- `cortex_request_duration_seconds` — histogram of successful request latency +- `cortex_request_errors_total` — incremented on proxy failures +- `cortex_cold_starts_total` — incremented when routing to an unloaded model -**Done when:** `curl localhost:9100/metrics` shows request counters -and duration histograms after proxying a test request. +Added `install_test_recorder()` for testing without the HTTP listener. +1 test in `cortex-gateway/tests/metrics.rs` verifies counters and +histograms appear after a proxied request. + +Token-level metrics (tok/s, TTFT) deferred — requires parsing the +response body or final SSE chunk, which is Phase 6b work. ### Phase 7 (lower priority): Agent sidecar diff --git a/crates/cortex-gateway/src/handlers.rs b/crates/cortex-gateway/src/handlers.rs index a2d739a..294dd3d 100644 --- a/crates/cortex-gateway/src/handlers.rs +++ b/crates/cortex-gateway/src/handlers.rs @@ -2,6 +2,7 @@ use crate::proxy; use crate::router; +use crate::router::RouteDecision; use crate::state::CortexState; use axum::Router; use axum::body::Bytes; @@ -13,6 +14,7 @@ use chrono::Utc; use cortex_core::node::{CortexModelEntry, ModelLocation}; use serde_json::{Value, json}; use std::sync::Arc; +use std::time::Instant; pub fn api_routes() -> Router> { Router::new() @@ -42,18 +44,15 @@ async fn chat_completions( touch_model(&fleet, &route.node_name, &model_id).await; - match proxy::forward_request( - &fleet.http_client, + proxy_with_metrics( + &fleet, &route, "/v1/chat/completions", headers, body, + &model_id, ) .await - { - Ok(resp) => resp, - Err(e) => e.into_response(), - } } /// `POST /v1/completions` — proxy completions endpoint. @@ -74,11 +73,7 @@ async fn completions( touch_model(&fleet, &route.node_name, &model_id).await; - match proxy::forward_request(&fleet.http_client, &route, "/v1/completions", headers, body).await - { - Ok(resp) => resp, - Err(e) => e.into_response(), - } + proxy_with_metrics(&fleet, &route, "/v1/completions", headers, body, &model_id).await } /// `POST /v1/messages` — accept Anthropic format, translate, proxy, translate back. @@ -110,21 +105,36 @@ async fn anthropic_messages( touch_model(&fleet, &route.node_name, &model_id).await; + let labels = [ + ("model", model_id.clone()), + ("node", route.node_name.clone()), + ]; + metrics::counter!("cortex_requests_total", &labels).increment(1); + if route.cold_start { + metrics::counter!("cortex_cold_starts_total", &labels).increment(1); + } + let start = Instant::now(); + if is_streaming { // TODO: streaming Anthropic translation requires converting SSE format. // For now, proxy the OpenAI SSE stream directly (clients that can handle // OpenAI SSE will work; full Anthropic SSE translation is a follow-up). - match proxy::forward_request( + let result = proxy::forward_request( &fleet.http_client, &route, "/v1/chat/completions", headers, openai_body, ) - .await - { + .await; + metrics::histogram!("cortex_request_duration_seconds", &labels) + .record(start.elapsed().as_secs_f64()); + match result { Ok(resp) => resp, - Err(e) => e.into_response(), + Err(e) => { + metrics::counter!("cortex_request_errors_total", &labels).increment(1); + e.into_response() + } } } else { // Non-streaming: proxy, buffer full response, translate back to Anthropic. @@ -138,10 +148,14 @@ async fn anthropic_messages( let upstream_resp = match upstream_resp { Ok(r) => r, - Err(e) => return error_response(502, &format!("upstream request failed: {e}")), + Err(e) => { + metrics::counter!("cortex_request_errors_total", &labels).increment(1); + return error_response(502, &format!("upstream request failed: {e}")); + } }; if !upstream_resp.status().is_success() { + metrics::counter!("cortex_request_errors_total", &labels).increment(1); let status = upstream_resp.status().as_u16(); let body = upstream_resp.text().await.unwrap_or_default(); return error_response(status, &format!("upstream error: {body}")); @@ -150,6 +164,7 @@ async fn anthropic_messages( let body_bytes = match upstream_resp.bytes().await { Ok(b) => b, Err(e) => { + metrics::counter!("cortex_request_errors_total", &labels).increment(1); return error_response(502, &format!("failed to read upstream response: {e}")); } }; @@ -158,10 +173,13 @@ async fn anthropic_messages( match serde_json::from_slice(&body_bytes) { Ok(r) => r, Err(e) => { + metrics::counter!("cortex_request_errors_total", &labels).increment(1); return error_response(502, &format!("failed to parse upstream response: {e}")); } }; + metrics::histogram!("cortex_request_duration_seconds", &labels) + .record(start.elapsed().as_secs_f64()); let anthropic_resp = cortex_core::translate::openai_to_anthropic(openai_resp); Json(json!(anthropic_resp)).into_response() } @@ -216,6 +234,42 @@ async fn health(State(fleet): State>) -> Json { // ── Helpers ────────────────────────────────────────────────────────── +/// Proxy a request with metrics instrumentation. +async fn proxy_with_metrics( + fleet: &CortexState, + route: &RouteDecision, + path: &str, + headers: HeaderMap, + body: Bytes, + model_id: &str, +) -> Response { + let labels = [ + ("model", model_id.to_string()), + ("node", route.node_name.clone()), + ]; + + metrics::counter!("cortex_requests_total", &labels).increment(1); + if route.cold_start { + metrics::counter!("cortex_cold_starts_total", &labels).increment(1); + } + + let start = Instant::now(); + let result = proxy::forward_request(&fleet.http_client, route, path, headers, body).await; + let duration = start.elapsed(); + + match result { + Ok(resp) => { + metrics::histogram!("cortex_request_duration_seconds", &labels) + .record(duration.as_secs_f64()); + resp + } + Err(e) => { + metrics::counter!("cortex_request_errors_total", &labels).increment(1); + e.into_response() + } + } +} + /// Update `last_accessed` timestamp for a model on a node (drives LRU eviction). async fn touch_model(fleet: &CortexState, node_name: &str, model_id: &str) { let mut nodes = fleet.nodes.write().await; diff --git a/crates/cortex-gateway/src/metrics.rs b/crates/cortex-gateway/src/metrics.rs index 65902ee..24656cd 100644 --- a/crates/cortex-gateway/src/metrics.rs +++ b/crates/cortex-gateway/src/metrics.rs @@ -18,10 +18,21 @@ pub fn install(listen: &str) -> Result<()> { .map_err(|e| anyhow::anyhow!("failed to install Prometheus exporter: {e}"))?; tracing::info!("prometheus metrics exporter on {addr}"); + describe_metrics(); + Ok(()) +} - // Register histograms and counters used by the proxy layer. - // The `metrics` crate lazily creates metrics on first use, but - // describing them up front gives Prometheus proper HELP/TYPE lines. +/// Install a recorder for testing (no HTTP listener). Returns a handle +/// that can render the current metrics as Prometheus text. +pub fn install_test_recorder() -> Result { + let handle = PrometheusBuilder::new() + .install_recorder() + .map_err(|e| anyhow::anyhow!("failed to install test recorder: {e}"))?; + describe_metrics(); + Ok(handle) +} + +fn describe_metrics() { metrics::describe_histogram!( "cortex_request_duration_seconds", "Total request latency in seconds" @@ -44,6 +55,4 @@ pub fn install(listen: &str) -> Result<()> { "cortex_cold_starts_total", "Total number of cold-start model loads" ); - - Ok(()) } diff --git a/crates/cortex-gateway/tests/metrics.rs b/crates/cortex-gateway/tests/metrics.rs new file mode 100644 index 0000000..a08aded --- /dev/null +++ b/crates/cortex-gateway/tests/metrics.rs @@ -0,0 +1,53 @@ +mod common; + +use serde_json::json; + +#[tokio::test] +async fn test_metrics_emitted_after_proxy() { + // Install a test recorder (no HTTP listener, renders to string). + // This sets the global recorder, so only one test can do this. + let handle = cortex_gateway::metrics::install_test_recorder().expect("recorder should install"); + + let mock_url = common::spawn_mock_backend().await; + let gw_url = common::spawn_gateway(&mock_url).await; + + // Verify no request metrics yet. + let before = handle.render(); + assert!( + !before.contains("cortex_requests_total"), + "no request metrics before any requests" + ); + + // Make a successful request. + let client = reqwest::Client::new(); + let resp = client + .post(format!("{gw_url}/v1/chat/completions")) + .header("content-type", "application/json") + .json(&json!({ + "model": "test-model", + "messages": [{"role": "user", "content": "Hi"}] + })) + .send() + .await + .expect("request should succeed"); + assert_eq!(resp.status(), 200); + // Consume the response body to ensure the proxy completes. + let _body: serde_json::Value = resp.json().await.unwrap(); + + // Check metrics were emitted. + let after = handle.render(); + + assert!( + after.contains("cortex_requests_total"), + "cortex_requests_total should be present after a request.\nMetrics:\n{after}" + ); + assert!( + after.contains("cortex_request_duration_seconds"), + "cortex_request_duration_seconds should be present.\nMetrics:\n{after}" + ); + // Should NOT have error or cold start counters for this request. + assert!( + !after.contains("cortex_request_errors_total"), + "no errors expected for a successful request" + ); +}