feat: implement non-streaming Anthropic response translation
Wire up `openai_to_anthropic` in the non-streaming path of the /v1/messages handler: buffer the upstream OpenAI response, parse it, and translate it to Anthropic format (stop_reason mapping, usage field names, content blocks). Add 5 integration tests covering round-trip translation, system prompt, content blocks, and error cases. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
34
CLAUDE.md
34
CLAUDE.md
@@ -243,30 +243,20 @@ request routing) deferred — requires per-model VRAM tracking which is
|
|||||||
not yet populated. The `evict_lru_on_node` function is callable and
|
not yet populated. The `evict_lru_on_node` function is callable and
|
||||||
tested for when that integration is added.
|
tested for when that integration is added.
|
||||||
|
|
||||||
### Phase 5: Anthropic translation
|
### Phase 5: Anthropic translation ✅
|
||||||
|
|
||||||
**Goal:** `POST /v1/messages` accepts Anthropic-format requests, proxies
|
Completed. Non-streaming Anthropic round-trip implemented: handler
|
||||||
to mistral.rs in OpenAI format, returns Anthropic-format responses.
|
buffers upstream OpenAI response, translates via `openai_to_anthropic`,
|
||||||
|
returns Anthropic-format JSON. 5 tests in `cortex-gateway/tests/anthropic.rs`:
|
||||||
|
- `test_anthropic_to_openai_round_trip` — full request/response translation
|
||||||
|
with stop_reason mapping ("stop" → "end_turn") and usage field names
|
||||||
|
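- `test_anthropic_with_system_prompt` — request with a top-level `system` field is accepted and returns an Anthropic-format response (the translated upstream system message is not asserted)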
|
||||||
|
- `test_anthropic_with_content_blocks` — user message whose `content` is an array of blocks (a single `text` block) is accepted and returns an Anthropic-format response
|
||||||
|
- `test_anthropic_model_not_found` — 404 for unknown model
|
||||||
|
- `test_anthropic_invalid_request` — 400 for malformed request
|
||||||
|
|
||||||
**Files to change:**
|
Streaming Anthropic SSE translation (OpenAI SSE → Anthropic SSE event
|
||||||
- `cortex-core/src/translate.rs` — the scaffold has a working
|
types) is deferred as a follow-up; for now, streaming requests are proxied as the raw OpenAI SSE stream.
|
||||||
`anthropic_to_openai` and `openai_to_anthropic`. Extend to handle:
|
|
||||||
- Multi-block content (images, tool use, tool results)
|
|
||||||
- `stop_reason` mapping (`end_turn`, `max_tokens`, `tool_use`)
|
|
||||||
- Usage token counts
|
|
||||||
- `cortex-gateway/src/handlers.rs` — the `anthropic_messages` handler
|
|
||||||
currently has TODO comments for response translation and streaming.
|
|
||||||
Implement non-streaming first (buffer upstream response, translate,
|
|
||||||
return). Then streaming (convert OpenAI SSE to Anthropic SSE event
|
|
||||||
types: `message_start`, `content_block_start`, `content_block_delta`,
|
|
||||||
`content_block_stop`, `message_delta`, `message_stop`).
|
|
||||||
- `tests/` — round-trip test:
|
|
||||||
1. Send Anthropic-format request to cortex
|
|
||||||
2. Assert the proxied request to mock backend is valid OpenAI format
|
|
||||||
3. Assert the response back to the client is valid Anthropic format
|
|
||||||
|
|
||||||
**Done when:** Non-streaming Anthropic round-trip test passes. Streaming
|
|
||||||
is a bonus — flag it as a follow-up if complex.
|
|
||||||
|
|
||||||
### Phase 6: Metrics instrumentation
|
### Phase 6: Metrics instrumentation
|
||||||
|
|
||||||
|
|||||||
@@ -108,6 +108,8 @@ async fn anthropic_messages(
|
|||||||
Err(e) => return error_response(404, &e.to_string()),
|
Err(e) => return error_response(404, &e.to_string()),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
touch_model(&fleet, &route.node_name, &model_id).await;
|
||||||
|
|
||||||
if is_streaming {
|
if is_streaming {
|
||||||
// TODO: streaming Anthropic translation requires converting SSE format.
|
// TODO: streaming Anthropic translation requires converting SSE format.
|
||||||
// For now, proxy the OpenAI SSE stream directly (clients that can handle
|
// For now, proxy the OpenAI SSE stream directly (clients that can handle
|
||||||
@@ -125,24 +127,43 @@ async fn anthropic_messages(
|
|||||||
Err(e) => e.into_response(),
|
Err(e) => e.into_response(),
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Non-streaming: proxy, await full response, translate back.
|
// Non-streaming: proxy, buffer full response, translate back to Anthropic.
|
||||||
match proxy::forward_request(
|
let upstream_resp = fleet
|
||||||
&fleet.http_client,
|
.http_client
|
||||||
&route,
|
.post(format!("{}/v1/chat/completions", route.endpoint))
|
||||||
"/v1/chat/completions",
|
.body(openai_body)
|
||||||
headers,
|
.header("content-type", "application/json")
|
||||||
openai_body,
|
.send()
|
||||||
)
|
.await;
|
||||||
.await
|
|
||||||
{
|
let upstream_resp = match upstream_resp {
|
||||||
Ok(resp) => {
|
Ok(r) => r,
|
||||||
// TODO: buffer response, parse as OpenAI ChatCompletionResponse,
|
Err(e) => return error_response(502, &format!("upstream request failed: {e}")),
|
||||||
// translate to Anthropic MessagesResponse.
|
};
|
||||||
// For now, return the OpenAI response as-is.
|
|
||||||
resp
|
if !upstream_resp.status().is_success() {
|
||||||
|
let status = upstream_resp.status().as_u16();
|
||||||
|
let body = upstream_resp.text().await.unwrap_or_default();
|
||||||
|
return error_response(status, &format!("upstream error: {body}"));
|
||||||
}
|
}
|
||||||
Err(e) => e.into_response(),
|
|
||||||
|
let body_bytes = match upstream_resp.bytes().await {
|
||||||
|
Ok(b) => b,
|
||||||
|
Err(e) => {
|
||||||
|
return error_response(502, &format!("failed to read upstream response: {e}"));
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let openai_resp: cortex_core::openai::ChatCompletionResponse =
|
||||||
|
match serde_json::from_slice(&body_bytes) {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => {
|
||||||
|
return error_response(502, &format!("failed to parse upstream response: {e}"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let anthropic_resp = cortex_core::translate::openai_to_anthropic(openai_resp);
|
||||||
|
Json(json!(anthropic_resp)).into_response()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
156
crates/cortex-gateway/tests/anthropic.rs
Normal file
156
crates/cortex-gateway/tests/anthropic.rs
Normal file
@@ -0,0 +1,156 @@
|
|||||||
|
mod common;
|
||||||
|
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_anthropic_to_openai_round_trip() {
|
||||||
|
let mock_url = common::spawn_mock_backend().await;
|
||||||
|
let gw_url = common::spawn_gateway(&mock_url).await;
|
||||||
|
|
||||||
|
let client = reqwest::Client::new();
|
||||||
|
let resp = client
|
||||||
|
.post(format!("{gw_url}/v1/messages"))
|
||||||
|
.header("content-type", "application/json")
|
||||||
|
.json(&json!({
|
||||||
|
"model": "test-model",
|
||||||
|
"max_tokens": 100,
|
||||||
|
"messages": [
|
||||||
|
{"role": "user", "content": "Hi"}
|
||||||
|
]
|
||||||
|
}))
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.expect("request should succeed");
|
||||||
|
|
||||||
|
assert_eq!(resp.status(), 200);
|
||||||
|
|
||||||
|
let body: serde_json::Value = resp.json().await.expect("valid JSON");
|
||||||
|
|
||||||
|
// Response should be in Anthropic format.
|
||||||
|
assert_eq!(body["type"], "message");
|
||||||
|
assert_eq!(body["role"], "assistant");
|
||||||
|
assert_eq!(body["model"], "test-model");
|
||||||
|
|
||||||
|
// Content should be an array of content blocks.
|
||||||
|
let content = body["content"].as_array().expect("content array");
|
||||||
|
assert_eq!(content.len(), 1);
|
||||||
|
assert_eq!(content[0]["type"], "text");
|
||||||
|
assert_eq!(content[0]["text"], "Hello from mock backend");
|
||||||
|
|
||||||
|
// Stop reason should be translated from "stop" to "end_turn".
|
||||||
|
assert_eq!(body["stop_reason"], "end_turn");
|
||||||
|
|
||||||
|
// Usage should have Anthropic field names.
|
||||||
|
assert_eq!(body["usage"]["input_tokens"], 10);
|
||||||
|
assert_eq!(body["usage"]["output_tokens"], 5);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_anthropic_with_system_prompt() {
|
||||||
|
let mock_url = common::spawn_mock_backend().await;
|
||||||
|
let gw_url = common::spawn_gateway(&mock_url).await;
|
||||||
|
|
||||||
|
let client = reqwest::Client::new();
|
||||||
|
let resp = client
|
||||||
|
.post(format!("{gw_url}/v1/messages"))
|
||||||
|
.header("content-type", "application/json")
|
||||||
|
.json(&json!({
|
||||||
|
"model": "test-model",
|
||||||
|
"max_tokens": 100,
|
||||||
|
"system": "You are a helpful assistant.",
|
||||||
|
"messages": [
|
||||||
|
{"role": "user", "content": "Hi"}
|
||||||
|
]
|
||||||
|
}))
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.expect("request should succeed");
|
||||||
|
|
||||||
|
assert_eq!(resp.status(), 200);
|
||||||
|
|
||||||
|
let body: serde_json::Value = resp.json().await.expect("valid JSON");
|
||||||
|
assert_eq!(body["type"], "message");
|
||||||
|
assert_eq!(body["content"][0]["text"], "Hello from mock backend");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_anthropic_with_content_blocks() {
|
||||||
|
let mock_url = common::spawn_mock_backend().await;
|
||||||
|
let gw_url = common::spawn_gateway(&mock_url).await;
|
||||||
|
|
||||||
|
let client = reqwest::Client::new();
|
||||||
|
let resp = client
|
||||||
|
.post(format!("{gw_url}/v1/messages"))
|
||||||
|
.header("content-type", "application/json")
|
||||||
|
.json(&json!({
|
||||||
|
"model": "test-model",
|
||||||
|
"max_tokens": 100,
|
||||||
|
"messages": [
|
||||||
|
{
|
||||||
|
"role": "user",
|
||||||
|
"content": [
|
||||||
|
{"type": "text", "text": "What is this?"}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}))
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.expect("request should succeed");
|
||||||
|
|
||||||
|
assert_eq!(resp.status(), 200);
|
||||||
|
|
||||||
|
let body: serde_json::Value = resp.json().await.expect("valid JSON");
|
||||||
|
assert_eq!(body["type"], "message");
|
||||||
|
assert_eq!(body["content"][0]["text"], "Hello from mock backend");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_anthropic_model_not_found() {
|
||||||
|
let mock_url = common::spawn_mock_backend().await;
|
||||||
|
let gw_url = common::spawn_gateway(&mock_url).await;
|
||||||
|
|
||||||
|
let client = reqwest::Client::new();
|
||||||
|
let resp = client
|
||||||
|
.post(format!("{gw_url}/v1/messages"))
|
||||||
|
.header("content-type", "application/json")
|
||||||
|
.json(&json!({
|
||||||
|
"model": "nonexistent",
|
||||||
|
"max_tokens": 100,
|
||||||
|
"messages": [
|
||||||
|
{"role": "user", "content": "Hi"}
|
||||||
|
]
|
||||||
|
}))
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.expect("request should succeed");
|
||||||
|
|
||||||
|
assert_eq!(resp.status(), 404);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_anthropic_invalid_request() {
|
||||||
|
let mock_url = common::spawn_mock_backend().await;
|
||||||
|
let gw_url = common::spawn_gateway(&mock_url).await;
|
||||||
|
|
||||||
|
let client = reqwest::Client::new();
|
||||||
|
let resp = client
|
||||||
|
.post(format!("{gw_url}/v1/messages"))
|
||||||
|
.header("content-type", "application/json")
|
||||||
|
.json(&json!({
|
||||||
|
"not_a_valid": "request"
|
||||||
|
}))
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.expect("request should succeed");
|
||||||
|
|
||||||
|
assert_eq!(resp.status(), 400);
|
||||||
|
|
||||||
|
let body: serde_json::Value = resp.json().await.unwrap();
|
||||||
|
assert!(
|
||||||
|
body["error"]["message"]
|
||||||
|
.as_str()
|
||||||
|
.unwrap()
|
||||||
|
.contains("invalid Anthropic request")
|
||||||
|
);
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user