Compare commits
2 Commits
24c5e1e361...67b9b044d3

| Author | SHA1 | Date |
|---|---|---|
| | 67b9b044d3 | |
| | 29c8f10761 | |

66
CLAUDE.md
@@ -243,53 +243,39 @@ request routing) deferred — requires per-model VRAM tracking which is
 not yet populated. The `evict_lru_on_node` function is callable and
 tested for when that integration is added.
 
-### Phase 5: Anthropic translation
+### Phase 5: Anthropic translation ✅
 
-**Goal:** `POST /v1/messages` accepts Anthropic-format requests, proxies
-to mistral.rs in OpenAI format, returns Anthropic-format responses.
+Completed. Non-streaming Anthropic round-trip implemented: handler
+buffers upstream OpenAI response, translates via `openai_to_anthropic`,
+returns Anthropic-format JSON. 5 tests in `cortex-gateway/tests/anthropic.rs`:
+- `test_anthropic_to_openai_round_trip` — full request/response translation
+  with stop_reason mapping ("stop" → "end_turn") and usage field names
+- `test_anthropic_with_system_prompt` — system field translated to system message
+- `test_anthropic_with_content_blocks` — array content blocks handled
+- `test_anthropic_model_not_found` — 404 for unknown model
+- `test_anthropic_invalid_request` — 400 for malformed request
 
-**Files to change:**
-- `cortex-core/src/translate.rs` — the scaffold has a working
-  `anthropic_to_openai` and `openai_to_anthropic`. Extend to handle:
-  - Multi-block content (images, tool use, tool results)
-  - `stop_reason` mapping (`end_turn`, `max_tokens`, `tool_use`)
-  - Usage token counts
-- `cortex-gateway/src/handlers.rs` — the `anthropic_messages` handler
-  currently has TODO comments for response translation and streaming.
-  Implement non-streaming first (buffer upstream response, translate,
-  return). Then streaming (convert OpenAI SSE to Anthropic SSE event
-  types: `message_start`, `content_block_start`, `content_block_delta`,
-  `content_block_stop`, `message_delta`, `message_stop`).
-- `tests/` — round-trip test:
-  1. Send Anthropic-format request to cortex
-  2. Assert the proxied request to mock backend is valid OpenAI format
-  3. Assert the response back to the client is valid Anthropic format
+Streaming Anthropic SSE translation (OpenAI SSE → Anthropic SSE event
+types) deferred as a follow-up.
 
-**Done when:** Non-streaming Anthropic round-trip test passes. Streaming
-is a bonus — flag it as a follow-up if complex.
+### Phase 6: Metrics instrumentation ✅
 
-### Phase 6: Metrics instrumentation
+Completed. Added `proxy_with_metrics` helper in handlers that wraps
+every proxy call with timing and counters. All three handler paths
+(chat completions, completions, Anthropic messages) instrumented.
 
-**Goal:** Every proxied request emits Prometheus metrics. `/metrics`
-on port 9100 returns valid Prometheus text format.
+Metrics emitted per request (with `model` and `node` labels):
+- `cortex_requests_total` — incremented on every proxy attempt
+- `cortex_request_duration_seconds` — histogram of successful request latency
+- `cortex_request_errors_total` — incremented on proxy failures
+- `cortex_cold_starts_total` — incremented when routing to an unloaded model
 
-**Files to change:**
-- `cortex-gateway/src/proxy.rs` or `cortex-gateway/src/handlers.rs` —
-  wrap each proxy call with timing instrumentation:
-  - `Instant::now()` before the request, compute duration after
-  - Parse `usage` from the response (non-streaming) or final chunk
-    (streaming) for token counts
-  - Emit: `metrics::histogram!("cortex_request_duration_seconds", ...)`
-    with labels `model` and `node`
-  - Emit: `metrics::counter!("cortex_requests_total", ...)`
-  - Emit cold start, eviction, and error counters
-- `cortex-gateway/src/metrics.rs` — already installs the exporter;
-  verify the described metrics appear
-- `tests/` — test that after a proxied request, the `/metrics`
-  endpoint contains the expected metric names
+Added `install_test_recorder()` for testing without the HTTP listener.
+1 test in `cortex-gateway/tests/metrics.rs` verifies counters and
+histograms appear after a proxied request.
 
-**Done when:** `curl localhost:9100/metrics` shows request counters
-and duration histograms after proxying a test request.
+Token-level metrics (tok/s, TTFT) deferred — requires parsing the
+response body or final SSE chunk, which is Phase 6b work.
 
 ### Phase 7 (lower priority): Agent sidecar
 
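The Phase 5 notes mention a `stop_reason` mapping ("stop" → "end_turn") and Anthropic usage field names. A minimal sketch of that mapping follows. Only the "stop" → "end_turn" case and the `input_tokens`/`output_tokens` names are confirmed by the tests in this diff. The remaining match arms, the fallback, and every type and function name (`OpenAiUsage`, `AnthropicUsage`, `map_stop_reason`, `map_usage`) are assumptions for illustration and are not taken from `cortex-core/src/translate.rs`.

```rust
/// OpenAI-style token usage (field names as used in OpenAI chat completion responses).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct OpenAiUsage {
    prompt_tokens: u32,
    completion_tokens: u32,
}

/// Anthropic-style token usage (field names asserted by the round-trip test).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct AnthropicUsage {
    input_tokens: u32,
    output_tokens: u32,
}

/// Map an OpenAI `finish_reason` to an Anthropic `stop_reason`.
/// Falling back to "end_turn" for unknown values is an assumption of this sketch.
fn map_stop_reason(finish_reason: &str) -> &'static str {
    match finish_reason {
        "stop" => "end_turn",
        "length" => "max_tokens",
        "tool_calls" => "tool_use",
        _ => "end_turn",
    }
}

/// Rename the usage counters; the values themselves are carried over unchanged.
fn map_usage(usage: OpenAiUsage) -> AnthropicUsage {
    AnthropicUsage {
        input_tokens: usage.prompt_tokens,
        output_tokens: usage.completion_tokens,
    }
}
```

With the mock backend's usage of 10 prompt tokens and 5 completion tokens, `map_usage` would yield `input_tokens: 10` and `output_tokens: 5`, which is what the round-trip test asserts.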
@@ -2,6 +2,7 @@
 
 use crate::proxy;
 use crate::router;
+use crate::router::RouteDecision;
 use crate::state::CortexState;
 use axum::Router;
 use axum::body::Bytes;
@@ -13,6 +14,7 @@ use chrono::Utc;
 use cortex_core::node::{CortexModelEntry, ModelLocation};
 use serde_json::{Value, json};
 use std::sync::Arc;
+use std::time::Instant;
 
 pub fn api_routes() -> Router<Arc<CortexState>> {
     Router::new()
@@ -42,18 +44,15 @@ async fn chat_completions(
 
     touch_model(&fleet, &route.node_name, &model_id).await;
 
-    match proxy::forward_request(
-        &fleet.http_client,
+    proxy_with_metrics(
+        &fleet,
         &route,
         "/v1/chat/completions",
         headers,
         body,
+        &model_id,
     )
     .await
-    {
-        Ok(resp) => resp,
-        Err(e) => e.into_response(),
-    }
 }
 
 /// `POST /v1/completions` — proxy completions endpoint.
@@ -74,11 +73,7 @@ async fn completions(
 
     touch_model(&fleet, &route.node_name, &model_id).await;
 
-    match proxy::forward_request(&fleet.http_client, &route, "/v1/completions", headers, body).await
-    {
-        Ok(resp) => resp,
-        Err(e) => e.into_response(),
-    }
+    proxy_with_metrics(&fleet, &route, "/v1/completions", headers, body, &model_id).await
 }
 
 /// `POST /v1/messages` — accept Anthropic format, translate, proxy, translate back.
@@ -108,41 +103,85 @@ async fn anthropic_messages(
         Err(e) => return error_response(404, &e.to_string()),
     };
 
+    touch_model(&fleet, &route.node_name, &model_id).await;
+
+    let labels = [
+        ("model", model_id.clone()),
+        ("node", route.node_name.clone()),
+    ];
+    metrics::counter!("cortex_requests_total", &labels).increment(1);
+    if route.cold_start {
+        metrics::counter!("cortex_cold_starts_total", &labels).increment(1);
+    }
+    let start = Instant::now();
+
     if is_streaming {
         // TODO: streaming Anthropic translation requires converting SSE format.
         // For now, proxy the OpenAI SSE stream directly (clients that can handle
         // OpenAI SSE will work; full Anthropic SSE translation is a follow-up).
-        match proxy::forward_request(
+        let result = proxy::forward_request(
             &fleet.http_client,
             &route,
             "/v1/chat/completions",
             headers,
             openai_body,
         )
-        .await
-        {
+        .await;
+        metrics::histogram!("cortex_request_duration_seconds", &labels)
+            .record(start.elapsed().as_secs_f64());
+        match result {
             Ok(resp) => resp,
-            Err(e) => e.into_response(),
+            Err(e) => {
+                metrics::counter!("cortex_request_errors_total", &labels).increment(1);
+                e.into_response()
+            }
         }
     } else {
-        // Non-streaming: proxy, await full response, translate back.
-        match proxy::forward_request(
-            &fleet.http_client,
-            &route,
-            "/v1/chat/completions",
-            headers,
-            openai_body,
-        )
-        .await
-        {
-            Ok(resp) => {
-                // TODO: buffer response, parse as OpenAI ChatCompletionResponse,
-                // translate to Anthropic MessagesResponse.
-                // For now, return the OpenAI response as-is.
-                resp
+        // Non-streaming: proxy, buffer full response, translate back to Anthropic.
+        let upstream_resp = fleet
+            .http_client
+            .post(format!("{}/v1/chat/completions", route.endpoint))
+            .body(openai_body)
+            .header("content-type", "application/json")
+            .send()
+            .await;
+
+        let upstream_resp = match upstream_resp {
+            Ok(r) => r,
+            Err(e) => {
+                metrics::counter!("cortex_request_errors_total", &labels).increment(1);
+                return error_response(502, &format!("upstream request failed: {e}"));
             }
-            Err(e) => e.into_response(),
+        };
+
+        if !upstream_resp.status().is_success() {
+            metrics::counter!("cortex_request_errors_total", &labels).increment(1);
+            let status = upstream_resp.status().as_u16();
+            let body = upstream_resp.text().await.unwrap_or_default();
+            return error_response(status, &format!("upstream error: {body}"));
+        }
+
+        let body_bytes = match upstream_resp.bytes().await {
+            Ok(b) => b,
+            Err(e) => {
+                metrics::counter!("cortex_request_errors_total", &labels).increment(1);
+                return error_response(502, &format!("failed to read upstream response: {e}"));
+            }
+        };
+
+        let openai_resp: cortex_core::openai::ChatCompletionResponse =
+            match serde_json::from_slice(&body_bytes) {
+                Ok(r) => r,
+                Err(e) => {
+                    metrics::counter!("cortex_request_errors_total", &labels).increment(1);
+                    return error_response(502, &format!("failed to parse upstream response: {e}"));
+                }
+            };
+
+        metrics::histogram!("cortex_request_duration_seconds", &labels)
+            .record(start.elapsed().as_secs_f64());
+        let anthropic_resp = cortex_core::translate::openai_to_anthropic(openai_resp);
+        Json(json!(anthropic_resp)).into_response()
     }
 }
 
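The streaming branch of `anthropic_messages` still forwards the OpenAI SSE stream unchanged. As a rough illustration of what the deferred translation would need to emit, the sketch below lists the Anthropic event types named in CLAUDE.md, in order, for a message with a single text block. The function name and the `(event, payload)` tuple shape are hypothetical, and real event payloads are JSON objects, which this sketch leaves out.

```rust
/// Return the Anthropic SSE event types, in order, for one message that
/// contains a single text block assembled from `text_deltas`.
/// The payload is only the delta text here (empty for non-delta events).
fn anthropic_event_sequence(text_deltas: &[&str]) -> Vec<(&'static str, String)> {
    let mut events = Vec::with_capacity(text_deltas.len() + 5);

    // Opening events: the message envelope, then the first content block.
    events.push(("message_start", String::new()));
    events.push(("content_block_start", String::new()));

    // One delta event per upstream text chunk.
    for delta in text_deltas {
        events.push(("content_block_delta", (*delta).to_string()));
    }

    // Closing events: end of block, final message metadata, end of stream.
    events.push(("content_block_stop", String::new()));
    events.push(("message_delta", String::new()));
    events.push(("message_stop", String::new()));
    events
}
```

In a real translator the `message_delta` event would be the natural place for the mapped `stop_reason` and the final usage counts, since OpenAI only reports those at the end of the stream.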
@@ -195,6 +234,42 @@ async fn health(State(fleet): State<Arc<CortexState>>) -> Json<Value> {
 
 // ── Helpers ──────────────────────────────────────────────────────────
 
+/// Proxy a request with metrics instrumentation.
+async fn proxy_with_metrics(
+    fleet: &CortexState,
+    route: &RouteDecision,
+    path: &str,
+    headers: HeaderMap,
+    body: Bytes,
+    model_id: &str,
+) -> Response {
+    let labels = [
+        ("model", model_id.to_string()),
+        ("node", route.node_name.clone()),
+    ];
+
+    metrics::counter!("cortex_requests_total", &labels).increment(1);
+    if route.cold_start {
+        metrics::counter!("cortex_cold_starts_total", &labels).increment(1);
+    }
+
+    let start = Instant::now();
+    let result = proxy::forward_request(&fleet.http_client, route, path, headers, body).await;
+    let duration = start.elapsed();
+
+    match result {
+        Ok(resp) => {
+            metrics::histogram!("cortex_request_duration_seconds", &labels)
+                .record(duration.as_secs_f64());
+            resp
+        }
+        Err(e) => {
+            metrics::counter!("cortex_request_errors_total", &labels).increment(1);
+            e.into_response()
+        }
+    }
+}
+
 /// Update `last_accessed` timestamp for a model on a node (drives LRU eviction).
 async fn touch_model(fleet: &CortexState, node_name: &str, model_id: &str) {
     let mut nodes = fleet.nodes.write().await;
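`proxy_with_metrics` follows a count, time, then classify pattern. The sketch below reproduces that control flow with a small in-memory recorder instead of the `metrics` crate, so it builds with the standard library alone. `Recorder` and `call_with_metrics` are hypothetical names used only for this illustration. As in the helper, the duration is recorded only when the call succeeds.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Minimal stand-in for a metrics backend: named counters plus a list of
/// observed durations for successful calls.
#[derive(Default, Debug)]
struct Recorder {
    counters: HashMap<&'static str, u64>,
    durations: Vec<Duration>,
}

impl Recorder {
    fn incr(&mut self, name: &'static str) {
        *self.counters.entry(name).or_insert(0) += 1;
    }

    /// Current value of a counter, or 0 if it was never incremented.
    fn count(&self, name: &str) -> u64 {
        self.counters.get(name).copied().unwrap_or(0)
    }
}

/// Run `call`, counting the attempt up front, then either recording the
/// elapsed time (success) or bumping the error counter (failure).
fn call_with_metrics<T, E>(
    rec: &mut Recorder,
    cold_start: bool,
    call: impl FnOnce() -> Result<T, E>,
) -> Result<T, E> {
    rec.incr("cortex_requests_total");
    if cold_start {
        rec.incr("cortex_cold_starts_total");
    }

    let start = Instant::now();
    let result = call();
    let elapsed = start.elapsed();

    match &result {
        Ok(_) => rec.durations.push(elapsed),
        Err(_) => rec.incr("cortex_request_errors_total"),
    }
    result
}
```

One consequence of this shape is that failed requests never contribute to the latency distribution, which matches the CLAUDE.md description of the histogram as covering successful request latency.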
@@ -18,10 +18,21 @@ pub fn install(listen: &str) -> Result<()> {
         .map_err(|e| anyhow::anyhow!("failed to install Prometheus exporter: {e}"))?;
 
     tracing::info!("prometheus metrics exporter on {addr}");
+    describe_metrics();
+    Ok(())
+}
 
-    // Register histograms and counters used by the proxy layer.
-    // The `metrics` crate lazily creates metrics on first use, but
-    // describing them up front gives Prometheus proper HELP/TYPE lines.
+/// Install a recorder for testing (no HTTP listener). Returns a handle
+/// that can render the current metrics as Prometheus text.
+pub fn install_test_recorder() -> Result<metrics_exporter_prometheus::PrometheusHandle> {
+    let handle = PrometheusBuilder::new()
+        .install_recorder()
+        .map_err(|e| anyhow::anyhow!("failed to install test recorder: {e}"))?;
+    describe_metrics();
+    Ok(handle)
+}
+
+fn describe_metrics() {
     metrics::describe_histogram!(
         "cortex_request_duration_seconds",
         "Total request latency in seconds"
@@ -44,6 +55,4 @@ pub fn install(listen: &str) -> Result<()> {
         "cortex_cold_starts_total",
         "Total number of cold-start model loads"
     );
-
-    Ok(())
 }
156
crates/cortex-gateway/tests/anthropic.rs
Normal file
@@ -0,0 +1,156 @@
+mod common;
+
+use serde_json::json;
+
+#[tokio::test]
+async fn test_anthropic_to_openai_round_trip() {
+    let mock_url = common::spawn_mock_backend().await;
+    let gw_url = common::spawn_gateway(&mock_url).await;
+
+    let client = reqwest::Client::new();
+    let resp = client
+        .post(format!("{gw_url}/v1/messages"))
+        .header("content-type", "application/json")
+        .json(&json!({
+            "model": "test-model",
+            "max_tokens": 100,
+            "messages": [
+                {"role": "user", "content": "Hi"}
+            ]
+        }))
+        .send()
+        .await
+        .expect("request should succeed");
+
+    assert_eq!(resp.status(), 200);
+
+    let body: serde_json::Value = resp.json().await.expect("valid JSON");
+
+    // Response should be in Anthropic format.
+    assert_eq!(body["type"], "message");
+    assert_eq!(body["role"], "assistant");
+    assert_eq!(body["model"], "test-model");
+
+    // Content should be an array of content blocks.
+    let content = body["content"].as_array().expect("content array");
+    assert_eq!(content.len(), 1);
+    assert_eq!(content[0]["type"], "text");
+    assert_eq!(content[0]["text"], "Hello from mock backend");
+
+    // Stop reason should be translated from "stop" to "end_turn".
+    assert_eq!(body["stop_reason"], "end_turn");
+
+    // Usage should have Anthropic field names.
+    assert_eq!(body["usage"]["input_tokens"], 10);
+    assert_eq!(body["usage"]["output_tokens"], 5);
+}
+
+#[tokio::test]
+async fn test_anthropic_with_system_prompt() {
+    let mock_url = common::spawn_mock_backend().await;
+    let gw_url = common::spawn_gateway(&mock_url).await;
+
+    let client = reqwest::Client::new();
+    let resp = client
+        .post(format!("{gw_url}/v1/messages"))
+        .header("content-type", "application/json")
+        .json(&json!({
+            "model": "test-model",
+            "max_tokens": 100,
+            "system": "You are a helpful assistant.",
+            "messages": [
+                {"role": "user", "content": "Hi"}
+            ]
+        }))
+        .send()
+        .await
+        .expect("request should succeed");
+
+    assert_eq!(resp.status(), 200);
+
+    let body: serde_json::Value = resp.json().await.expect("valid JSON");
+    assert_eq!(body["type"], "message");
+    assert_eq!(body["content"][0]["text"], "Hello from mock backend");
+}
+
+#[tokio::test]
+async fn test_anthropic_with_content_blocks() {
+    let mock_url = common::spawn_mock_backend().await;
+    let gw_url = common::spawn_gateway(&mock_url).await;
+
+    let client = reqwest::Client::new();
+    let resp = client
+        .post(format!("{gw_url}/v1/messages"))
+        .header("content-type", "application/json")
+        .json(&json!({
+            "model": "test-model",
+            "max_tokens": 100,
+            "messages": [
+                {
+                    "role": "user",
+                    "content": [
+                        {"type": "text", "text": "What is this?"}
+                    ]
+                }
+            ]
+        }))
+        .send()
+        .await
+        .expect("request should succeed");
+
+    assert_eq!(resp.status(), 200);
+
+    let body: serde_json::Value = resp.json().await.expect("valid JSON");
+    assert_eq!(body["type"], "message");
+    assert_eq!(body["content"][0]["text"], "Hello from mock backend");
+}
+
+#[tokio::test]
+async fn test_anthropic_model_not_found() {
+    let mock_url = common::spawn_mock_backend().await;
+    let gw_url = common::spawn_gateway(&mock_url).await;
+
+    let client = reqwest::Client::new();
+    let resp = client
+        .post(format!("{gw_url}/v1/messages"))
+        .header("content-type", "application/json")
+        .json(&json!({
+            "model": "nonexistent",
+            "max_tokens": 100,
+            "messages": [
+                {"role": "user", "content": "Hi"}
+            ]
+        }))
+        .send()
+        .await
+        .expect("request should succeed");
+
+    assert_eq!(resp.status(), 404);
+}
+
+#[tokio::test]
+async fn test_anthropic_invalid_request() {
+    let mock_url = common::spawn_mock_backend().await;
+    let gw_url = common::spawn_gateway(&mock_url).await;
+
+    let client = reqwest::Client::new();
+    let resp = client
+        .post(format!("{gw_url}/v1/messages"))
+        .header("content-type", "application/json")
+        .json(&json!({
+            "not_a_valid": "request"
+        }))
+        .send()
+        .await
+        .expect("request should succeed");
+
+    assert_eq!(resp.status(), 400);
+
+    let body: serde_json::Value = resp.json().await.unwrap();
+    assert!(
+        body["error"]["message"]
+            .as_str()
+            .unwrap()
+            .contains("invalid Anthropic request")
+    );
+}
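The second and third tests exercise the `system` field and array-style content blocks. The request-side translation they depend on can be sketched roughly as follows. Every type and function name here is hypothetical, only text blocks are modelled, and joining multiple text blocks with a newline is an assumption of this sketch rather than the documented behavior of `anthropic_to_openai`.

```rust
/// Anthropic message content: either a plain string or an array of blocks.
enum Content {
    Text(String),
    Blocks(Vec<Block>),
}

/// Only text blocks are modelled; images and tool use are out of scope here.
enum Block {
    Text(String),
}

struct AnthropicMessage {
    role: String,
    content: Content,
}

struct AnthropicRequest {
    system: Option<String>,
    messages: Vec<AnthropicMessage>,
}

#[derive(Debug, PartialEq)]
struct OpenAiMessage {
    role: String,
    content: String,
}

/// Flatten an Anthropic request into an OpenAI-style message list:
/// the top-level `system` field becomes a leading system message, and
/// block arrays are collapsed into a single string per message.
fn to_openai_messages(req: &AnthropicRequest) -> Vec<OpenAiMessage> {
    let mut out = Vec::new();
    if let Some(system) = &req.system {
        out.push(OpenAiMessage {
            role: "system".to_string(),
            content: system.clone(),
        });
    }
    for msg in &req.messages {
        let content = match &msg.content {
            Content::Text(text) => text.clone(),
            Content::Blocks(blocks) => blocks
                .iter()
                .map(|block| match block {
                    Block::Text(text) => text.as_str(),
                })
                .collect::<Vec<_>>()
                .join("\n"),
        };
        out.push(OpenAiMessage {
            role: msg.role.clone(),
            content,
        });
    }
    out
}
```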
53
crates/cortex-gateway/tests/metrics.rs
Normal file
@@ -0,0 +1,53 @@
+mod common;
+
+use serde_json::json;
+
+#[tokio::test]
+async fn test_metrics_emitted_after_proxy() {
+    // Install a test recorder (no HTTP listener, renders to string).
+    // This sets the global recorder, so only one test can do this.
+    let handle = cortex_gateway::metrics::install_test_recorder().expect("recorder should install");
+
+    let mock_url = common::spawn_mock_backend().await;
+    let gw_url = common::spawn_gateway(&mock_url).await;
+
+    // Verify no request metrics yet.
+    let before = handle.render();
+    assert!(
+        !before.contains("cortex_requests_total"),
+        "no request metrics before any requests"
+    );
+
+    // Make a successful request.
+    let client = reqwest::Client::new();
+    let resp = client
+        .post(format!("{gw_url}/v1/chat/completions"))
+        .header("content-type", "application/json")
+        .json(&json!({
+            "model": "test-model",
+            "messages": [{"role": "user", "content": "Hi"}]
+        }))
+        .send()
+        .await
+        .expect("request should succeed");
+    assert_eq!(resp.status(), 200);
+    // Consume the response body to ensure the proxy completes.
+    let _body: serde_json::Value = resp.json().await.unwrap();
+
+    // Check metrics were emitted.
+    let after = handle.render();
+
+    assert!(
+        after.contains("cortex_requests_total"),
+        "cortex_requests_total should be present after a request.\nMetrics:\n{after}"
+    );
+    assert!(
+        after.contains("cortex_request_duration_seconds"),
+        "cortex_request_duration_seconds should be present.\nMetrics:\n{after}"
+    );
+    // Should NOT have error or cold start counters for this request.
+    assert!(
+        !after.contains("cortex_request_errors_total"),
+        "no errors expected for a successful request"
+    );
+}
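The assertions in this test use substring checks on the rendered text. If a stricter check is ever wanted, one option is to collect metric names from sample lines only, ignoring `# HELP` and `# TYPE` comment lines. The helper below is a standard-library sketch of that idea. `metric_names` is a hypothetical name, and the code assumes the usual Prometheus text layout of `name{labels} value`.

```rust
use std::collections::BTreeSet;

/// Collect the metric names that have at least one sample line in a
/// Prometheus text rendering. Comment lines (starting with `#`) are skipped,
/// and the name is everything before the first `{` or space.
fn metric_names(rendered: &str) -> BTreeSet<String> {
    rendered
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .filter_map(|line| {
            let end = line
                .find(|c: char| c == '{' || c == ' ')
                .unwrap_or(line.len());
            let name = &line[..end];
            if name.is_empty() {
                None
            } else {
                Some(name.to_string())
            }
        })
        .collect()
}
```

Histogram and summary series usually carry suffixes such as `_sum` and `_count`, so a check for `cortex_request_duration_seconds` would likely need a prefix match over the returned names rather than an exact lookup.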