From 1b339b14265f4e25b4245fb055433a6bc7e271c2 Mon Sep 17 00:00:00 2001 From: rob thijssen Date: Tue, 14 Apr 2026 19:26:12 +0300 Subject: [PATCH] test: add Phase 1 integration tests for basic proxy 6 tests proving the scaffold works end-to-end: - chat completion proxied through gateway to mock backend - /health endpoint with healthy node - /v1/models returns seeded model list - 404 for unknown model - 404 when no healthy nodes available - 400 when request body missing model field Co-Authored-By: Claude Opus 4.6 (1M context) --- CLAUDE.md | 200 +++++++++++++++++++-- crates/cortex-gateway/Cargo.toml | 3 + crates/cortex-gateway/tests/common/mod.rs | 115 ++++++++++++ crates/cortex-gateway/tests/proxy_basic.rs | 174 ++++++++++++++++++ 4 files changed, 479 insertions(+), 13 deletions(-) create mode 100644 crates/cortex-gateway/tests/common/mod.rs create mode 100644 crates/cortex-gateway/tests/proxy_basic.rs diff --git a/CLAUDE.md b/CLAUDE.md index 5bb1642..fb7613e 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -139,18 +139,192 @@ automatically. Clippy warnings must be resolved, not suppressed with - Log at `info` for request routing, `debug` for proxy details, `warn` for eviction and node health, `error` for proxy failures -## Current status +## mistral.rs API gotchas -**Scaffold phase** — crate structure, types, and handler stubs are in place. -The following needs implementation: +These are sharp edges Claude Code will hit when implementing the proxy. +Read before touching `proxy.rs` or `handlers.rs`. -1. **cortex-core**: Flesh out OpenAI/Anthropic envelope types with all fields - needed for chat completions (streaming + non-streaming) -2. **cortex-gateway/proxy.rs**: Implement streaming HTTP proxy with SSE passthrough -3. **cortex-gateway/router.rs**: Model-to-node routing with fallback to least-loaded -4. **cortex-gateway/evictor.rs**: LRU eviction with pinning support -5. **cortex-gateway/poller.rs**: Background polling of node `/v1/models` endpoints -6. 
**cortex-gateway/handlers.rs**: Wire up axum routes to proxy logic -7. **cortex-core/translate.rs**: OpenAI <-> Anthropic request/response translation -8. **cortex-agent**: Sidecar for VRAM defrag restarts (lower priority) -9. **Integration tests**: Mock mistralrs backends, test routing + eviction +### Model name validation + +mistral.rs validates that the `model` field in every request matches the +model that was actually loaded. If the names don't match, the request is +rejected outright. The special model name `"default"` bypasses this +validation entirely. + +**Implication for cortex:** The gateway must ensure the `model` field in +the proxied request body matches what mistral.rs expects. Two strategies: + +1. **Passthrough** — the client uses the exact HuggingFace model ID + (e.g. `Qwen/Qwen3-Coder-30B-A3B-Instruct`) and cortex routes based + on that. This is the simplest approach and should be the default. +2. **Rewrite to `"default"`** — if cortex introduces its own model + aliases, it must rewrite the `model` field to `"default"` before + proxying. This is a future feature, not phase 1. + +### Lazy loading latency + +When a request hits an unloaded model, mistral.rs automatically reloads +it before processing. This can take 10-60+ seconds for large models. The +gateway must: +- Set a generous HTTP client timeout (already 300s in the scaffold). +- Mark the request as `cold_start: true` in metrics. +- Not retry or time out prematurely — the upstream is busy loading, not dead. + +### SSE stream format + +mistral.rs streams use standard OpenAI SSE format: +``` +data: {"id":"...","choices":[{"delta":{"content":"token"},...}]}\n\n +data: [DONE]\n\n +``` +The proxy must forward these chunks verbatim. Do not attempt to parse +or re-serialize each chunk — that adds latency and risks breaking the +stream. Parse only for metrics extraction (token counts from the final +`usage` object, timing from chunk arrival). 
+ +### Multi-model mode + +`mistralrs serve` can load multiple models when started with a selector +config or multiple `--text-model` / `--vision-model` flags. The +`/v1/models` response lists all of them with a `status` field. When +sending requests, the `model` field must match one of the listed model +IDs — `"default"` only works if you don't care which model handles it. + +### Unload preserves config + +`POST /v1/models/unload` frees VRAM but keeps the model's config in +memory. A subsequent request to that model (or explicit `reload`) will +reload from disk/HF cache — not re-download. This is fast relative to +initial download but still involves loading weights into VRAM. + + +## Implementation plan + +Each phase is a branch → PR. CI must pass (fmt, clippy, test) before merge. +Phases are sequential — each builds on the previous. + +### Phase 1: Compile and proxy a basic request ✅ + +Completed. 6 integration tests in `cortex-gateway/tests/proxy_basic.rs`: +chat completion proxy, health endpoint, list models, model not found, +no healthy nodes, missing model field. Test helpers in `tests/common/mod.rs` +provide `spawn_mock_backend()` and `spawn_gateway()` using axum as the +mock mistral.rs backend. + +### Phase 2: Streaming SSE passthrough + +**Goal:** `"stream": true` requests proxy SSE chunks in real time. + +**Files to change:** +- `cortex-gateway/src/proxy.rs` — the current implementation already + uses `reqwest::Response::bytes_stream()` piped into `axum::body::Body`. + Verify this actually works for SSE by testing with a mock that emits + `data: {...}\n\n` chunks with delays between them. +- `tests/` — streaming integration test: + 1. Mock backend sends 5 SSE chunks with 100ms between each + 2. Assert cortex forwards each chunk as it arrives (not buffered) + 3. Assert the `data: [DONE]` terminator is forwarded + +**Done when:** Streaming test passes. Manual test against a real +mistral.rs instance confirms chunks arrive incrementally. 
+ +### Phase 3: Poller + live `/v1/models` + +**Goal:** The background poller refreshes node state from real (or mock) +mistral.rs instances. `GET /v1/models` returns live, aggregated data. + +**Files to change:** +- `cortex-gateway/src/poller.rs` — already implemented but needs testing +- `cortex-gateway/src/handlers.rs` — the `list_models` handler reads + from `CortexState`; verify it reflects poller updates +- `tests/` — test that: + 1. Mock backend serves `/v1/models` with 2 models (1 loaded, 1 unloaded) + 2. After poller runs, `GET /v1/models` on cortex returns both with + correct status and node attribution + +**Done when:** Poller test passes. The router in Phase 1 now routes +based on live-polled state instead of seed data. + +### Phase 4: Eviction + +**Goal:** When a request targets a model that requires loading and the +node is at capacity, cortex evicts the LRU non-pinned model first. + +**Files to change:** +- `cortex-gateway/src/evictor.rs` — `evict_lru_on_node` is implemented; + integrate it into the request path +- `cortex-gateway/src/router.rs` — add a `resolve_with_eviction` path + that calls the evictor when the target model is unloaded and the node + has no free VRAM headroom +- `cortex-gateway/src/handlers.rs` — update `last_accessed` on + `ModelEntry` for every successful request (drives LRU ordering) +- `tests/` — eviction test: + 1. Mock node reports 2 loaded models, 0 free VRAM + 2. Request arrives for a 3rd model (unloaded on that node) + 3. Assert cortex calls `POST /v1/models/unload` on the LRU model + 4. Assert the original request is then forwarded (lazy load) + 5. Assert pinned models are never evicted + +**Done when:** Eviction test passes. `lifecycle_cycles` increments. +Defrag warning fires at threshold. + +### Phase 5: Anthropic translation + +**Goal:** `POST /v1/messages` accepts Anthropic-format requests, proxies +to mistral.rs in OpenAI format, returns Anthropic-format responses. 
+ +**Files to change:** +- `cortex-core/src/translate.rs` — the scaffold has a working + `anthropic_to_openai` and `openai_to_anthropic`. Extend to handle: + - Multi-block content (images, tool use, tool results) + - `stop_reason` mapping (`end_turn`, `max_tokens`, `tool_use`) + - Usage token counts +- `cortex-gateway/src/handlers.rs` — the `anthropic_messages` handler + currently has TODO comments for response translation and streaming. + Implement non-streaming first (buffer upstream response, translate, + return). Then streaming (convert OpenAI SSE to Anthropic SSE event + types: `message_start`, `content_block_start`, `content_block_delta`, + `content_block_stop`, `message_delta`, `message_stop`). +- `tests/` — round-trip test: + 1. Send Anthropic-format request to cortex + 2. Assert the proxied request to mock backend is valid OpenAI format + 3. Assert the response back to the client is valid Anthropic format + +**Done when:** Non-streaming Anthropic round-trip test passes. Streaming +is a bonus — flag it as a follow-up if complex. + +### Phase 6: Metrics instrumentation + +**Goal:** Every proxied request emits Prometheus metrics. `/metrics` +on port 9100 returns valid Prometheus text format. 
+ +**Files to change:** +- `cortex-gateway/src/proxy.rs` or `cortex-gateway/src/handlers.rs` — + wrap each proxy call with timing instrumentation: + - `Instant::now()` before the request, compute duration after + - Parse `usage` from the response (non-streaming) or final chunk + (streaming) for token counts + - Emit: `metrics::histogram!("cortex_request_duration_seconds", ...)` + with labels `model` and `node` + - Emit: `metrics::counter!("cortex_requests_total", ...)` + - Emit cold start, eviction, and error counters +- `cortex-gateway/src/metrics.rs` — already installs the exporter; + verify the described metrics appear +- `tests/` — test that after a proxied request, the `/metrics` + endpoint contains the expected metric names + +**Done when:** `curl localhost:9100/metrics` shows request counters +and duration histograms after proxying a test request. + +### Phase 7 (lower priority): Agent sidecar + +**Goal:** Per-node binary that handles VRAM defrag restarts and +reports real VRAM usage via `nvidia-smi`. + +This is deferred. The gateway handles the critical path (model +lifecycle) entirely via the mistral.rs HTTP API. The agent adds +operational polish: automatic process restart when `lifecycle_cycles` +exceeds threshold, real VRAM reporting (vs. estimates), and +potentially GPU temperature/power monitoring. + +**Defer until:** Phases 1-6 are merged and running in production. 
diff --git a/crates/cortex-gateway/Cargo.toml b/crates/cortex-gateway/Cargo.toml
index 7515467..1b73808 100644
--- a/crates/cortex-gateway/Cargo.toml
+++ b/crates/cortex-gateway/Cargo.toml
@@ -23,3 +23,6 @@ futures.workspace = true
 tokio-stream.workspace = true
 eventsource-stream.workspace = true
 bytes = "1"
