feat(cortex): unified /v1/models — catalogue × topology feasibility + cold-load
Some checks failed
build-prerelease / Resolve version stamps (push) Successful in 45s
CI / Format (push) Successful in 48s
CI / Clippy (push) Successful in 2m12s
CI / Test (push) Successful in 4m42s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Build cortex binary (push) Successful in 5m10s
build-prerelease / Build neuron-blackwell (push) Successful in 3m35s
build-prerelease / Package cortex RPM (push) Successful in 1m19s
build-prerelease / Build neuron-ada (push) Has been cancelled
build-prerelease / Package helexa-neuron-ada RPM (push) Has been cancelled
build-prerelease / Package helexa-neuron-ampere RPM (push) Has been cancelled
build-prerelease / Package helexa-neuron-blackwell RPM (push) Has been cancelled
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Has been cancelled
build-prerelease / Build neuron-ampere (push) Has been cancelled

Realises [project-unified-models-endpoint]: cortex now surfaces every
model the operator has provisioned in the catalogue, transparently
cold-loads on the first request, and routes the request once the load
is done — without per-node configuration or client awareness of which
neuron hosts what.

cortex-core changes:
- NodeState gains `discovery: Option<DiscoveryResponse>` — populated
  once per neuron on first successful poll, cached forever after
  (topology is invariant for a neuron process).
- ModelProfile gains `is_feasible_on(neuron, devices)` with the
  pinned_on / min_devices / min_device_vram_mb logic + 5 unit tests.
- CortexModelEntry expanded with OpenAI-compatible (`id`, `object`,
  `created`, `owned_by`) plus helexa-specific extension fields
  (`loaded`, `feasible_on`, `locations`).

cortex-gateway changes:
- poller.rs: `maybe_poll_discovery` fetches `GET /discovery` once per
  neuron and caches on NodeState.
- handlers.rs::list_models rewritten as union of (catalogue × topology
  feasibility) + (currently loaded somewhere). Catalogue-defined models
  surface even when not yet loaded.
- router.rs::resolve gains priority 3 (catalogue cold-load):
    1. loaded somewhere → route there
    2. unloaded somewhere → route + lazy load via neuron
    3. in catalogue → pick feasible neuron, POST /models/load, wait,
       route. Cache the new entry locally so subsequent requests skip
       the poll wait.
    4. else 404
- pick_feasible_neuron prefers pinned_on neurons, falls back to any
  feasible one (stable by name).
- profile_to_spec translates ModelProfile → ModelSpec, picking devices
  by VRAM floor and setting tensor_parallel = min_devices for multi-
  device profiles.
- "already loaded" responses from neuron are tolerated (two concurrent
  requests racing the same cold-load is a benign outcome).

models.example.toml rewritten to reflect the canonical helexa fleet
(beast = 2x RTX 5090, benjy = RTX 4090, quadbrat = RTX 3060) with a
working TP example (Qwen3.6-27B pinned on beast) plus single-GPU
profiles for the smaller models.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-20 07:39:04 +03:00
parent f72dee094f
commit 735945ee81
7 changed files with 528 additions and 54 deletions

View File

