Files
cortex/crates/cortex-gateway/tests/streaming.rs
rob thijssen e42e8ee81f
All checks were successful
CI / Format, lint, build, test (push) Successful in 2m46s
CI / Build SRPM (push) Has been skipped
CI / Publish to COPR (push) Has been skipped
refactor: cortex talks to neurons instead of mistral.rs directly
Replace NodeConfig (static vram_mb, pinned) with NeuronEndpoint.
Hardware discovery and model pinning now come from neuron API and
models.toml catalogue respectively.

- config.rs: nodes -> neurons, add models_config path
- catalogue.rs: ModelProfile with pinned_on, ModelCatalogue
- poller.rs: poll neuron GET /models (ModelInfo format)
- router.rs: resolve inference endpoint via neuron GET /models/{id}/endpoint
- evictor.rs: call neuron POST /models/unload
- node.rs: remove vram_mb, pinned fields (come from discovery/catalogue)
- All 22 gateway tests updated to mock neuron API
- Remove MistralModelsResponse, ModelLifecycleRequest (no longer needed)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 14:42:52 +03:00

110 lines
3.2 KiB
Rust

mod common;
use futures::StreamExt;
use serde_json::json;
use std::time::{Duration, Instant};
#[tokio::test]
async fn test_streaming_sse_passthrough() {
let chunk_count = 5;
let chunk_delay = Duration::from_millis(50);
let mock_url = common::spawn_streaming_mock_neuron(chunk_count, chunk_delay).await;
let gw_url = common::spawn_gateway(&mock_url).await;
let client = reqwest::Client::new();
let resp = client
.post(format!("{gw_url}/v1/chat/completions"))
.header("content-type", "application/json")
.json(&json!({
"model": "test-model",
"messages": [{"role": "user", "content": "Hi"}],
"stream": true
}))
.send()
.await
.expect("request should succeed");
assert_eq!(resp.status(), 200);
assert_eq!(
resp.headers()
.get("content-type")
.and_then(|v| v.to_str().ok())
.unwrap_or(""),
"text/event-stream"
);
let start = Instant::now();
let mut chunk_times = Vec::new();
let mut chunks = Vec::new();
let mut stream = resp.bytes_stream();
while let Some(result) = stream.next().await {
let bytes = result.expect("chunk should be valid");
let text = String::from_utf8_lossy(&bytes);
for line in text.split("data: ").filter(|s| !s.is_empty()) {
let trimmed = line.trim();
if !trimmed.is_empty() {
chunk_times.push(start.elapsed());
chunks.push(trimmed.to_string());
}
}
}
assert!(
chunks.len() >= chunk_count + 1,
"expected at least {} chunks (got {}): {:?}",
chunk_count + 1,
chunks.len(),
chunks,
);
assert_eq!(chunks.last().unwrap(), "[DONE]");
for i in 0..chunk_count {
let chunk_json: serde_json::Value =
serde_json::from_str(&chunks[i]).expect("chunk should be valid JSON");
assert_eq!(
chunk_json["choices"][0]["delta"]["content"],
format!("token{i}")
);
}
let first = chunk_times.first().unwrap();
let last = chunk_times.last().unwrap();
let spread = *last - *first;
assert!(
spread >= Duration::from_millis(100),
"chunks should arrive incrementally (spread: {spread:?})",
);
}
#[tokio::test]
async fn test_streaming_done_terminator() {
let mock_url = common::spawn_streaming_mock_neuron(2, Duration::from_millis(10)).await;
let gw_url = common::spawn_gateway(&mock_url).await;
let client = reqwest::Client::new();
let resp = client
.post(format!("{gw_url}/v1/chat/completions"))
.header("content-type", "application/json")
.json(&json!({
"model": "test-model",
"messages": [{"role": "user", "content": "Hi"}],
"stream": true
}))
.send()
.await
.expect("request should succeed");
let body = resp.text().await.unwrap();
assert!(
body.contains("data: [DONE]"),
"response must contain [DONE] terminator"
);
assert!(body.contains("token0"), "response must contain first token");
assert!(
body.contains("token1"),
"response must contain second token"
);
}