feat: add per-request Prometheus metrics instrumentation
All checks were successful
CI / Format, lint, build, test (push) Successful in 2m26s
CI / Build SRPM (push) Has been skipped
CI / Publish to COPR (push) Has been skipped

Emit cortex_requests_total, cortex_request_duration_seconds,
cortex_request_errors_total, and cortex_cold_starts_total with
model and node labels on every proxied request.

Add install_test_recorder() for testing metrics without HTTP listener.
Integration test verifies counters and histograms appear after proxy.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-14 19:42:09 +03:00
parent 29c8f10761
commit 67b9b044d3
4 changed files with 152 additions and 40 deletions

View File

@@ -258,28 +258,24 @@ returns Anthropic-format JSON. 5 tests in `cortex-gateway/tests/anthropic.rs`:
Streaming Anthropic SSE translation (OpenAI SSE → Anthropic SSE event Streaming Anthropic SSE translation (OpenAI SSE → Anthropic SSE event
types) deferred as a follow-up. types) deferred as a follow-up.
### Phase 6: Metrics instrumentation ### Phase 6: Metrics instrumentation
**Goal:** Every proxied request emits Prometheus metrics. `/metrics` Completed. Added `proxy_with_metrics` helper in handlers that wraps
on port 9100 returns valid Prometheus text format. every proxy call with timing and counters. All three handler paths
(chat completions, completions, Anthropic messages) instrumented.
**Files to change:** Metrics emitted per request (with `model` and `node` labels):
- `cortex-gateway/src/proxy.rs` or `cortex-gateway/src/handlers.rs` - `cortex_requests_total` — incremented on every proxy attempt
wrap each proxy call with timing instrumentation: - `cortex_request_duration_seconds` — histogram of successful request latency
- `Instant::now()` before the request, compute duration after - `cortex_request_errors_total` — incremented on proxy failures
- Parse `usage` from the response (non-streaming) or final chunk - `cortex_cold_starts_total` — incremented when routing to an unloaded model
(streaming) for token counts
- Emit: `metrics::histogram!("cortex_request_duration_seconds", ...)`
with labels `model` and `node`
- Emit: `metrics::counter!("cortex_requests_total", ...)`
- Emit cold start, eviction, and error counters
- `cortex-gateway/src/metrics.rs` — already installs the exporter;
verify the described metrics appear
- `tests/` — test that after a proxied request, the `/metrics`
endpoint contains the expected metric names
**Done when:** `curl localhost:9100/metrics` shows request counters Added `install_test_recorder()` for testing without the HTTP listener.
and duration histograms after proxying a test request. 1 test in `cortex-gateway/tests/metrics.rs` verifies counters and
histograms appear after a proxied request.
Token-level metrics (tok/s, TTFT) deferred — requires parsing the
response body or final SSE chunk, which is Phase 6b work.
### Phase 7 (lower priority): Agent sidecar ### Phase 7 (lower priority): Agent sidecar

View File

