2 Commits

Author SHA1 Message Date
67b9b044d3 feat: add per-request Prometheus metrics instrumentation
Emit cortex_requests_total, cortex_request_duration_seconds,
cortex_request_errors_total, and cortex_cold_starts_total with
model and node labels on every proxied request.

Add install_test_recorder() for testing metrics without HTTP listener.
Integration test verifies counters and histograms appear after proxy.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 19:42:09 +03:00
29c8f10761 feat: implement non-streaming Anthropic response translation
Wire up openai_to_anthropic in the /v1/messages handler: buffer
upstream OpenAI response, parse, translate to Anthropic format
(stop_reason mapping, usage field names, content blocks).

5 integration tests covering round-trip translation, system prompt,
content blocks, and error cases.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 19:36:16 +03:00
5 changed files with 355 additions and 76 deletions


@@ -243,53 +243,39 @@ request routing) deferred — requires per-model VRAM tracking which is
not yet populated. The `evict_lru_on_node` function is callable and
tested for when that integration is added.
### Phase 5: Anthropic translation
### Phase 5: Anthropic translation
**Goal:** `POST /v1/messages` accepts Anthropic-format requests, proxies
to mistral.rs in OpenAI format, returns Anthropic-format responses.
Completed. Non-streaming Anthropic round-trip implemented: handler
buffers upstream OpenAI response, translates via `openai_to_anthropic`,
returns Anthropic-format JSON. 5 tests in `cortex-gateway/tests/anthropic.rs`:
- `test_anthropic_to_openai_round_trip` — full request/response translation
with stop_reason mapping ("stop" → "end_turn") and usage field names
- `test_anthropic_with_system_prompt` — system field translated to system message
- `test_anthropic_with_content_blocks` — array content blocks handled
- `test_anthropic_model_not_found` — 404 for unknown model
- `test_anthropic_invalid_request` — 400 for malformed request
**Files to change:**
- `cortex-core/src/translate.rs` — the scaffold has a working
`anthropic_to_openai` and `openai_to_anthropic`. Extend to handle:
- Multi-block content (images, tool use, tool results)
- `stop_reason` mapping (`end_turn`, `max_tokens`, `tool_use`)
- Usage token counts
- `cortex-gateway/src/handlers.rs` — the `anthropic_messages` handler
currently has TODO comments for response translation and streaming.
Implement non-streaming first (buffer upstream response, translate,
return). Then streaming (convert OpenAI SSE to Anthropic SSE event
types: `message_start`, `content_block_start`, `content_block_delta`,
`content_block_stop`, `message_delta`, `message_stop`).
- `tests/` — round-trip test:
1. Send Anthropic-format request to cortex
2. Assert the proxied request to mock backend is valid OpenAI format
3. Assert the response back to the client is valid Anthropic format
Streaming Anthropic SSE translation (OpenAI SSE → Anthropic SSE event
types) deferred as a follow-up.
**Done when:** Non-streaming Anthropic round-trip test passes. Streaming
is a bonus — flag it as a follow-up if complex.
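
The field mapping described above can be sketched in isolation. This is a minimal illustration, not the actual `openai_to_anthropic` in `cortex-core/src/translate.rs`: the names `map_stop_reason`, `map_usage`, `OpenAiUsage`, and `AnthropicUsage` are hypothetical, only the "stop" → "end_turn" case and the `input_tokens`/`output_tokens` names are confirmed by the tests, and the fallback arm is an assumption.

```rust
/// Map an OpenAI `finish_reason` to an Anthropic `stop_reason`.
/// Only "stop" -> "end_turn" is confirmed by the round-trip test; the
/// "length" and "tool_calls" arms follow the public API vocabularies.
pub fn map_stop_reason(finish_reason: Option<&str>) -> &'static str {
    match finish_reason {
        Some("stop") => "end_turn",
        Some("length") => "max_tokens",
        Some("tool_calls") => "tool_use",
        // Assumption: unknown or missing reasons fall back to "end_turn".
        _ => "end_turn",
    }
}

/// Token usage as named in an OpenAI chat completion response.
#[derive(Debug, PartialEq)]
pub struct OpenAiUsage {
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
}

/// Token usage as named in an Anthropic messages response.
#[derive(Debug, PartialEq)]
pub struct AnthropicUsage {
    pub input_tokens: u32,
    pub output_tokens: u32,
}

/// Rename the usage fields; the counts themselves are copied unchanged.
pub fn map_usage(usage: &OpenAiUsage) -> AnthropicUsage {
    AnthropicUsage {
        input_tokens: usage.prompt_tokens,
        output_tokens: usage.completion_tokens,
    }
}
```

Keeping both mappings as pure functions means they can be unit-tested without spawning the gateway or a mock backend.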
### Phase 6: Metrics instrumentation ✅
### Phase 6: Metrics instrumentation
Completed. Added `proxy_with_metrics` helper in handlers that wraps
every proxy call with timing and counters. All three handler paths
(chat completions, completions, Anthropic messages) instrumented.
**Goal:** Every proxied request emits Prometheus metrics. `/metrics`
on port 9100 returns valid Prometheus text format.
Metrics emitted per request (with `model` and `node` labels):
- `cortex_requests_total` — incremented on every proxy attempt
- `cortex_request_duration_seconds` — histogram of successful request latency
- `cortex_request_errors_total` — incremented on proxy failures
- `cortex_cold_starts_total` — incremented when routing to an unloaded model
**Files to change:**
- `cortex-gateway/src/proxy.rs` or `cortex-gateway/src/handlers.rs`
wrap each proxy call with timing instrumentation:
- `Instant::now()` before the request, compute duration after
- Parse `usage` from the response (non-streaming) or final chunk
(streaming) for token counts
- Emit: `metrics::histogram!("cortex_request_duration_seconds", ...)`
with labels `model` and `node`
- Emit: `metrics::counter!("cortex_requests_total", ...)`
- Emit cold start, eviction, and error counters
- `cortex-gateway/src/metrics.rs` — already installs the exporter;
verify the described metrics appear
- `tests/` — test that after a proxied request, the `/metrics`
endpoint contains the expected metric names
Added `install_test_recorder()` for testing without the HTTP listener.
1 test in `cortex-gateway/tests/metrics.rs` verifies counters and
histograms appear after a proxied request.
**Done when:** `curl localhost:9100/metrics` shows request counters
and duration histograms after proxying a test request.
Token-level metrics (tok/s, TTFT) deferred — requires parsing the
response body or final SSE chunk, which is Phase 6b work.
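
The counting rules listed above (every attempt counted, duration recorded on success, errors counted on failure, cold starts counted separately) can be sketched without the `metrics` crate. The `Recorder` type and `instrument` function below are hypothetical in-memory stand-ins for illustration only; the real code uses `metrics::counter!` and `metrics::histogram!` with `model` and `node` labels, which this sketch omits.

```rust
use std::collections::HashMap;
use std::time::Instant;

/// Hypothetical in-memory stand-in for the global `metrics` recorder.
#[derive(Default)]
pub struct Recorder {
    pub counters: HashMap<&'static str, u64>,
    pub durations: Vec<f64>,
}

impl Recorder {
    fn incr(&mut self, name: &'static str) {
        *self.counters.entry(name).or_insert(0) += 1;
    }

    /// Current value of a counter, or 0 if it was never incremented.
    pub fn count(&self, name: &str) -> u64 {
        self.counters.get(name).copied().unwrap_or(0)
    }
}

/// Wrap one proxy attempt with the per-request instrumentation rules.
pub fn instrument<T, E>(
    rec: &mut Recorder,
    cold_start: bool,
    call: impl FnOnce() -> Result<T, E>,
) -> Result<T, E> {
    // Every attempt is counted, whether or not it succeeds.
    rec.incr("cortex_requests_total");
    if cold_start {
        rec.incr("cortex_cold_starts_total");
    }

    let start = Instant::now();
    let result = call();
    let elapsed = start.elapsed().as_secs_f64();

    match &result {
        // Latency is recorded only for successful requests.
        Ok(_) => rec.durations.push(elapsed),
        Err(_) => rec.incr("cortex_request_errors_total"),
    }
    result
}
```

With this shape, one success followed by one cold-start failure yields two requests, one cold start, one error, and a single duration sample.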
### Phase 7 (lower priority): Agent sidecar


@@ -2,6 +2,7 @@
use crate::proxy;
use crate::router;
use crate::router::RouteDecision;
use crate::state::CortexState;
use axum::Router;
use axum::body::Bytes;
@@ -13,6 +14,7 @@ use chrono::Utc;
use cortex_core::node::{CortexModelEntry, ModelLocation};
use serde_json::{Value, json};
use std::sync::Arc;
use std::time::Instant;
pub fn api_routes() -> Router<Arc<CortexState>> {
Router::new()
@@ -42,18 +44,15 @@ async fn chat_completions(
touch_model(&fleet, &route.node_name, &model_id).await;
match proxy::forward_request(
&fleet.http_client,
proxy_with_metrics(
&fleet,
&route,
"/v1/chat/completions",
headers,
body,
&model_id,
)
.await
{
Ok(resp) => resp,
Err(e) => e.into_response(),
}
}
/// `POST /v1/completions` — proxy completions endpoint.
@@ -74,11 +73,7 @@ async fn completions(
touch_model(&fleet, &route.node_name, &model_id).await;
match proxy::forward_request(&fleet.http_client, &route, "/v1/completions", headers, body).await
{
Ok(resp) => resp,
Err(e) => e.into_response(),
}
proxy_with_metrics(&fleet, &route, "/v1/completions", headers, body, &model_id).await
}
/// `POST /v1/messages` — accept Anthropic format, translate, proxy, translate back.
@@ -108,41 +103,85 @@ async fn anthropic_messages(
Err(e) => return error_response(404, &e.to_string()),
};
touch_model(&fleet, &route.node_name, &model_id).await;
let labels = [
("model", model_id.clone()),
("node", route.node_name.clone()),
];
metrics::counter!("cortex_requests_total", &labels).increment(1);
if route.cold_start {
metrics::counter!("cortex_cold_starts_total", &labels).increment(1);
}
let start = Instant::now();
if is_streaming {
// TODO: streaming Anthropic translation requires converting SSE format.
// For now, proxy the OpenAI SSE stream directly (clients that can handle
// OpenAI SSE will work; full Anthropic SSE translation is a follow-up).
match proxy::forward_request(
let result = proxy::forward_request(
&fleet.http_client,
&route,
"/v1/chat/completions",
headers,
openai_body,
)
.await
{
.await;
metrics::histogram!("cortex_request_duration_seconds", &labels)
.record(start.elapsed().as_secs_f64());
match result {
Ok(resp) => resp,
Err(e) => e.into_response(),
Err(e) => {
metrics::counter!("cortex_request_errors_total", &labels).increment(1);
e.into_response()
}
}
} else {
// Non-streaming: proxy, await full response, translate back.
match proxy::forward_request(
&fleet.http_client,
&route,
"/v1/chat/completions",
headers,
openai_body,
)
.await
{
Ok(resp) => {
// TODO: buffer response, parse as OpenAI ChatCompletionResponse,
// translate to Anthropic MessagesResponse.
// For now, return the OpenAI response as-is.
resp
// Non-streaming: proxy, buffer full response, translate back to Anthropic.
let upstream_resp = fleet
.http_client
.post(format!("{}/v1/chat/completions", route.endpoint))
.body(openai_body)
.header("content-type", "application/json")
.send()
.await;
let upstream_resp = match upstream_resp {
Ok(r) => r,
Err(e) => {
metrics::counter!("cortex_request_errors_total", &labels).increment(1);
return error_response(502, &format!("upstream request failed: {e}"));
}
Err(e) => e.into_response(),
};
if !upstream_resp.status().is_success() {
metrics::counter!("cortex_request_errors_total", &labels).increment(1);
let status = upstream_resp.status().as_u16();
let body = upstream_resp.text().await.unwrap_or_default();
return error_response(status, &format!("upstream error: {body}"));
}
let body_bytes = match upstream_resp.bytes().await {
Ok(b) => b,
Err(e) => {
metrics::counter!("cortex_request_errors_total", &labels).increment(1);
return error_response(502, &format!("failed to read upstream response: {e}"));
}
};
let openai_resp: cortex_core::openai::ChatCompletionResponse =
match serde_json::from_slice(&body_bytes) {
Ok(r) => r,
Err(e) => {
metrics::counter!("cortex_request_errors_total", &labels).increment(1);
return error_response(502, &format!("failed to parse upstream response: {e}"));
}
};
metrics::histogram!("cortex_request_duration_seconds", &labels)
.record(start.elapsed().as_secs_f64());
let anthropic_resp = cortex_core::translate::openai_to_anthropic(openai_resp);
Json(json!(anthropic_resp)).into_response()
}
}
@@ -195,6 +234,42 @@ async fn health(State(fleet): State<Arc<CortexState>>) -> Json<Value> {
// ── Helpers ──────────────────────────────────────────────────────────
/// Proxy a request with metrics instrumentation.
async fn proxy_with_metrics(
fleet: &CortexState,
route: &RouteDecision,
path: &str,
headers: HeaderMap,
body: Bytes,
model_id: &str,
) -> Response {
let labels = [
("model", model_id.to_string()),
("node", route.node_name.clone()),
];
metrics::counter!("cortex_requests_total", &labels).increment(1);
if route.cold_start {
metrics::counter!("cortex_cold_starts_total", &labels).increment(1);
}
let start = Instant::now();
let result = proxy::forward_request(&fleet.http_client, route, path, headers, body).await;
let duration = start.elapsed();
match result {
Ok(resp) => {
metrics::histogram!("cortex_request_duration_seconds", &labels)
.record(duration.as_secs_f64());
resp
}
Err(e) => {
metrics::counter!("cortex_request_errors_total", &labels).increment(1);
e.into_response()
}
}
}
/// Update `last_accessed` timestamp for a model on a node (drives LRU eviction).
async fn touch_model(fleet: &CortexState, node_name: &str, model_id: &str) {
let mut nodes = fleet.nodes.write().await;


@@ -18,10 +18,21 @@ pub fn install(listen: &str) -> Result<()> {
.map_err(|e| anyhow::anyhow!("failed to install Prometheus exporter: {e}"))?;
tracing::info!("prometheus metrics exporter on {addr}");
describe_metrics();
Ok(())
}
// Register histograms and counters used by the proxy layer.
// The `metrics` crate lazily creates metrics on first use, but
// describing them up front gives Prometheus proper HELP/TYPE lines.
/// Install a recorder for testing (no HTTP listener). Returns a handle
/// that can render the current metrics as Prometheus text.
pub fn install_test_recorder() -> Result<metrics_exporter_prometheus::PrometheusHandle> {
let handle = PrometheusBuilder::new()
.install_recorder()
.map_err(|e| anyhow::anyhow!("failed to install test recorder: {e}"))?;
describe_metrics();
Ok(handle)
}
fn describe_metrics() {
metrics::describe_histogram!(
"cortex_request_duration_seconds",
"Total request latency in seconds"
@@ -44,6 +55,4 @@ pub fn install(listen: &str) -> Result<()> {
"cortex_cold_starts_total",
"Total number of cold-start model loads"
);
Ok(())
}


@@ -0,0 +1,156 @@
mod common;
use serde_json::json;
#[tokio::test]
async fn test_anthropic_to_openai_round_trip() {
let mock_url = common::spawn_mock_backend().await;
let gw_url = common::spawn_gateway(&mock_url).await;
let client = reqwest::Client::new();
let resp = client
.post(format!("{gw_url}/v1/messages"))
.header("content-type", "application/json")
.json(&json!({
"model": "test-model",
"max_tokens": 100,
"messages": [
{"role": "user", "content": "Hi"}
]
}))
.send()
.await
.expect("request should succeed");
assert_eq!(resp.status(), 200);
let body: serde_json::Value = resp.json().await.expect("valid JSON");
// Response should be in Anthropic format.
assert_eq!(body["type"], "message");
assert_eq!(body["role"], "assistant");
assert_eq!(body["model"], "test-model");
// Content should be an array of content blocks.
let content = body["content"].as_array().expect("content array");
assert_eq!(content.len(), 1);
assert_eq!(content[0]["type"], "text");
assert_eq!(content[0]["text"], "Hello from mock backend");
// Stop reason should be translated from "stop" to "end_turn".
assert_eq!(body["stop_reason"], "end_turn");
// Usage should have Anthropic field names.
assert_eq!(body["usage"]["input_tokens"], 10);
assert_eq!(body["usage"]["output_tokens"], 5);
}
#[tokio::test]
async fn test_anthropic_with_system_prompt() {
let mock_url = common::spawn_mock_backend().await;
let gw_url = common::spawn_gateway(&mock_url).await;
let client = reqwest::Client::new();
let resp = client
.post(format!("{gw_url}/v1/messages"))
.header("content-type", "application/json")
.json(&json!({
"model": "test-model",
"max_tokens": 100,
"system": "You are a helpful assistant.",
"messages": [
{"role": "user", "content": "Hi"}
]
}))
.send()
.await
.expect("request should succeed");
assert_eq!(resp.status(), 200);
let body: serde_json::Value = resp.json().await.expect("valid JSON");
assert_eq!(body["type"], "message");
assert_eq!(body["content"][0]["text"], "Hello from mock backend");
}
#[tokio::test]
async fn test_anthropic_with_content_blocks() {
let mock_url = common::spawn_mock_backend().await;
let gw_url = common::spawn_gateway(&mock_url).await;
let client = reqwest::Client::new();
let resp = client
.post(format!("{gw_url}/v1/messages"))
.header("content-type", "application/json")
.json(&json!({
"model": "test-model",
"max_tokens": 100,
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": "What is this?"}
]
}
]
}))
.send()
.await
.expect("request should succeed");
assert_eq!(resp.status(), 200);
let body: serde_json::Value = resp.json().await.expect("valid JSON");
assert_eq!(body["type"], "message");
assert_eq!(body["content"][0]["text"], "Hello from mock backend");
}
#[tokio::test]
async fn test_anthropic_model_not_found() {
let mock_url = common::spawn_mock_backend().await;
let gw_url = common::spawn_gateway(&mock_url).await;
let client = reqwest::Client::new();
let resp = client
.post(format!("{gw_url}/v1/messages"))
.header("content-type", "application/json")
.json(&json!({
"model": "nonexistent",
"max_tokens": 100,
"messages": [
{"role": "user", "content": "Hi"}
]
}))
.send()
.await
.expect("request should succeed");
assert_eq!(resp.status(), 404);
}
#[tokio::test]
async fn test_anthropic_invalid_request() {
let mock_url = common::spawn_mock_backend().await;
let gw_url = common::spawn_gateway(&mock_url).await;
let client = reqwest::Client::new();
let resp = client
.post(format!("{gw_url}/v1/messages"))
.header("content-type", "application/json")
.json(&json!({
"not_a_valid": "request"
}))
.send()
.await
.expect("request should succeed");
assert_eq!(resp.status(), 400);
let body: serde_json::Value = resp.json().await.unwrap();
assert!(
body["error"]["message"]
.as_str()
.unwrap()
.contains("invalid Anthropic request")
);
}


@@ -0,0 +1,53 @@
mod common;
use serde_json::json;
#[tokio::test]
async fn test_metrics_emitted_after_proxy() {
// Install a test recorder (no HTTP listener, renders to string).
// This sets the global recorder, so only one test can do this.
let handle = cortex_gateway::metrics::install_test_recorder().expect("recorder should install");
let mock_url = common::spawn_mock_backend().await;
let gw_url = common::spawn_gateway(&mock_url).await;
// Verify no request metrics yet.
let before = handle.render();
assert!(
!before.contains("cortex_requests_total"),
"no request metrics before any requests"
);
// Make a successful request.
let client = reqwest::Client::new();
let resp = client
.post(format!("{gw_url}/v1/chat/completions"))
.header("content-type", "application/json")
.json(&json!({
"model": "test-model",
"messages": [{"role": "user", "content": "Hi"}]
}))
.send()
.await
.expect("request should succeed");
assert_eq!(resp.status(), 200);
// Consume the response body to ensure the proxy completes.
let _body: serde_json::Value = resp.json().await.unwrap();
// Check metrics were emitted.
let after = handle.render();
assert!(
after.contains("cortex_requests_total"),
"cortex_requests_total should be present after a request.\nMetrics:\n{after}"
);
assert!(
after.contains("cortex_request_duration_seconds"),
"cortex_request_duration_seconds should be present.\nMetrics:\n{after}"
);
// Should NOT have error or cold start counters for this request.
assert!(
!after.contains("cortex_request_errors_total"),
"no errors expected for a successful request"
);
}
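
The test above checks the rendered output with substring matching, which would also match a `# HELP` or `# TYPE` comment line if one were rendered. A stricter check could look only at sample lines of the Prometheus text format. The helper below is a sketch, not part of this change: `has_sample` is a hypothetical name, and the `_bucket`, `_sum`, and `_count` suffixes are included on the assumption that the duration metric may be rendered as either a histogram or a summary.

```rust
/// Return true if `text` (Prometheus text exposition format) contains at
/// least one sample for `name`, ignoring `# HELP` / `# TYPE` comment lines.
pub fn has_sample(text: &str, name: &str) -> bool {
    text.lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .any(|line| {
            // The metric name ends at the first `{` (labels) or space (value).
            let end = line
                .find(|c: char| c == '{' || c == ' ')
                .unwrap_or(line.len());
            let sample = &line[..end];
            sample == name
                || ["_bucket", "_sum", "_count"]
                    .iter()
                    .any(|suffix| sample.strip_suffix(*suffix) == Some(name))
        })
}
```

In the test this could replace `after.contains(...)`, for example `assert!(has_sample(&after, "cortex_requests_total"))`.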