@@ -185,12 +185,62 @@ async fn anthropic_messages(
}
}
/// `GET /v1/models` — aggregate models from all nodes.
/// `GET /v1/models` — union of (catalogue × topology feasibility) and
/// (currently loaded somewhere). The result is what the fleet *could*
/// serve, not just what's already loaded — so OpenAI-compatible tools
/// see every model the operator has provisioned, and cortex
/// transparently cold-loads the first time one is requested.
async fn list_models(State(fleet): State<Arc<CortexState>>) -> Json<Value> {
use std::collections::HashMap;
let now = Utc::now().timestamp() as u64;
let nodes = fleet.nodes.read().await;
let mut model_map: std::collections::HashMap<String, CortexModelEntry> =
std::collections::HashMap::new();
let catalogue = &fleet.catalogue;
let mut entries: HashMap<String, CortexModelEntry> = HashMap::new();
// Pass 1: catalogue × topology. For every catalogue profile, find
// healthy neurons whose discovered devices satisfy the profile.
// Catalogue-defined models surface here even if nothing has loaded
// them yet — that's the point of the unified endpoint.
for profile in &catalogue.models {
let mut feasible_on = Vec::new();
for node in nodes.values() {
if !node.healthy {
continue;
}
let Some(disc) = node.discovery.as_ref() else {
continue;
};
if profile.is_feasible_on(&node.name, &disc.devices) {
feasible_on.push(node.name.clone());
}
}
if feasible_on.is_empty() {
// The catalogue lists this model but no neuron's topology
// matches — surface it as not-loaded with no feasible
// location. Hides nothing; lets operators see why a
// configured model isn't reachable.
feasible_on.clear();
}
entries.insert(
profile.id.clone(),
CortexModelEntry {
id: profile.id.clone(),
object: "model".into(),
created: now,
owned_by: "helexa".into(),
loaded: false,
feasible_on,
locations: Vec::new(),
},
);
}
// Pass 2: layer the actually-loaded state on top. For each
// (node, model) entry, attach a ModelLocation. If the model isn't
// in the catalogue, create a new CortexModelEntry from scratch —
// cortex doesn't refuse to surface a manually-loaded model just
// because the operator didn't enumerate it in models.toml.
for node in nodes.values() {
for (model_id, entry) in &node.models {
let location = ModelLocation {
@@ -198,19 +248,30 @@ async fn list_models(State(fleet): State<Arc<CortexState>>) -> Json<Value> {
status: entry.status,
vram_estimate_mb: entry.vram_estimate_mb,
};
model_map
let was_loaded = matches!(entry.status, cortex_core::node::ModelStatus::Loaded);
entries
.entry(model_id.clone())
.and_modify(|e| e.locations.push(location.clone()))
.and_modify(|e| {
e.locations.push(location.clone());
if was_loaded {
e.loaded = true;
}
})
.or_insert_with(|| CortexModelEntry {
id: model_id.clone(),
object: "model".into(),
created: now,
owned_by: "helexa".into(),
loaded: was_loaded,
// Not in catalogue — cortex has no opinion on
// feasibility; leave empty.
feasible_on: Vec::new(),
locations: vec![location],
});
}
}
let data: Vec<Value> = model_map.values().map(|e| json!(e)).collect();
let data: Vec<Value> = entries.values().map(|e| json!(e)).collect();
Json(json!({
"object": "list",
"data": data,

View File

@@ -3,6 +3,7 @@
use crate::state::CortexState;
use chrono::Utc;
use cortex_core::discovery::DiscoveryResponse;
use cortex_core::harness::ModelInfo;
use cortex_core::node::{ModelEntry, ModelStatus};
use std::sync::Arc;
@@ -25,7 +26,59 @@ pub async fn poll_once(fleet: &CortexState) {
}
}
/// One-shot fetch of `GET /discovery`. Cached on the NodeState forever
/// after the first success — topology is invariant for a given neuron
/// process. Skipped when the cache is already populated.
async fn maybe_poll_discovery(fleet: &CortexState, name: &str, endpoint: &str) {
{
let nodes = fleet.nodes.read().await;
match nodes.get(name) {
Some(n) if n.discovery.is_some() => return,
_ => {}
}
}
let url = format!("{endpoint}/discovery");
let resp = match fleet
.http_client
.get(&url)
.timeout(Duration::from_secs(5))
.send()
.await
{
Ok(r) if r.status().is_success() => r,
Ok(r) => {
tracing::debug!(node = name, status = %r.status(), "discovery probe non-success");
return;
}
Err(e) => {
tracing::debug!(node = name, error = %e, "discovery probe unreachable");
return;
}
};
match resp.json::<DiscoveryResponse>().await {
Ok(d) => {
let mut nodes = fleet.nodes.write().await;
if let Some(node) = nodes.get_mut(name) {
tracing::info!(
node = name,
hostname = %d.hostname,
devices = d.devices.len(),
"discovery cached"
);
node.discovery = Some(d);
}
}
Err(e) => {
tracing::warn!(node = name, error = %e, "failed to parse /discovery response");
}
}
}
async fn poll_neuron(fleet: &CortexState, name: &str, endpoint: &str) {
// Topology first — cheap once cached, and the router needs it to
// route requests against catalogue entries that aren't loaded yet.
maybe_poll_discovery(fleet, name, endpoint).await;
let url = format!("{endpoint}/models");
let result = fleet

View File

@@ -2,13 +2,21 @@
//!
//! Given a model ID from an inbound request, determine which node should
//! handle it. Priority:
//! 1. Node where the model is currently `Loaded`
//! 2. Node where the model is `Unloaded` (will lazy-load on request)
//! 3. Error: model not found on any node
//! 1. Node where the model is currently `Loaded` → use it.
//! 2. Node where the model is `Unloaded` → use it; neuron's existing
//! lazy-load behaviour will reload before serving the request.
//! 3. Model is in the catalogue → pick a feasible neuron, call
//! `POST /models/load`, wait for the load to complete, then
//! proxy. First-request cold-load latency is acceptable per the
//! unified-endpoint contract.
//! 4. Not in catalogue, not loaded anywhere → 404.
use crate::state::CortexState;
use cortex_core::catalogue::ModelProfile;
use cortex_core::harness::ModelSpec;
use cortex_core::node::ModelStatus;
use std::sync::Arc;
use std::time::Duration;
/// The routing decision: which node endpoint to proxy the request to.
#[derive(Debug, Clone)]
@@ -16,18 +24,31 @@ pub struct RouteDecision {
pub node_name: String,
/// The inference endpoint to proxy to (from neuron's /models/{id}/endpoint).
pub endpoint: String,
/// Whether the model will need to load (cold start).
/// Whether the model will need to load (cold start). Set to true
/// when we proxied to an `Unloaded` node (lazy load on neuron) or
/// when we just triggered an explicit cold-load via the catalogue
/// path.
pub cold_start: bool,
}
#[derive(Debug, thiserror::Error)]
pub enum RouteError {
#[error("model '{0}' not found on any node")]
#[error("model '{0}' not found on any node and not in catalogue")]
ModelNotFound(String),
#[error("no healthy nodes available")]
NoHealthyNodes,
#[error("failed to resolve inference endpoint for model '{0}' on node '{1}'")]
EndpointResolveFailed(String, String),
#[error(
"model '{model_id}' is in the catalogue but no healthy neuron's topology satisfies its constraints"
)]
NoFeasibleNeuron { model_id: String },
#[error("cold-load of '{model_id}' on '{node}' failed: {message}")]
ColdLoadFailed {
model_id: String,
node: String,
message: String,
},
}
/// Resolve which node should serve a request for the given model.
@@ -36,42 +57,231 @@ pub async fn resolve(
fleet: &Arc<CortexState>,
model_id: &str,
) -> Result<RouteDecision, RouteError> {
let (node_name, neuron_endpoint, cold_start) = {
// Snapshot loaded / unloaded state from the poller cache.
let (loaded_route, unloaded_route, any_healthy) = {
let nodes = fleet.nodes.read().await;
let mut loaded_candidate = None;
let mut unloaded_candidate = None;
let mut loaded_route = None;
let mut unloaded_route = None;
let mut any_healthy = false;
for node in nodes.values() {
if !node.healthy {
continue;
}
any_healthy = true;
if let Some(entry) = node.models.get(model_id) {
match entry.status {
ModelStatus::Loaded | ModelStatus::Reloading => {
loaded_candidate = Some((node.name.clone(), node.endpoint.clone(), false));
loaded_route = Some((node.name.clone(), node.endpoint.clone(), false));
break;
}
ModelStatus::Unloaded => {
if unloaded_candidate.is_none() {
unloaded_candidate =
Some((node.name.clone(), node.endpoint.clone(), true));
if unloaded_route.is_none() {
unloaded_route = Some((node.name.clone(), node.endpoint.clone(), true));
}
}
}
}
}
loaded_candidate.or(unloaded_candidate).ok_or_else(|| {
if nodes.values().any(|n| n.healthy) {
RouteError::ModelNotFound(model_id.to_string())
} else {
RouteError::NoHealthyNodes
}
})?
(loaded_route, unloaded_route, any_healthy)
};
// Ask the neuron for the inference endpoint for this model.
if !any_healthy {
return Err(RouteError::NoHealthyNodes);
}
// Priority 1: already loaded.
if let Some((node_name, neuron_endpoint, cold_start)) = loaded_route {
return finish(fleet, &node_name, &neuron_endpoint, model_id, cold_start).await;
}
// Priority 2: known to neuron but unloaded (neuron's lazy load).
if let Some((node_name, neuron_endpoint, cold_start)) = unloaded_route {
return finish(fleet, &node_name, &neuron_endpoint, model_id, cold_start).await;
}
// Priority 3: catalogue × topology cold-load.
if let Some(profile) = fleet.catalogue.get(model_id) {
let (node_name, neuron_endpoint) = pick_feasible_neuron(fleet, profile).await?;
cold_load(fleet, &node_name, &neuron_endpoint, profile).await?;
return finish(fleet, &node_name, &neuron_endpoint, model_id, true).await;
}
Err(RouteError::ModelNotFound(model_id.to_string()))
}
/// Pick a healthy neuron whose discovered topology satisfies the
/// profile. Preference order:
/// 1. A neuron from `profile.pinned_on` that is healthy + feasible.
/// 2. Otherwise, any healthy + feasible neuron, stable by name.
async fn pick_feasible_neuron(
fleet: &Arc<CortexState>,
profile: &ModelProfile,
) -> Result<(String, String), RouteError> {
let nodes = fleet.nodes.read().await;
let mut candidates: Vec<(String, String, bool)> = Vec::new();
for node in nodes.values() {
if !node.healthy {
continue;
}
let Some(disc) = node.discovery.as_ref() else {
continue;
};
if !profile.is_feasible_on(&node.name, &disc.devices) {
continue;
}
let pinned = profile.pinned_on.iter().any(|n| n == &node.name);
candidates.push((node.name.clone(), node.endpoint.clone(), pinned));
}
candidates.sort_by(|a, b| {
b.2.cmp(&a.2) // pinned first (true > false)
.then(a.0.cmp(&b.0))
});
let pick = candidates.into_iter().next();
pick.map(|(n, e, _)| (n, e))
.ok_or_else(|| RouteError::NoFeasibleNeuron {
model_id: profile.id.clone(),
})
}
/// Issue `POST {endpoint}/models/load` for this profile on this neuron,
/// blocking until the load completes (neuron's load endpoint is
/// synchronous — it returns 200 once VRAM is materialised). On success
/// also inserts a `Loaded` entry into the local NodeState cache so the
/// caller's subsequent endpoint lookup sees the new model without
/// waiting for the next poll cycle.
async fn cold_load(
fleet: &Arc<CortexState>,
node_name: &str,
neuron_endpoint: &str,
profile: &ModelProfile,
) -> Result<(), RouteError> {
let spec = profile_to_spec(fleet, node_name, profile).await;
let url = format!("{neuron_endpoint}/models/load");
tracing::info!(model = %profile.id, node = node_name, "cold-loading via /models/load");
// Generous timeout: a fresh download + safetensors mmap + device
// copy for a 30B-class dense model can comfortably exceed 5 min on
// a slow link. The HTTP client's own default already covers most
// of this; pin a longer per-request bound just here.
let resp = match fleet
.http_client
.post(&url)
.timeout(Duration::from_secs(1800))
.json(&spec)
.send()
.await
{
Ok(r) => r,
Err(e) => {
return Err(RouteError::ColdLoadFailed {
model_id: profile.id.clone(),
node: node_name.to_string(),
message: format!("HTTP request failed: {e}"),
});
}
};
let status = resp.status();
if !status.is_success() {
let body = resp.text().await.unwrap_or_default();
// Neuron returns 400 "already loaded" when two concurrent
// requests race the same model. Treat that as success — both
// requests effectively achieved the same end state.
if body.contains("already loaded") {
tracing::info!(
model = %profile.id,
node = node_name,
"cold-load saw 'already loaded' — treating as success"
);
} else {
return Err(RouteError::ColdLoadFailed {
model_id: profile.id.clone(),
node: node_name.to_string(),
message: format!("HTTP {status}: {body}"),
});
}
} else {
tracing::info!(model = %profile.id, node = node_name, "cold-load returned 200");
}
// Warm the cache: insert a Loaded ModelEntry so the next
// resolve() finds the model without waiting for the poll loop.
{
let mut nodes = fleet.nodes.write().await;
if let Some(node) = nodes.get_mut(node_name) {
node.models.insert(
profile.id.clone(),
cortex_core::node::ModelEntry {
id: profile.id.clone(),
status: ModelStatus::Loaded,
last_accessed: Some(chrono::Utc::now()),
vram_estimate_mb: profile.vram_mb,
},
);
}
}
Ok(())
}
/// Translate a `ModelProfile` to a `ModelSpec` neuron's /models/load
/// accepts. Devices are picked from the neuron's discovered topology —
/// the first `min_devices` indices that meet `min_device_vram_mb`.
async fn profile_to_spec(
fleet: &Arc<CortexState>,
node_name: &str,
profile: &ModelProfile,
) -> ModelSpec {
let devices = {
let nodes = fleet.nodes.read().await;
let mut picked: Vec<u32> = Vec::new();
if let Some(node) = nodes.get(node_name)
&& let Some(disc) = &node.discovery
{
let min_vram = profile.min_device_vram_mb.unwrap_or(0);
for d in &disc.devices {
if d.vram_total_mb >= min_vram {
picked.push(d.index);
if picked.len() as u32 >= profile.min_devices {
break;
}
}
}
}
if picked.is_empty() {
// Fall back to a 0..min_devices default; pick_feasible_neuron
// already verified the topology satisfies the constraints,
// so this only fires if discovery raced or was lost.
(0..profile.min_devices).collect()
} else {
picked
}
};
let tensor_parallel = if profile.min_devices > 1 {
Some(profile.min_devices)
} else {
None
};
ModelSpec {
model_id: profile.id.clone(),
harness: profile.harness.clone(),
quant: profile.quant.clone(),
tensor_parallel,
devices: Some(devices),
}
}
/// Resolve neuron's `/models/{id}/endpoint` to its inference URL and
/// build the final `RouteDecision`. Shared by all three priority
/// branches above.
async fn finish(
fleet: &Arc<CortexState>,
node_name: &str,
neuron_endpoint: &str,
model_id: &str,
cold_start: bool,
) -> Result<RouteDecision, RouteError> {
let endpoint_url = format!(
"{}/models/{}/endpoint",
neuron_endpoint,
@@ -90,11 +300,11 @@ pub async fn resolve(
};
let endpoint = inference_endpoint.ok_or_else(|| {
RouteError::EndpointResolveFailed(model_id.to_string(), node_name.clone())
RouteError::EndpointResolveFailed(model_id.to_string(), node_name.to_string())
})?;
Ok(RouteDecision {
node_name,
node_name: node_name.to_string(),
endpoint,
cold_start,
})

View File

@@ -26,6 +26,7 @@ impl CortexState {
models: HashMap::new(),
lifecycle_cycles: 0,
last_poll: None,
discovery: None,
},
);
}