diff --git a/CLAUDE.md b/CLAUDE.md index fb7613e..298da6b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -211,22 +211,13 @@ no healthy nodes, missing model field. Test helpers in `tests/common/mod.rs` provide `spawn_mock_backend()` and `spawn_gateway()` using axum as the mock mistral.rs backend. -### Phase 2: Streaming SSE passthrough +### Phase 2: Streaming SSE passthrough ✅ -**Goal:** `"stream": true` requests proxy SSE chunks in real time. - -**Files to change:** -- `cortex-gateway/src/proxy.rs` — the current implementation already - uses `reqwest::Response::bytes_stream()` piped into `axum::body::Body`. - Verify this actually works for SSE by testing with a mock that emits - `data: {...}\n\n` chunks with delays between them. -- `tests/` — streaming integration test: - 1. Mock backend sends 5 SSE chunks with 100ms between each - 2. Assert cortex forwards each chunk as it arrives (not buffered) - 3. Assert the `data: [DONE]` terminator is forwarded - -**Done when:** Streaming test passes. Manual test against a real -mistral.rs instance confirms chunks arrive incrementally. +Completed. The existing `Body::from_stream(bytes_stream())` proxy works +for SSE out of the box. 2 integration tests in `cortex-gateway/tests/streaming.rs`: +- `test_streaming_sse_passthrough` — 5 chunks with 50ms delays, verifies + incremental delivery (time spread between first and last chunk) +- `test_streaming_done_terminator` — verifies `data: [DONE]` is forwarded ### Phase 3: Poller + live `/v1/models` diff --git a/crates/cortex-gateway/tests/common/mod.rs b/crates/cortex-gateway/tests/common/mod.rs index 11c57a2..05be7f2 100644 --- a/crates/cortex-gateway/tests/common/mod.rs +++ b/crates/cortex-gateway/tests/common/mod.rs @@ -1,3 +1,8 @@ +#![allow(dead_code)] + +use axum::body::Body; +use axum::http::header; +use axum::response::Response; use axum::routing::{get, post}; use axum::{Json, Router}; use cortex_core::config::{ @@ -5,8 +10,10 @@ use cortex_core::config::{ }; use cortex_core::node::{ModelEntry, ModelStatus}; use cortex_gateway::state::CortexState; +use futures::{StreamExt, stream}; use serde_json::{Value, json}; use std::sync::Arc; +use std::time::Duration; use tokio::net::TcpListener; /// Spawns a mock mistral.rs backend on a random port. @@ -63,6 +70,66 @@ async fn mock_list_models() -> Json { })) } +/// Spawns a mock mistral.rs backend that returns SSE streaming responses. +/// Each chunk is delayed by `chunk_delay` to prove the proxy streams incrementally. +pub async fn spawn_streaming_mock_backend(chunk_count: usize, chunk_delay: Duration) -> String { + let app = Router::new() + .route( + "/v1/chat/completions", + post(move |Json(body): Json| async move { + let model = body + .get("model") + .and_then(|v| v.as_str()) + .unwrap_or("unknown") + .to_string(); + + let chunks: Vec = (0..chunk_count) + .map(|i| { + let content = format!("token{i}"); + let chunk = json!({ + "id": "chatcmpl-stream-001", + "object": "chat.completion.chunk", + "created": 1700000000_u64, + "model": model, + "choices": [{ + "index": 0, + "delta": { "content": content }, + "finish_reason": null + }] + }); + format!("data: {chunk}\n\n") + }) + .collect(); + + let delay = chunk_delay; + let stream = stream::iter( + chunks + .into_iter() + .chain(std::iter::once("data: [DONE]\n\n".to_string())), + ) + .then(move |chunk| async move { + tokio::time::sleep(delay).await; + Ok::<_, std::convert::Infallible>(chunk) + }); + + Response::builder() + .header(header::CONTENT_TYPE, "text/event-stream") + .header(header::CACHE_CONTROL, "no-cache") + .body(Body::from_stream(stream)) + .unwrap() + }), + ) + .route("/v1/models", get(mock_list_models)); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + + format!("http://{addr}") +} + /// Spawns the cortex gateway with a single node pointing at `mock_url`. /// The node is pre-seeded as healthy with one loaded model ("test-model"). /// Returns the gateway's base URL. diff --git a/crates/cortex-gateway/tests/streaming.rs b/crates/cortex-gateway/tests/streaming.rs new file mode 100644 index 0000000..320ef8e --- /dev/null +++ b/crates/cortex-gateway/tests/streaming.rs @@ -0,0 +1,117 @@ +mod common; + +use futures::StreamExt; +use serde_json::json; +use std::time::{Duration, Instant}; + +#[tokio::test] +async fn test_streaming_sse_passthrough() { + let chunk_count = 5; + let chunk_delay = Duration::from_millis(50); + let mock_url = common::spawn_streaming_mock_backend(chunk_count, chunk_delay).await; + let gw_url = common::spawn_gateway(&mock_url).await; + + let client = reqwest::Client::new(); + let resp = client + .post(format!("{gw_url}/v1/chat/completions")) + .header("content-type", "application/json") + .json(&json!({ + "model": "test-model", + "messages": [{"role": "user", "content": "Hi"}], + "stream": true + })) + .send() + .await + .expect("request should succeed"); + + assert_eq!(resp.status(), 200); + assert_eq!( + resp.headers() + .get("content-type") + .and_then(|v| v.to_str().ok()) + .unwrap_or(""), + "text/event-stream" + ); + + // Collect SSE chunks as they arrive, recording arrival times. + let start = Instant::now(); + let mut chunk_times = Vec::new(); + let mut chunks = Vec::new(); + let mut stream = resp.bytes_stream(); + + while let Some(result) = stream.next().await { + let bytes = result.expect("chunk should be valid"); + let text = String::from_utf8_lossy(&bytes); + for line in text.split("data: ").filter(|s| !s.is_empty()) { + let trimmed = line.trim(); + if !trimmed.is_empty() { + chunk_times.push(start.elapsed()); + chunks.push(trimmed.to_string()); + } + } + } + + // Verify we got all content chunks plus [DONE]. + assert!( + chunks.len() >= chunk_count + 1, + "expected at least {} chunks (got {}): {:?}", + chunk_count + 1, + chunks.len(), + chunks, + ); + + // The last chunk should be [DONE]. + assert_eq!(chunks.last().unwrap(), "[DONE]"); + + // Verify the content chunks contain expected tokens. + for i in 0..chunk_count { + let chunk_json: serde_json::Value = + serde_json::from_str(&chunks[i]).expect("chunk should be valid JSON"); + assert_eq!( + chunk_json["choices"][0]["delta"]["content"], + format!("token{i}") + ); + } + + // Verify streaming behavior: total time should reflect incremental delivery, + // not a single batch. With 5 chunks at 50ms each + [DONE], we expect ~300ms total. + // If buffered, all chunks would arrive at once after ~300ms with no spread. + // We verify that the last chunk arrived noticeably after the first. + let first = chunk_times.first().unwrap(); + let last = chunk_times.last().unwrap(); + let spread = *last - *first; + assert!( + spread >= Duration::from_millis(100), + "chunks should arrive incrementally (spread: {spread:?})", + ); +} + +#[tokio::test] +async fn test_streaming_done_terminator() { + let mock_url = common::spawn_streaming_mock_backend(2, Duration::from_millis(10)).await; + let gw_url = common::spawn_gateway(&mock_url).await; + + let client = reqwest::Client::new(); + let resp = client + .post(format!("{gw_url}/v1/chat/completions")) + .header("content-type", "application/json") + .json(&json!({ + "model": "test-model", + "messages": [{"role": "user", "content": "Hi"}], + "stream": true + })) + .send() + .await + .expect("request should succeed"); + + let body = resp.text().await.unwrap(); + assert!( + body.contains("data: [DONE]"), + "response must contain [DONE] terminator" + ); + assert!(body.contains("token0"), "response must contain first token"); + assert!( + body.contains("token1"), + "response must contain second token" + ); +}