+
+[dev-dependencies]
+tokio = { workspace = true, features = ["test-util"] }
diff --git a/crates/cortex-gateway/tests/common/mod.rs b/crates/cortex-gateway/tests/common/mod.rs
new file mode 100644
index 0000000..11c57a2
--- /dev/null
+++ b/crates/cortex-gateway/tests/common/mod.rs
@@ -0,0 +1,119 @@
+use axum::routing::{get, post};
+use axum::{Json, Router};
+use cortex_core::config::{
+    EvictionSettings, EvictionStrategy, GatewayConfig, GatewaySettings, NodeConfig,
+};
+use cortex_core::node::{ModelEntry, ModelStatus};
+use cortex_gateway::state::CortexState;
+use serde_json::{Value, json};
+use std::sync::Arc;
+use tokio::net::TcpListener;
+
+/// Spawns a mock mistral.rs backend on a random port.
+/// Serves canned responses for `POST /v1/chat/completions` and `GET /v1/models`.
+/// Returns the base URL (e.g. "http://127.0.0.1:12345").
+pub async fn spawn_mock_backend() -> String {
+    let app = Router::new()
+        .route("/v1/chat/completions", post(mock_chat_completions))
+        .route("/v1/models", get(mock_list_models));
+
+    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+    let addr = listener.local_addr().unwrap();
+    tokio::spawn(async move {
+        axum::serve(listener, app).await.unwrap();
+    });
+
+    format!("http://{addr}")
+}
+
+/// Mock `POST /v1/chat/completions` handler: returns a fixed chat completion that
+/// echoes the request's `model` field ("unknown" if it is missing or not a string).
+async fn mock_chat_completions(Json(body): Json<Value>) -> Json<Value> {
+    let model = body
+        .get("model")
+        .and_then(|v| v.as_str())
+        .unwrap_or("unknown");
+
+    Json(json!({
+        "id": "chatcmpl-test-001",
+        "object": "chat.completion",
+        "created": 1700000000_u64,
+        "model": model,
+        "choices": [{
+            "index": 0,
+            "message": {
+                "role": "assistant",
+                "content": "Hello from mock backend"
+            },
+            "finish_reason": "stop"
+        }],
+        "usage": {
+            "prompt_tokens": 10,
+            "completion_tokens": 5,
+            "total_tokens": 15
+        }
+    }))
+}
+
+/// Mock `GET /v1/models` handler: reports one loaded model, "test-model".
+async fn mock_list_models() -> Json<Value> {
+    Json(json!({
+        "object": "list",
+        "data": [{
+            "id": "test-model",
+            "object": "model",
+            "status": "loaded"
+        }]
+    }))
+}
+
+/// Spawns the cortex gateway with a single node pointing at `mock_url`.
+/// The node is pre-seeded as healthy with one loaded model ("test-model").
+/// Returns the gateway's base URL.
+pub async fn spawn_gateway(mock_url: &str) -> String {
+    let config = GatewayConfig {
+        gateway: GatewaySettings {
+            listen: "127.0.0.1:0".into(),
+            metrics_listen: "127.0.0.1:0".into(),
+        },
+        eviction: EvictionSettings {
+            strategy: EvictionStrategy::Lru,
+            defrag_after_cycles: 0,
+        },
+        nodes: vec![NodeConfig {
+            name: "mock-node".into(),
+            endpoint: mock_url.to_string(),
+            vram_mb: 24000,
+            pinned: vec![],
+        }],
+    };
+
+    let fleet = Arc::new(CortexState::from_config(&config));
+
+    // Seed the node as healthy with a loaded model.
+    // (Bypasses the poller, which is not running in tests.)
+    {
+        let mut nodes = fleet.nodes.write().await;
+        let node = nodes.get_mut("mock-node").expect("node must exist");
+        node.healthy = true;
+        node.models.insert(
+            "test-model".into(),
+            ModelEntry {
+                id: "test-model".into(),
+                status: ModelStatus::Loaded,
+                last_accessed: None,
+                vram_estimate_mb: Some(8000),
+            },
+        );
+    }
+
+    let app = cortex_gateway::build_app(fleet);
+
+    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+    let addr = listener.local_addr().unwrap();
+    tokio::spawn(async move {
+        axum::serve(listener, app).await.unwrap();
+    });
+
+    format!("http://{addr}")
+}
diff --git a/crates/cortex-gateway/tests/proxy_basic.rs b/crates/cortex-gateway/tests/proxy_basic.rs
new file mode 100644
index 0000000..67102ea
--- /dev/null
+++ b/crates/cortex-gateway/tests/proxy_basic.rs
@@ -0,0 +1,190 @@
+mod common;
+
+use serde_json::json;
+
+/// A chat completion sent to the gateway is proxied to the mock backend and the
+/// backend's id, model, message content and token usage reach the client.
+#[tokio::test]
+async fn test_chat_completion_proxy() {
+    let mock_url = common::spawn_mock_backend().await;
+    let gw_url = common::spawn_gateway(&mock_url).await;
+
+    let client = reqwest::Client::new();
+    let resp = client
+        .post(format!("{gw_url}/v1/chat/completions"))
+        .header("content-type", "application/json")
+        .json(&json!({
+            "model": "test-model",
+            "messages": [{"role": "user", "content": "Hi"}]
+        }))
+        .send()
+        .await
+        .expect("request should succeed");
+
+    assert_eq!(resp.status(), 200);
+
+    let body: serde_json::Value = resp.json().await.expect("valid JSON response");
+    assert_eq!(body["id"], "chatcmpl-test-001");
+    assert_eq!(body["model"], "test-model");
+    assert_eq!(
+        body["choices"][0]["message"]["content"],
+        "Hello from mock backend"
+    );
+    assert_eq!(body["usage"]["total_tokens"], 15);
+}
+
+/// `GET /health` reports "ok" with one healthy node out of one configured.
+#[tokio::test]
+async fn test_health_endpoint() {
+    let mock_url = common::spawn_mock_backend().await;
+    let gw_url = common::spawn_gateway(&mock_url).await;
+
+    let client = reqwest::Client::new();
+    let resp = client
+        .get(format!("{gw_url}/health"))
+        .send()
+        .await
+        .expect("request should succeed");
+
+    assert_eq!(resp.status(), 200);
+
+    let body: serde_json::Value = resp.json().await.unwrap();
+    assert_eq!(body["status"], "ok");
+    assert_eq!(body["nodes"]["healthy"], 1);
+    assert_eq!(body["nodes"]["total"], 1);
+}
+
+/// `GET /v1/models` lists exactly the one model seeded by `common::spawn_gateway`.
+#[tokio::test]
+async fn test_list_models() {
+    let mock_url = common::spawn_mock_backend().await;
+    let gw_url = common::spawn_gateway(&mock_url).await;
+
+    let client = reqwest::Client::new();
+    let resp = client
+        .get(format!("{gw_url}/v1/models"))
+        .send()
+        .await
+        .expect("request should succeed");
+
+    assert_eq!(resp.status(), 200);
+
+    let body: serde_json::Value = resp.json().await.unwrap();
+    assert_eq!(body["object"], "list");
+
+    let data = body["data"].as_array().expect("data should be an array");
+    assert_eq!(data.len(), 1);
+    assert_eq!(data[0]["id"], "test-model");
+}
+
+/// A chat completion for a model that was never seeded is rejected with 404.
+#[tokio::test]
+async fn test_model_not_found() {
+    let mock_url = common::spawn_mock_backend().await;
+    let gw_url = common::spawn_gateway(&mock_url).await;
+
+    let client = reqwest::Client::new();
+    let resp = client
+        .post(format!("{gw_url}/v1/chat/completions"))
+        .header("content-type", "application/json")
+        .json(&json!({
+            "model": "nonexistent-model",
+            "messages": [{"role": "user", "content": "Hi"}]
+        }))
+        .send()
+        .await
+        .expect("request should succeed");
+
+    assert_eq!(resp.status(), 404);
+
+    let body: serde_json::Value = resp.json().await.unwrap();
+    assert!(
+        body["error"]["message"]
+            .as_str()
+            .unwrap()
+            .contains("not found")
+    );
+}
+
+/// With no node marked healthy, chat completions fail with 404 and a
+/// "no healthy nodes" error message.
+///
+/// The gateway is built inline rather than via `common::spawn_gateway`
+/// because that helper seeds its node as healthy; here the node is never
+/// marked healthy.
+///
+/// NOTE(review): 503 is the more conventional status for "no backend
+/// available" — confirm 404 is the intended gateway contract.
+#[tokio::test]
+async fn test_no_healthy_nodes() {
+    let config = cortex_core::config::GatewayConfig {
+        gateway: cortex_core::config::GatewaySettings {
+            listen: "127.0.0.1:0".into(),
+            metrics_listen: "127.0.0.1:0".into(),
+        },
+        eviction: cortex_core::config::EvictionSettings {
+            strategy: cortex_core::config::EvictionStrategy::Lru,
+            defrag_after_cycles: 0,
+        },
+        nodes: vec![cortex_core::config::NodeConfig {
+            name: "dead-node".into(),
+            endpoint: "http://127.0.0.1:1".into(),
+            vram_mb: 24000,
+            pinned: vec![],
+        }],
+    };
+    let fleet = std::sync::Arc::new(cortex_gateway::state::CortexState::from_config(&config));
+
+    let app = cortex_gateway::build_app(fleet);
+    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
+    let addr = listener.local_addr().unwrap();
+    tokio::spawn(async move {
+        axum::serve(listener, app).await.unwrap();
+    });
+
+    let client = reqwest::Client::new();
+    let resp = client
+        .post(format!("http://{addr}/v1/chat/completions"))
+        .header("content-type", "application/json")
+        .json(&json!({
+            "model": "any-model",
+            "messages": [{"role": "user", "content": "Hi"}]
+        }))
+        .send()
+        .await
+        .expect("request should succeed");
+
+    assert_eq!(resp.status(), 404);
+
+    let body: serde_json::Value = resp.json().await.unwrap();
+    assert!(
+        body["error"]["message"]
+            .as_str()
+            .unwrap()
+            .contains("no healthy nodes")
+    );
+}
+
+/// A request body without a `model` field is rejected with 400 and an error
+/// message that mentions "model".
+#[tokio::test]
+async fn test_missing_model_field() {
+    let mock_url = common::spawn_mock_backend().await;
+    let gw_url = common::spawn_gateway(&mock_url).await;
+
+    let client = reqwest::Client::new();
+    let resp = client
+        .post(format!("{gw_url}/v1/chat/completions"))
+        .header("content-type", "application/json")
+        .json(&json!({
+            "messages": [{"role": "user", "content": "Hi"}]
+        }))
+        .send()
+        .await
+        .expect("request should succeed");
+
+    assert_eq!(resp.status(), 400);
+
+    let body: serde_json::Value = resp.json().await.unwrap();
+    assert!(body["error"]["message"].as_str().unwrap().contains("model"));
+}