@@ -2,6 +2,7 @@
use crate::proxy; use crate::proxy;
use crate::router; use crate::router;
use crate::router::RouteDecision;
use crate::state::CortexState; use crate::state::CortexState;
use axum::Router; use axum::Router;
use axum::body::Bytes; use axum::body::Bytes;
@@ -13,6 +14,7 @@ use chrono::Utc;
use cortex_core::node::{CortexModelEntry, ModelLocation}; use cortex_core::node::{CortexModelEntry, ModelLocation};
use serde_json::{Value, json}; use serde_json::{Value, json};
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
pub fn api_routes() -> Router<Arc<CortexState>> { pub fn api_routes() -> Router<Arc<CortexState>> {
Router::new() Router::new()
@@ -42,18 +44,15 @@ async fn chat_completions(
touch_model(&fleet, &route.node_name, &model_id).await; touch_model(&fleet, &route.node_name, &model_id).await;
match proxy::forward_request( proxy_with_metrics(
&fleet.http_client, &fleet,
&route, &route,
"/v1/chat/completions", "/v1/chat/completions",
headers, headers,
body, body,
&model_id,
) )
.await .await
{
Ok(resp) => resp,
Err(e) => e.into_response(),
}
} }
/// `POST /v1/completions` — proxy completions endpoint. /// `POST /v1/completions` — proxy completions endpoint.
@@ -74,11 +73,7 @@ async fn completions(
touch_model(&fleet, &route.node_name, &model_id).await; touch_model(&fleet, &route.node_name, &model_id).await;
match proxy::forward_request(&fleet.http_client, &route, "/v1/completions", headers, body).await proxy_with_metrics(&fleet, &route, "/v1/completions", headers, body, &model_id).await
{
Ok(resp) => resp,
Err(e) => e.into_response(),
}
} }
/// `POST /v1/messages` — accept Anthropic format, translate, proxy, translate back. /// `POST /v1/messages` — accept Anthropic format, translate, proxy, translate back.
@@ -110,21 +105,36 @@ async fn anthropic_messages(
touch_model(&fleet, &route.node_name, &model_id).await; touch_model(&fleet, &route.node_name, &model_id).await;
let labels = [
("model", model_id.clone()),
("node", route.node_name.clone()),
];
metrics::counter!("cortex_requests_total", &labels).increment(1);
if route.cold_start {
metrics::counter!("cortex_cold_starts_total", &labels).increment(1);
}
let start = Instant::now();
if is_streaming { if is_streaming {
// TODO: streaming Anthropic translation requires converting SSE format. // TODO: streaming Anthropic translation requires converting SSE format.
// For now, proxy the OpenAI SSE stream directly (clients that can handle // For now, proxy the OpenAI SSE stream directly (clients that can handle
// OpenAI SSE will work; full Anthropic SSE translation is a follow-up). // OpenAI SSE will work; full Anthropic SSE translation is a follow-up).
match proxy::forward_request( let result = proxy::forward_request(
&fleet.http_client, &fleet.http_client,
&route, &route,
"/v1/chat/completions", "/v1/chat/completions",
headers, headers,
openai_body, openai_body,
) )
.await .await;
{ metrics::histogram!("cortex_request_duration_seconds", &labels)
.record(start.elapsed().as_secs_f64());
match result {
Ok(resp) => resp, Ok(resp) => resp,
Err(e) => e.into_response(), Err(e) => {
metrics::counter!("cortex_request_errors_total", &labels).increment(1);
e.into_response()
}
} }
} else { } else {
// Non-streaming: proxy, buffer full response, translate back to Anthropic. // Non-streaming: proxy, buffer full response, translate back to Anthropic.
@@ -138,10 +148,14 @@ async fn anthropic_messages(
let upstream_resp = match upstream_resp { let upstream_resp = match upstream_resp {
Ok(r) => r, Ok(r) => r,
Err(e) => return error_response(502, &format!("upstream request failed: {e}")), Err(e) => {
metrics::counter!("cortex_request_errors_total", &labels).increment(1);
return error_response(502, &format!("upstream request failed: {e}"));
}
}; };
if !upstream_resp.status().is_success() { if !upstream_resp.status().is_success() {
metrics::counter!("cortex_request_errors_total", &labels).increment(1);
let status = upstream_resp.status().as_u16(); let status = upstream_resp.status().as_u16();
let body = upstream_resp.text().await.unwrap_or_default(); let body = upstream_resp.text().await.unwrap_or_default();
return error_response(status, &format!("upstream error: {body}")); return error_response(status, &format!("upstream error: {body}"));
@@ -150,6 +164,7 @@ async fn anthropic_messages(
let body_bytes = match upstream_resp.bytes().await { let body_bytes = match upstream_resp.bytes().await {
Ok(b) => b, Ok(b) => b,
Err(e) => { Err(e) => {
metrics::counter!("cortex_request_errors_total", &labels).increment(1);
return error_response(502, &format!("failed to read upstream response: {e}")); return error_response(502, &format!("failed to read upstream response: {e}"));
} }
}; };
@@ -158,10 +173,13 @@ async fn anthropic_messages(
match serde_json::from_slice(&body_bytes) { match serde_json::from_slice(&body_bytes) {
Ok(r) => r, Ok(r) => r,
Err(e) => { Err(e) => {
metrics::counter!("cortex_request_errors_total", &labels).increment(1);
return error_response(502, &format!("failed to parse upstream response: {e}")); return error_response(502, &format!("failed to parse upstream response: {e}"));
} }
}; };
metrics::histogram!("cortex_request_duration_seconds", &labels)
.record(start.elapsed().as_secs_f64());
let anthropic_resp = cortex_core::translate::openai_to_anthropic(openai_resp); let anthropic_resp = cortex_core::translate::openai_to_anthropic(openai_resp);
Json(json!(anthropic_resp)).into_response() Json(json!(anthropic_resp)).into_response()
} }
@@ -216,6 +234,42 @@ async fn health(State(fleet): State<Arc<CortexState>>) -> Json<Value> {
// ── Helpers ────────────────────────────────────────────────────────── // ── Helpers ──────────────────────────────────────────────────────────
/// Proxy a request with metrics instrumentation.
async fn proxy_with_metrics(
fleet: &CortexState,
route: &RouteDecision,
path: &str,
headers: HeaderMap,
body: Bytes,
model_id: &str,
) -> Response {
let labels = [
("model", model_id.to_string()),
("node", route.node_name.clone()),
];
metrics::counter!("cortex_requests_total", &labels).increment(1);
if route.cold_start {
metrics::counter!("cortex_cold_starts_total", &labels).increment(1);
}
let start = Instant::now();
let result = proxy::forward_request(&fleet.http_client, route, path, headers, body).await;
let duration = start.elapsed();
match result {
Ok(resp) => {
metrics::histogram!("cortex_request_duration_seconds", &labels)
.record(duration.as_secs_f64());
resp
}
Err(e) => {
metrics::counter!("cortex_request_errors_total", &labels).increment(1);
e.into_response()
}
}
}
/// Update `last_accessed` timestamp for a model on a node (drives LRU eviction). /// Update `last_accessed` timestamp for a model on a node (drives LRU eviction).
async fn touch_model(fleet: &CortexState, node_name: &str, model_id: &str) { async fn touch_model(fleet: &CortexState, node_name: &str, model_id: &str) {
let mut nodes = fleet.nodes.write().await; let mut nodes = fleet.nodes.write().await;

View File

@@ -18,10 +18,21 @@ pub fn install(listen: &str) -> Result<()> {
.map_err(|e| anyhow::anyhow!("failed to install Prometheus exporter: {e}"))?; .map_err(|e| anyhow::anyhow!("failed to install Prometheus exporter: {e}"))?;
tracing::info!("prometheus metrics exporter on {addr}"); tracing::info!("prometheus metrics exporter on {addr}");
describe_metrics();
Ok(())
}
// Register histograms and counters used by the proxy layer. /// Install a recorder for testing (no HTTP listener). Returns a handle
// The `metrics` crate lazily creates metrics on first use, but /// that can render the current metrics as Prometheus text.
// describing them up front gives Prometheus proper HELP/TYPE lines. pub fn install_test_recorder() -> Result<metrics_exporter_prometheus::PrometheusHandle> {
let handle = PrometheusBuilder::new()
.install_recorder()
.map_err(|e| anyhow::anyhow!("failed to install test recorder: {e}"))?;
describe_metrics();
Ok(handle)
}
fn describe_metrics() {
metrics::describe_histogram!( metrics::describe_histogram!(
"cortex_request_duration_seconds", "cortex_request_duration_seconds",
"Total request latency in seconds" "Total request latency in seconds"
@@ -44,6 +55,4 @@ pub fn install(listen: &str) -> Result<()> {
"cortex_cold_starts_total", "cortex_cold_starts_total",
"Total number of cold-start model loads" "Total number of cold-start model loads"
); );
Ok(())
} }

View File

@@ -0,0 +1,53 @@
mod common;
use serde_json::json;
#[tokio::test]
async fn test_metrics_emitted_after_proxy() {
// Install a test recorder (no HTTP listener, renders to string).
// This sets the global recorder, so only one test can do this.
let handle = cortex_gateway::metrics::install_test_recorder().expect("recorder should install");
let mock_url = common::spawn_mock_backend().await;
let gw_url = common::spawn_gateway(&mock_url).await;
// Verify no request metrics yet.
let before = handle.render();
assert!(
!before.contains("cortex_requests_total"),
"no request metrics before any requests"
);
// Make a successful request.
let client = reqwest::Client::new();
let resp = client
.post(format!("{gw_url}/v1/chat/completions"))
.header("content-type", "application/json")
.json(&json!({
"model": "test-model",
"messages": [{"role": "user", "content": "Hi"}]
}))
.send()
.await
.expect("request should succeed");
assert_eq!(resp.status(), 200);
// Consume the response body to ensure the proxy completes.
let _body: serde_json::Value = resp.json().await.unwrap();
// Check metrics were emitted.
let after = handle.render();
assert!(
after.contains("cortex_requests_total"),
"cortex_requests_total should be present after a request.\nMetrics:\n{after}"
);
assert!(
after.contains("cortex_request_duration_seconds"),
"cortex_request_duration_seconds should be present.\nMetrics:\n{after}"
);
// Should NOT have error or cold start counters for this request.
assert!(
!after.contains("cortex_request_errors_total"),
"no errors expected for a successful request"
);
}