Compare commits
2 Commits
feat/47-ph
...
feat/47-ph
| Author | SHA1 | Date | |
|---|---|---|---|
|
057bc71e80
|
|||
|
dd31c3cd49
|
@@ -1,4 +1,4 @@
|
||||
use crate::discovery::{ActivationStatus, DiscoveryResponse};
|
||||
use crate::discovery::{ActivationStatus, DiscoveryResponse, ModelLoad};
|
||||
use crate::harness::{ModelCost, ModelLimit};
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -27,6 +27,11 @@ pub struct NodeState {
|
||||
/// to synthesize `Loading` locations so clients see a catalogued
|
||||
/// model that's mid-prewarm as "loading", not "missing".
|
||||
pub activation: Option<ActivationStatus>,
|
||||
/// Last-seen per-model admission load from this neuron's `/health`
|
||||
/// (#53), keyed by model id. The router (#55) reads it to pick the
|
||||
/// least-busy replica when a model is loaded on more than one neuron.
|
||||
/// Empty until the first /health poll reports load.
|
||||
pub model_load: HashMap<String, ModelLoad>,
|
||||
}
|
||||
|
||||
/// A model registered on a node, with its runtime status.
|
||||
|
||||
@@ -761,6 +761,19 @@ async fn proxy_with_metrics(
|
||||
body: Bytes,
|
||||
model_id: &str,
|
||||
) -> Response {
|
||||
// Fail-fast prompt pre-validation (#56): refuse a prompt that already
|
||||
// exceeds the model's advertised context window *before* dispatching to
|
||||
// neuron — the same `400 context_length_exceeded` neuron would emit on
|
||||
// overflow, just earlier and without burning a cold-load/queue slot.
|
||||
// cortex has no tokenizer, so the estimate under-counts and neuron stays
|
||||
// the exact wall; we only catch gross overages (the A0 failure mode).
|
||||
if let Some(context) = advertised_context(fleet, &route.node_name, model_id).await {
|
||||
let est = estimate_prompt_tokens(&body);
|
||||
if est > context {
|
||||
return context_length_exceeded_response(context, est, &headers);
|
||||
}
|
||||
}
|
||||
|
||||
let labels = [
|
||||
("model", model_id.to_string()),
|
||||
("node", route.node_name.clone()),
|
||||
@@ -844,6 +857,98 @@ async fn advertised_output_limit(
|
||||
.map(|l| l.output as u64)
|
||||
}
|
||||
|
||||
/// The model's advertised hard context window (`limit.context`, #62/#67) on a
|
||||
/// node, used for fail-fast prompt pre-validation (#56). `None` when no limit
|
||||
/// is known — pre-validation is then skipped and neuron remains the wall.
|
||||
async fn advertised_context(fleet: &CortexState, node_name: &str, model_id: &str) -> Option<u64> {
|
||||
let nodes = fleet.nodes.read().await;
|
||||
nodes
|
||||
.get(node_name)?
|
||||
.models
|
||||
.get(model_id)?
|
||||
.limit
|
||||
.as_ref()
|
||||
.map(|l| l.context as u64)
|
||||
}
|
||||
|
||||
/// Conservative prompt-token estimate (~4 chars/token over message text).
|
||||
/// cortex has no tokenizer; under-counting is the safe direction — we only
|
||||
/// pre-reject gross overages (#56), and neuron enforces the exact wall.
|
||||
fn estimate_prompt_tokens(body: &[u8]) -> u64 {
|
||||
let Ok(v) = serde_json::from_slice::<Value>(body) else {
|
||||
return (body.len() as u64 / 4).max(1);
|
||||
};
|
||||
let mut chars = 0usize;
|
||||
if let Some(messages) = v.get("messages").and_then(Value::as_array) {
|
||||
for m in messages {
|
||||
match m.get("content") {
|
||||
Some(Value::String(s)) => chars += s.len(),
|
||||
Some(Value::Array(parts)) => {
|
||||
for p in parts {
|
||||
if let Some(t) = p.get("text").and_then(Value::as_str) {
|
||||
chars += t.len();
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
chars += 8; // rough per-message role/formatting overhead
|
||||
}
|
||||
} else if let Some(prompt) = v.get("prompt").and_then(Value::as_str) {
|
||||
chars += prompt.len(); // legacy /v1/completions
|
||||
} else {
|
||||
return (body.len() as u64 / 4).max(1);
|
||||
}
|
||||
(chars as u64 / 4).max(1)
|
||||
}
|
||||
|
||||
/// Client-specific, advisory guidance for an over-long prompt (#56),
|
||||
/// fingerprinted from `User-Agent`. Strictly advisory: it rides the
|
||||
/// `X-Helexa-Advice` header only, never the error envelope, and behaviour
|
||||
/// never depends on it. Unknown clients get nothing.
|
||||
fn client_advice(headers: &HeaderMap) -> Option<&'static str> {
|
||||
let ua = headers
|
||||
.get(axum::http::header::USER_AGENT)?
|
||||
.to_str()
|
||||
.ok()?
|
||||
.to_ascii_lowercase();
|
||||
if ua.contains("litellm") {
|
||||
Some(
|
||||
"litellm forwards the full context; lower the configured context window or enable client-side compaction",
|
||||
)
|
||||
} else if ua.contains("agent-zero") || ua.contains("agent zero") {
|
||||
Some("reduce the conversation/context size or summarize earlier turns before resending")
|
||||
} else if ua.contains("zed") {
|
||||
Some("reduce the assistant context window in Zed's settings")
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// `400 context_length_exceeded` for an over-long prompt caught at the edge
|
||||
/// (#56), in the #60 envelope — the same shape neuron emits on overflow, so
|
||||
/// clients (opencode auto-compacts) handle it identically. Attaches the
|
||||
/// advisory `X-Helexa-Advice` header for fingerprinted clients.
|
||||
fn context_length_exceeded_response(
|
||||
context: u64,
|
||||
prompt_est: u64,
|
||||
headers: &HeaderMap,
|
||||
) -> Response {
|
||||
let env = OpenAiError::context_length_exceeded(format!(
|
||||
"This model's maximum context length is {context} tokens. Your request is \
|
||||
estimated at ~{prompt_est} tokens. Please reduce the length of the messages."
|
||||
))
|
||||
.with_extra("max", json!(context))
|
||||
.with_extra("estimated_prompt_tokens", json!(prompt_est));
|
||||
let mut response = crate::error::envelope_response(env);
|
||||
if let Some(advice) = client_advice(headers)
|
||||
&& let Ok(value) = axum::http::HeaderValue::from_str(advice)
|
||||
{
|
||||
response.headers_mut().insert("x-helexa-advice", value);
|
||||
}
|
||||
response
|
||||
}
|
||||
|
||||
/// Update `last_accessed` timestamp for a model on a node (drives LRU eviction).
|
||||
async fn touch_model(fleet: &CortexState, node_name: &str, model_id: &str) {
|
||||
let mut nodes = fleet.nodes.write().await;
|
||||
|
||||
@@ -200,6 +200,9 @@ async fn poll_health(fleet: &CortexState, name: &str, endpoint: &str) {
|
||||
let mut nodes = fleet.nodes.write().await;
|
||||
if let Some(node) = nodes.get_mut(name) {
|
||||
node.activation = Some(h.activation);
|
||||
// Per-model admission load (#53) → keyed by id for the
|
||||
// load-aware router (#55).
|
||||
node.model_load = h.models.into_iter().map(|m| (m.id.clone(), m)).collect();
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
@@ -132,7 +132,9 @@ pub async fn resolve(
|
||||
// Snapshot loaded / unloaded / recovering state from the poller cache.
|
||||
let (loaded_route, unloaded_route, recovering_node, any_healthy) = {
|
||||
let nodes = fleet.nodes.read().await;
|
||||
let mut loaded_route = None;
|
||||
// All healthy nodes with the model loaded, each with its current
|
||||
// admission load (#53) so we can pick the least-busy replica (#55).
|
||||
let mut loaded_candidates: Vec<(String, String, usize)> = Vec::new();
|
||||
let mut unloaded_route = None;
|
||||
let mut recovering_node = None;
|
||||
let mut any_healthy = false;
|
||||
@@ -144,8 +146,15 @@ pub async fn resolve(
|
||||
if let Some(entry) = node.models.get(model_id) {
|
||||
match entry.status {
|
||||
ModelStatus::Loaded | ModelStatus::Reloading => {
|
||||
loaded_route = Some((node.name.clone(), node.endpoint.clone(), false));
|
||||
break;
|
||||
// Least-busy score: in-flight + queued from the
|
||||
// neuron's last /health (#53). Unknown load (no poll
|
||||
// yet) scores 0 so the replica stays eligible.
|
||||
let score = node
|
||||
.model_load
|
||||
.get(model_id)
|
||||
.map(|l| l.in_flight + l.queue_depth)
|
||||
.unwrap_or(0);
|
||||
loaded_candidates.push((node.name.clone(), node.endpoint.clone(), score));
|
||||
}
|
||||
ModelStatus::Unloaded => {
|
||||
if unloaded_route.is_none() {
|
||||
@@ -175,6 +184,12 @@ pub async fn resolve(
|
||||
}
|
||||
}
|
||||
}
|
||||
// Pick the least-busy loaded replica; ties break by node name for
|
||||
// deterministic routing. `false` = not a cold start.
|
||||
let loaded_route = loaded_candidates
|
||||
.into_iter()
|
||||
.min_by(|a, b| a.2.cmp(&b.2).then_with(|| a.0.cmp(&b.0)))
|
||||
.map(|(name, endpoint, _score)| (name, endpoint, false));
|
||||
(loaded_route, unloaded_route, recovering_node, any_healthy)
|
||||
};
|
||||
|
||||
|
||||
@@ -37,6 +37,7 @@ impl CortexState {
|
||||
last_poll: None,
|
||||
discovery: None,
|
||||
activation: None,
|
||||
model_load: HashMap::new(),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
189
crates/cortex-gateway/tests/load_routing.rs
Normal file
189
crates/cortex-gateway/tests/load_routing.rs
Normal file
@@ -0,0 +1,189 @@
|
||||
//! Load-aware routing across replicas (#55).
|
||||
//!
|
||||
//! When a model is loaded on more than one healthy neuron, the router picks
|
||||
//! the least-busy replica using the per-model admission load each neuron
|
||||
//! reports on `GET /health` (#53), rather than always taking the first.
|
||||
|
||||
mod common;
|
||||
|
||||
use axum::Json;
|
||||
use axum::extract::Path;
|
||||
use axum::http::{StatusCode, header};
|
||||
use axum::response::IntoResponse;
|
||||
use axum::routing::{get, post};
|
||||
use cortex_core::config::{
|
||||
EvictionSettings, EvictionStrategy, GatewayConfig, GatewaySettings, NeuronEndpoint,
|
||||
};
|
||||
use cortex_core::discovery::ModelLoad;
|
||||
use cortex_core::node::{ModelEntry, ModelStatus};
|
||||
use cortex_gateway::state::CortexState;
|
||||
use serde_json::{Value, json};
|
||||
use std::sync::Arc;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
/// Seed a node as healthy with `test-model` loaded and a given admission load.
|
||||
async fn seed_loaded(fleet: &CortexState, node: &str, in_flight: usize, queue_depth: usize) {
|
||||
let mut nodes = fleet.nodes.write().await;
|
||||
let n = nodes.get_mut(node).expect("node exists");
|
||||
n.healthy = true;
|
||||
n.models.insert(
|
||||
"test-model".into(),
|
||||
ModelEntry {
|
||||
id: "test-model".into(),
|
||||
status: ModelStatus::Loaded,
|
||||
last_accessed: None,
|
||||
vram_estimate_mb: Some(8000),
|
||||
capabilities: Vec::new(),
|
||||
tool_call: false,
|
||||
reasoning: false,
|
||||
limit: None,
|
||||
},
|
||||
);
|
||||
n.model_load.insert(
|
||||
"test-model".into(),
|
||||
ModelLoad {
|
||||
id: "test-model".into(),
|
||||
in_flight,
|
||||
queue_depth,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
/// Build a gateway state over two mock neurons (no poller; we seed state).
|
||||
async fn two_neuron_fleet(endpoint_a: &str, endpoint_b: &str) -> Arc<CortexState> {
|
||||
let config = GatewayConfig {
|
||||
gateway: GatewaySettings {
|
||||
listen: "127.0.0.1:0".into(),
|
||||
metrics_listen: "127.0.0.1:0".into(),
|
||||
},
|
||||
eviction: EvictionSettings {
|
||||
strategy: EvictionStrategy::Lru,
|
||||
defrag_after_cycles: 0,
|
||||
},
|
||||
neurons: vec![
|
||||
NeuronEndpoint {
|
||||
name: "node-a".into(),
|
||||
endpoint: endpoint_a.to_string(),
|
||||
},
|
||||
NeuronEndpoint {
|
||||
name: "node-b".into(),
|
||||
endpoint: endpoint_b.to_string(),
|
||||
},
|
||||
],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: Default::default(),
|
||||
};
|
||||
Arc::new(CortexState::from_config(&config))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn routes_to_least_busy_replica() {
|
||||
let neuron_a = common::spawn_mock_neuron().await;
|
||||
let neuron_b = common::spawn_mock_neuron().await;
|
||||
let fleet = two_neuron_fleet(&neuron_a, &neuron_b).await;
|
||||
|
||||
// A is busy (1 running + 3 queued), B is idle.
|
||||
seed_loaded(&fleet, "node-a", 1, 3).await;
|
||||
seed_loaded(&fleet, "node-b", 0, 0).await;
|
||||
|
||||
let route = cortex_gateway::router::resolve(&fleet, "test-model")
|
||||
.await
|
||||
.expect("model is loaded on both nodes");
|
||||
assert_eq!(route.node_name, "node-b", "should pick the idle replica");
|
||||
|
||||
// Flip the load: now B is the busy one.
|
||||
seed_loaded(&fleet, "node-a", 0, 0).await;
|
||||
seed_loaded(&fleet, "node-b", 1, 5).await;
|
||||
let route = cortex_gateway::router::resolve(&fleet, "test-model")
|
||||
.await
|
||||
.expect("still loaded");
|
||||
assert_eq!(route.node_name, "node-a", "should follow the lighter load");
|
||||
}
|
||||
|
||||
/// Mock neuron whose inference endpoint always returns a #63 backpressure
|
||||
/// envelope (503 + Retry-After) — simulating a saturated neuron.
|
||||
async fn spawn_busy_neuron() -> String {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let base_url = format!("http://{addr}");
|
||||
let inference_url = base_url.clone();
|
||||
let app = axum::Router::new()
|
||||
.route(
|
||||
"/models/{model_id}/endpoint",
|
||||
get(move |Path(_): Path<String>| {
|
||||
let url = inference_url.clone();
|
||||
async move { Json(json!({ "url": url })) }
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/v1/chat/completions",
|
||||
post(|| async {
|
||||
let body = json!({"error": {
|
||||
"message": "model is busy (admission queue full); retry shortly",
|
||||
"type": "rate_limit_error",
|
||||
"code": "rate_limit_exceeded",
|
||||
"param": null
|
||||
}});
|
||||
(
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
[(header::RETRY_AFTER, "6")],
|
||||
Json(body),
|
||||
)
|
||||
.into_response()
|
||||
}),
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
});
|
||||
base_url
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn neuron_backpressure_is_propagated_intact() {
|
||||
// A saturated neuron's 503 + Retry-After + envelope must reach the client
|
||||
// verbatim — not unwrapped, remapped, or stripped (#55 / #63).
|
||||
let neuron = spawn_busy_neuron().await;
|
||||
let fleet = two_neuron_fleet(&neuron, &neuron).await;
|
||||
seed_loaded(&fleet, "node-a", 1, 8).await;
|
||||
|
||||
let app = cortex_gateway::build_app(Arc::clone(&fleet));
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn(async move {
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
});
|
||||
|
||||
let resp = reqwest::Client::new()
|
||||
.post(format!("http://{addr}/v1/chat/completions"))
|
||||
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": "hi"}]}))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(resp.status(), reqwest::StatusCode::SERVICE_UNAVAILABLE);
|
||||
assert_eq!(
|
||||
resp.headers()
|
||||
.get(reqwest::header::RETRY_AFTER)
|
||||
.and_then(|v| v.to_str().ok()),
|
||||
Some("6"),
|
||||
"Retry-After must survive the proxy"
|
||||
);
|
||||
let body: Value = resp.json().await.unwrap();
|
||||
assert_eq!(body["error"]["code"], "rate_limit_exceeded");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ties_break_deterministically_by_name() {
|
||||
let neuron_a = common::spawn_mock_neuron().await;
|
||||
let neuron_b = common::spawn_mock_neuron().await;
|
||||
let fleet = two_neuron_fleet(&neuron_a, &neuron_b).await;
|
||||
|
||||
// Equal load on both → stable pick (lowest node name).
|
||||
seed_loaded(&fleet, "node-a", 0, 0).await;
|
||||
seed_loaded(&fleet, "node-b", 0, 0).await;
|
||||
|
||||
let route = cortex_gateway::router::resolve(&fleet, "test-model")
|
||||
.await
|
||||
.expect("loaded");
|
||||
assert_eq!(route.node_name, "node-a", "ties break by name");
|
||||
}
|
||||
174
crates/cortex-gateway/tests/prompt_prevalidation.rs
Normal file
174
crates/cortex-gateway/tests/prompt_prevalidation.rs
Normal file
@@ -0,0 +1,174 @@
|
||||
//! Fail-fast prompt pre-validation + advisory client hints (#56).
|
||||
//!
|
||||
//! cortex refuses a prompt that already exceeds the model's advertised
|
||||
//! context window before dispatching to neuron — the same #60
|
||||
//! `context_length_exceeded` envelope neuron would emit, just earlier — and
|
||||
//! attaches an advisory `X-Helexa-Advice` header for fingerprinted clients.
|
||||
|
||||
use axum::Json;
|
||||
use axum::extract::Path;
|
||||
use axum::routing::{get, post};
|
||||
use cortex_core::config::{
|
||||
EvictionSettings, EvictionStrategy, GatewayConfig, GatewaySettings, NeuronEndpoint,
|
||||
};
|
||||
use cortex_core::harness::ModelLimit;
|
||||
use cortex_core::node::{ModelEntry, ModelStatus};
|
||||
use cortex_gateway::state::CortexState;
|
||||
use serde_json::{Value, json};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
/// Mock neuron with a hit counter, so a test can prove a request was (or
|
||||
/// wasn't) dispatched past the gateway's pre-validation.
|
||||
async fn spawn_counting_neuron() -> (String, Arc<AtomicU64>) {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let base_url = format!("http://{addr}");
|
||||
let inference_url = base_url.clone();
|
||||
let hits = Arc::new(AtomicU64::new(0));
|
||||
let sink = Arc::clone(&hits);
|
||||
let app = axum::Router::new()
|
||||
.route(
|
||||
"/models/{model_id}/endpoint",
|
||||
get(move |Path(_): Path<String>| {
|
||||
let url = inference_url.clone();
|
||||
async move { Json(json!({ "url": url })) }
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/v1/chat/completions",
|
||||
post(move || {
|
||||
let sink = Arc::clone(&sink);
|
||||
async move {
|
||||
sink.fetch_add(1, Ordering::SeqCst);
|
||||
Json(json!({
|
||||
"id": "c", "object": "chat.completion", "created": 1_700_000_000_u64,
|
||||
"model": "test-model",
|
||||
"choices": [{"index": 0, "message": {"role": "assistant", "content": "ok"}, "finish_reason": "stop"}],
|
||||
"usage": {"prompt_tokens": 3, "completion_tokens": 1, "total_tokens": 4}
|
||||
}))
|
||||
}
|
||||
}),
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
});
|
||||
(base_url, hits)
|
||||
}
|
||||
|
||||
/// Gateway over one neuron with `test-model` loaded and a tiny advertised
|
||||
/// context window (so a modest prompt overflows it).
|
||||
async fn spawn_gateway(neuron: &str, context: usize) -> String {
|
||||
let config = GatewayConfig {
|
||||
gateway: GatewaySettings {
|
||||
listen: "127.0.0.1:0".into(),
|
||||
metrics_listen: "127.0.0.1:0".into(),
|
||||
},
|
||||
eviction: EvictionSettings {
|
||||
strategy: EvictionStrategy::Lru,
|
||||
defrag_after_cycles: 0,
|
||||
},
|
||||
neurons: vec![NeuronEndpoint {
|
||||
name: "mock-node".into(),
|
||||
endpoint: neuron.to_string(),
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: Default::default(),
|
||||
};
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
{
|
||||
let mut nodes = fleet.nodes.write().await;
|
||||
let n = nodes.get_mut("mock-node").unwrap();
|
||||
n.healthy = true;
|
||||
n.models.insert(
|
||||
"test-model".into(),
|
||||
ModelEntry {
|
||||
id: "test-model".into(),
|
||||
status: ModelStatus::Loaded,
|
||||
last_accessed: None,
|
||||
vram_estimate_mb: Some(8000),
|
||||
capabilities: Vec::new(),
|
||||
tool_call: false,
|
||||
reasoning: false,
|
||||
limit: Some(ModelLimit {
|
||||
context,
|
||||
input: None,
|
||||
output: 16,
|
||||
}),
|
||||
},
|
||||
);
|
||||
}
|
||||
let app = cortex_gateway::build_app(Arc::clone(&fleet));
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn(async move {
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
});
|
||||
format!("http://{addr}")
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn over_long_prompt_is_rejected_before_dispatch() {
|
||||
let (neuron, hits) = spawn_counting_neuron().await;
|
||||
let gateway = spawn_gateway(&neuron, 50).await; // tiny 50-token window
|
||||
|
||||
// ~1200 chars → ~300 est tokens, well over 50.
|
||||
let big = "word ".repeat(240);
|
||||
let resp = reqwest::Client::new()
|
||||
.post(format!("{gateway}/v1/chat/completions"))
|
||||
.header("user-agent", "litellm/1.0")
|
||||
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": big}]}))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
|
||||
// Advisory hint for the fingerprinted client (header only, never body).
|
||||
assert!(
|
||||
resp.headers().get("x-helexa-advice").is_some(),
|
||||
"litellm should get advice"
|
||||
);
|
||||
let body: Value = resp.json().await.unwrap();
|
||||
assert_eq!(body["error"]["code"], "context_length_exceeded");
|
||||
assert_eq!(body["error"]["max"], 50);
|
||||
// Refused at the edge — neuron never saw it.
|
||||
assert_eq!(hits.load(Ordering::SeqCst), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn within_context_passes_through() {
|
||||
let (neuron, hits) = spawn_counting_neuron().await;
|
||||
let gateway = spawn_gateway(&neuron, 4096).await;
|
||||
|
||||
let resp = reqwest::Client::new()
|
||||
.post(format!("{gateway}/v1/chat/completions"))
|
||||
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": "hi"}]}))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(resp.status(), reqwest::StatusCode::OK);
|
||||
let _ = resp.bytes().await.unwrap();
|
||||
assert_eq!(hits.load(Ordering::SeqCst), 1, "served by neuron");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unknown_client_gets_no_advice_header() {
|
||||
let (neuron, _hits) = spawn_counting_neuron().await;
|
||||
let gateway = spawn_gateway(&neuron, 50).await;
|
||||
|
||||
let big = "word ".repeat(240);
|
||||
let resp = reqwest::Client::new()
|
||||
.post(format!("{gateway}/v1/chat/completions"))
|
||||
// no/unknown User-Agent → no advice, but still a clean 400
|
||||
.json(&json!({"model": "test-model", "messages": [{"role": "user", "content": big}]}))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
|
||||
assert!(resp.headers().get("x-helexa-advice").is_none());
|
||||
let body: Value = resp.json().await.unwrap();
|
||||
assert_eq!(body["error"]["code"], "context_length_exceeded");
|
||||
}
|
||||
Reference in New Issue
Block a user