Compare commits
1 Commits
fix/cortex
...
fix/neuron
| Author | SHA1 | Date | |
|---|---|---|---|
|
db7e373b90
|
@@ -32,12 +32,6 @@ pub struct NodeState {
|
||||
/// least-busy replica when a model is loaded on more than one neuron.
|
||||
/// Empty until the first /health poll reports load.
|
||||
pub model_load: HashMap<String, ModelLoad>,
|
||||
/// Consecutive failed `/models` polls. The poller marks a node
|
||||
/// unhealthy only once this crosses a threshold, so a single transient
|
||||
/// miss (e.g. a neuron momentarily slow to answer while busy) doesn't
|
||||
/// yank the node — and all its models — out of routing. Reset to 0 on
|
||||
/// any successful poll.
|
||||
pub consecutive_poll_failures: u32,
|
||||
}
|
||||
|
||||
/// A model registered on a node, with its runtime status.
|
||||
|
||||
@@ -5,29 +5,12 @@ use crate::state::CortexState;
|
||||
use chrono::Utc;
|
||||
use cortex_core::discovery::{DiscoveryResponse, HealthResponse};
|
||||
use cortex_core::harness::ModelInfo;
|
||||
use cortex_core::node::{ModelEntry, ModelStatus, NodeState};
|
||||
use cortex_core::node::{ModelEntry, ModelStatus};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
const POLL_INTERVAL: Duration = Duration::from_secs(10);
|
||||
|
||||
/// Consecutive failed `/models` polls before a node is marked unhealthy.
|
||||
/// Debounces transient misses (a busy neuron briefly slow to answer) so a
|
||||
/// single blip can't yank a node — and its models — out of routing. At the
|
||||
/// 10s poll interval this tolerates ~20s of flapping before evicting.
|
||||
const POLL_FAILURE_THRESHOLD: u32 = 3;
|
||||
|
||||
/// Record a failed poll for `node`, marking it unhealthy only once failures
|
||||
/// reach [`POLL_FAILURE_THRESHOLD`]. Below the threshold the node keeps its
|
||||
/// last-known health, riding over transient misses. A successful poll resets
|
||||
/// the counter (see the success arm in `poll_once`).
|
||||
fn record_poll_failure(node: &mut NodeState) {
|
||||
node.consecutive_poll_failures = node.consecutive_poll_failures.saturating_add(1);
|
||||
if node.consecutive_poll_failures >= POLL_FAILURE_THRESHOLD {
|
||||
node.healthy = false;
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs forever, polling all neurons on a fixed interval.
|
||||
pub async fn poll_loop(fleet: Arc<CortexState>) {
|
||||
loop {
|
||||
@@ -155,14 +138,13 @@ async fn poll_neuron(fleet: &CortexState, name: &str, endpoint: &str) {
|
||||
// Remove models no longer reported by the neuron.
|
||||
node.models.retain(|id, _| seen.contains(id));
|
||||
|
||||
node.consecutive_poll_failures = 0;
|
||||
node.healthy = true;
|
||||
node.last_poll = Some(Utc::now());
|
||||
tracing::debug!(node = name, models = models.len(), "poll ok");
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(node = name, error = %e, "failed to parse /models response");
|
||||
record_poll_failure(node);
|
||||
node.healthy = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -172,11 +154,11 @@ async fn poll_neuron(fleet: &CortexState, name: &str, endpoint: &str) {
|
||||
status = %resp.status(),
|
||||
"neuron returned non-success status"
|
||||
);
|
||||
record_poll_failure(node);
|
||||
node.healthy = false;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(node = name, error = %e, "failed to reach neuron");
|
||||
record_poll_failure(node);
|
||||
node.healthy = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -50,10 +50,6 @@ pub enum RouteError {
|
||||
"model '{model_id}' is in the catalogue but no healthy neuron's topology satisfies its constraints"
|
||||
)]
|
||||
NoFeasibleNeuron { model_id: String },
|
||||
#[error(
|
||||
"model '{model_id}' is feasible on a neuron that is currently unhealthy — retry shortly"
|
||||
)]
|
||||
FeasibleNodeUnhealthy { model_id: String },
|
||||
#[error("cold-load of '{model_id}' on '{node}' failed: {message}")]
|
||||
ColdLoadFailed {
|
||||
model_id: String,
|
||||
@@ -72,9 +68,7 @@ impl RouteError {
|
||||
/// safe to retry the same request); everything else is 404.
|
||||
pub fn http_status(&self) -> u16 {
|
||||
match self {
|
||||
RouteError::NoHealthyNodes
|
||||
| RouteError::ModelRecovering { .. }
|
||||
| RouteError::FeasibleNodeUnhealthy { .. } => 503,
|
||||
RouteError::NoHealthyNodes | RouteError::ModelRecovering { .. } => 503,
|
||||
_ => 404,
|
||||
}
|
||||
}
|
||||
@@ -87,8 +81,7 @@ impl RouteError {
|
||||
| RouteError::EndpointResolveFailed(_, _)
|
||||
| RouteError::NoFeasibleNeuron { .. }
|
||||
| RouteError::ColdLoadFailed { .. }
|
||||
| RouteError::ModelRecovering { .. }
|
||||
| RouteError::FeasibleNodeUnhealthy { .. } => "api_error",
|
||||
| RouteError::ModelRecovering { .. } => "api_error",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,7 +94,6 @@ impl RouteError {
|
||||
RouteError::NoFeasibleNeuron { .. } => "service_unavailable",
|
||||
RouteError::ColdLoadFailed { .. } => "service_unavailable",
|
||||
RouteError::ModelRecovering { .. } => "service_unavailable",
|
||||
RouteError::FeasibleNodeUnhealthy { .. } => "service_unavailable",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -113,7 +105,6 @@ impl RouteError {
|
||||
pub fn retry_after_secs(&self) -> Option<u64> {
|
||||
match self {
|
||||
RouteError::ModelRecovering { .. } => Some(2),
|
||||
RouteError::FeasibleNodeUnhealthy { .. } => Some(3),
|
||||
RouteError::NoHealthyNodes => Some(5),
|
||||
_ => None,
|
||||
}
|
||||
@@ -261,32 +252,11 @@ async fn pick_feasible_neuron(
|
||||
b.2.cmp(&a.2) // pinned first (true > false)
|
||||
.then(a.0.cmp(&b.0))
|
||||
});
|
||||
if let Some((n, e, _)) = candidates.into_iter().next() {
|
||||
return Ok((n, e));
|
||||
}
|
||||
|
||||
// No *healthy* feasible neuron. Distinguish a transient outage from a
|
||||
// permanent misconfiguration: if some neuron is topologically feasible
|
||||
// but currently unhealthy (e.g. it briefly missed polls while busy),
|
||||
// this is retryable — return 503 + Retry-After so the client backs off
|
||||
// and retries instead of treating a 404 as a hard failure. Only when no
|
||||
// neuron could *ever* satisfy the topology is it a permanent 404.
|
||||
let feasible_but_unhealthy = nodes.values().any(|node| {
|
||||
!node.healthy
|
||||
&& node
|
||||
.discovery
|
||||
.as_ref()
|
||||
.is_some_and(|disc| profile.is_feasible_on(&node.name, &disc.devices))
|
||||
});
|
||||
if feasible_but_unhealthy {
|
||||
Err(RouteError::FeasibleNodeUnhealthy {
|
||||
let pick = candidates.into_iter().next();
|
||||
pick.map(|(n, e, _)| (n, e))
|
||||
.ok_or_else(|| RouteError::NoFeasibleNeuron {
|
||||
model_id: profile.id.clone(),
|
||||
})
|
||||
} else {
|
||||
Err(RouteError::NoFeasibleNeuron {
|
||||
model_id: profile.id.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Issue `POST {endpoint}/models/load` for this profile on this neuron,
|
||||
|
||||
@@ -38,7 +38,6 @@ impl CortexState {
|
||||
discovery: None,
|
||||
activation: None,
|
||||
model_load: HashMap::new(),
|
||||
consecutive_poll_failures: 0,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1,124 +0,0 @@
|
||||
//! Router: a catalogued model whose only topologically-feasible neuron is
|
||||
//! currently unhealthy is a *transient* condition (retryable 503), not a
|
||||
//! permanent 404. This is the exact shape of the beast incident: benjy/
|
||||
//! quadbrat (1 GPU, healthy) can't host the 27B, and beast (2 GPU) — the
|
||||
//! sole feasible node — briefly drops out → clients must back off and retry,
|
||||
//! not hard-fail.
|
||||
|
||||
use cortex_core::config::{
|
||||
EvictionSettings, EvictionStrategy, GatewayConfig, GatewaySettings, NeuronEndpoint,
|
||||
};
|
||||
use cortex_core::discovery::{DeviceInfo, DiscoveryResponse};
|
||||
use cortex_gateway::router::{self, RouteError};
|
||||
use cortex_gateway::state::CortexState;
|
||||
use std::sync::Arc;
|
||||
|
||||
fn devices(n: usize) -> Vec<DeviceInfo> {
|
||||
(0..n)
|
||||
.map(|i| DeviceInfo {
|
||||
index: i as u32,
|
||||
name: "RTX 5090".into(),
|
||||
vram_total_mb: 32_768,
|
||||
compute_capability: "9.0".into(),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn discovery(host: &str, n_devices: usize) -> DiscoveryResponse {
|
||||
DiscoveryResponse {
|
||||
hostname: host.into(),
|
||||
os: "Linux".into(),
|
||||
kernel: "7.0".into(),
|
||||
cuda_version: Some("13.0".into()),
|
||||
driver_version: Some("999".into()),
|
||||
devices: devices(n_devices),
|
||||
harnesses: vec!["candle".into()],
|
||||
cuda_unavailable_reason: None,
|
||||
max_prompt_tokens: 49_152,
|
||||
}
|
||||
}
|
||||
|
||||
/// Write a catalogue containing one model (`big-model`, `min_devices = 2`)
/// to a fresh file in the system temp directory and return its path.
///
/// Each call gets its own file name, built from the process id and a
/// per-process counter. The test harness runs tests in parallel threads, so a
/// single shared path would let one test truncate and rewrite the file while
/// another test is still loading it (presumably inside
/// `CortexState::from_config` — verify against that function).
///
/// The file is not removed afterwards; callers that care can delete the
/// returned path.
///
/// # Panics
///
/// Panics if the file cannot be written.
fn write_catalogue() -> std::path::PathBuf {
    // Distinguishes calls made by concurrently running tests in one process.
    static NEXT_ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);

    let toml = r#"
[[models]]
id = "big-model"
harness = "candle"
min_devices = 2
"#;

    let seq = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let file_name = format!(
        "cortex_test_feasibility_models_{}_{}.toml",
        std::process::id(),
        seq
    );
    let path = std::env::temp_dir().join(file_name);
    std::fs::write(&path, toml).expect("failed to write test catalogue to temp dir");
    path
}
|
||||
|
||||
async fn fleet_with(big_healthy: bool, big_devices: usize) -> Arc<CortexState> {
|
||||
let cat = write_catalogue();
|
||||
let config = GatewayConfig {
|
||||
gateway: GatewaySettings {
|
||||
listen: "127.0.0.1:0".into(),
|
||||
metrics_listen: "127.0.0.1:0".into(),
|
||||
},
|
||||
eviction: EvictionSettings {
|
||||
strategy: EvictionStrategy::Lru,
|
||||
defrag_after_cycles: 0,
|
||||
},
|
||||
neurons: vec![
|
||||
NeuronEndpoint {
|
||||
name: "small".into(),
|
||||
endpoint: "http://127.0.0.1:1".into(),
|
||||
},
|
||||
NeuronEndpoint {
|
||||
name: "big".into(),
|
||||
endpoint: "http://127.0.0.1:2".into(),
|
||||
},
|
||||
],
|
||||
models_config: cat.to_string_lossy().into_owned(),
|
||||
entitlements: Default::default(),
|
||||
};
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
{
|
||||
let mut nodes = fleet.nodes.write().await;
|
||||
// "small" is healthy but only has 1 GPU → not feasible for the model.
|
||||
let small = nodes.get_mut("small").unwrap();
|
||||
small.healthy = true;
|
||||
small.discovery = Some(discovery("small", 1));
|
||||
// "big" has enough GPUs but its health is the variable under test.
|
||||
let big = nodes.get_mut("big").unwrap();
|
||||
big.healthy = big_healthy;
|
||||
big.discovery = Some(discovery("big", big_devices));
|
||||
}
|
||||
fleet
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn feasible_node_unhealthy_is_transient_503() {
|
||||
// big (2 GPU, the only feasible node) is unhealthy; small (1 GPU) is
|
||||
// healthy but can't host the model → retryable, not a permanent 404.
|
||||
let fleet = fleet_with(false, 2).await;
|
||||
let err = router::resolve(&fleet, "big-model")
|
||||
.await
|
||||
.expect_err("model can't be served right now");
|
||||
assert!(
|
||||
matches!(err, RouteError::FeasibleNodeUnhealthy { .. }),
|
||||
"expected FeasibleNodeUnhealthy, got {err:?}"
|
||||
);
|
||||
assert_eq!(err.http_status(), 503);
|
||||
assert_eq!(err.retry_after_secs(), Some(3));
|
||||
assert_eq!(err.code(), "service_unavailable");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn no_node_can_ever_satisfy_is_permanent_404() {
|
||||
// big is healthy but only has 1 GPU now (e.g. topology genuinely can't
|
||||
// satisfy min_devices=2 anywhere) → permanent, non-retryable 404.
|
||||
let fleet = fleet_with(true, 1).await;
|
||||
let err = router::resolve(&fleet, "big-model")
|
||||
.await
|
||||
.expect_err("no feasible topology");
|
||||
assert!(
|
||||
matches!(err, RouteError::NoFeasibleNeuron { .. }),
|
||||
"expected NoFeasibleNeuron, got {err:?}"
|
||||
);
|
||||
assert_eq!(err.http_status(), 404);
|
||||
assert_eq!(err.retry_after_secs(), None);
|
||||
}
|
||||
@@ -228,26 +228,10 @@ async fn test_poller_marks_unreachable_node_unhealthy() {
|
||||
nodes.get_mut("dead-node").unwrap().healthy = true;
|
||||
}
|
||||
|
||||
// Debounce (#53 follow-up): a single missed poll must NOT evict a
|
||||
// previously-healthy node — a busy neuron briefly slow to answer
|
||||
// shouldn't yank its models out of routing.
|
||||
cortex_gateway::poller::poll_once(&fleet).await;
|
||||
assert!(
|
||||
fleet.nodes.read().await.get("dead-node").unwrap().healthy,
|
||||
"one failed poll should not mark a healthy node unhealthy"
|
||||
);
|
||||
|
||||
// It flips unhealthy only after POLL_FAILURE_THRESHOLD (3) consecutive
|
||||
// failures.
|
||||
cortex_gateway::poller::poll_once(&fleet).await;
|
||||
cortex_gateway::poller::poll_once(&fleet).await;
|
||||
assert!(
|
||||
!fleet.nodes.read().await.get("dead-node").unwrap().healthy,
|
||||
"three consecutive failed polls should mark the node unhealthy"
|
||||
);
|
||||
|
||||
// A subsequent successful poll would reset the counter and restore
|
||||
// health; covered implicitly by the discovery tests above.
|
||||
let nodes = fleet.nodes.read().await;
|
||||
assert!(!nodes.get("dead-node").unwrap().healthy);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -33,7 +33,7 @@ use crate::wire::{
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
|
||||
#[cfg(feature = "cuda")]
|
||||
use std::time::Duration;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
@@ -205,23 +205,50 @@ impl LoadedHandle {
|
||||
/// `NEURON_MAX_PROMPT_TOKENS`, when explicitly set, is applied as a
|
||||
/// clamp-only upper bound on the derived `context` — a backstop, not
|
||||
/// the authority. Unset → no clamp; the derivation stands alone.
|
||||
pub async fn derived_limit(
|
||||
/// Refresh the cached free-VRAM reading used by [`Self::derived_limit`]
|
||||
/// (#53). Queries the device worker — so it MUST run off the request
|
||||
/// path (background refresher / load-time seed), never from a control
|
||||
/// endpoint, since the query queues behind inference on the worker.
|
||||
/// Single-GPU caches the device's free VRAM; TP caches the tightest
|
||||
/// free across ranks (the same value `derived_limit` used pre-cache).
|
||||
pub async fn refresh_free_mb(&self) {
|
||||
let free = match self {
|
||||
LoadedHandle::Single(m) => m.query_vram().await.0,
|
||||
#[cfg(feature = "cuda")]
|
||||
LoadedHandle::Tp(m) => m.query_vram_tightest_free_mb().await,
|
||||
};
|
||||
// Don't clobber a good cached value with a transient `0`
|
||||
// (worker gone/poisoned sentinel).
|
||||
if free > 0 {
|
||||
match self {
|
||||
LoadedHandle::Single(m) => m.last_free_mb.store(free, Ordering::Release),
|
||||
#[cfg(feature = "cuda")]
|
||||
LoadedHandle::Tp(m) => m.last_free_mb.store(free, Ordering::Release),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn derived_limit(
|
||||
&self,
|
||||
cfg: &crate::config::ContextLimitConfig,
|
||||
) -> Option<cortex_core::harness::ModelLimit> {
|
||||
if !cfg.enabled {
|
||||
return None;
|
||||
}
|
||||
// Read the *cached* free VRAM — never query the device worker here.
|
||||
// This runs on `GET /models`; a live query would queue behind
|
||||
// inference on the worker thread and stall the control plane (#53).
|
||||
// The cache is refreshed off the request path (load + background task).
|
||||
let (profile, free_mb, rate) = match self {
|
||||
LoadedHandle::Single(m) => (
|
||||
m.context_profile?,
|
||||
m.query_vram().await.0,
|
||||
m.last_free_mb.load(Ordering::Acquire),
|
||||
m.prefill_rate.get(),
|
||||
),
|
||||
#[cfg(feature = "cuda")]
|
||||
LoadedHandle::Tp(m) => (
|
||||
m.context_profile?,
|
||||
m.query_vram_tightest_free_mb().await,
|
||||
m.last_free_mb.load(Ordering::Acquire),
|
||||
m.prefill_rate.get(),
|
||||
),
|
||||
};
|
||||
@@ -391,6 +418,13 @@ pub struct LoadedModel {
|
||||
/// request-path enforcement reads this — `0` means "not derived yet"
|
||||
/// → fall back to the static `NEURON_MAX_PROMPT_TOKENS`.
|
||||
pub derived_input_cap: AtomicUsize,
|
||||
/// Cached free VRAM (MiB) for the control plane (#53). `derived_limit`
|
||||
/// (served by `GET /models`) reads this instead of querying the device
|
||||
/// worker, which during inference is saturated processing forward jobs —
|
||||
/// a live query would queue behind them and stall `/models`, tripping
|
||||
/// cortex's health poller into marking the node unhealthy. Refreshed off
|
||||
/// the request path: seeded at load, then by a background task.
|
||||
pub last_free_mb: AtomicU64,
|
||||
}
|
||||
|
||||
impl LoadedModel {
|
||||
@@ -503,6 +537,10 @@ pub struct TpLoadedModel {
|
||||
/// Mint for pool-wide snapshot ids. Plain counter; uniqueness only
|
||||
/// needs to hold per model lifetime (snapshots die with the model).
|
||||
pub next_snapshot_id: std::sync::atomic::AtomicU64,
|
||||
/// Cached tightest free VRAM (MiB) for the control plane (#53) — see
|
||||
/// [`LoadedModel::last_free_mb`]. Read by `derived_limit` so `GET /models`
|
||||
/// never fans a VRAM query out to the (inference-saturated) TP workers.
|
||||
pub last_free_mb: AtomicU64,
|
||||
}
|
||||
|
||||
#[cfg(feature = "cuda")]
|
||||
@@ -1109,6 +1147,32 @@ fn debug_poison_armed(model_id: &str) -> bool {
|
||||
armed && !FIRED.swap(true, Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Background control-plane VRAM cache refresher (#53). Every few seconds,
|
||||
/// refreshes each loaded model's `last_free_mb` so `derived_limit` (served
|
||||
/// by `GET /models`) reads a cached value and never queries the device
|
||||
/// worker on the request path — a live query would queue behind inference
|
||||
/// forward jobs on the worker thread, stalling `/models` for seconds and
|
||||
/// tripping cortex's health poller into evicting the node from routing.
|
||||
/// Holds a `Weak` so a shutting-down harness lets the task exit. The query
|
||||
/// itself may queue behind inference, but that only delays this background
|
||||
/// refresh — no request-path caller is ever blocked.
|
||||
async fn vram_cache_refresh_loop(weak: std::sync::Weak<CandleHarness>) {
|
||||
const REFRESH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
|
||||
loop {
|
||||
tokio::time::sleep(REFRESH_INTERVAL).await;
|
||||
let Some(this) = weak.upgrade() else {
|
||||
return; // harness dropped — exit
|
||||
};
|
||||
// Snapshot handles, then release the read lock before awaiting the
|
||||
// (possibly slow) worker queries so we never hold it across an await.
|
||||
let handles: Vec<LoadedHandle> = this.models.read().await.values().cloned().collect();
|
||||
drop(this);
|
||||
for handle in handles {
|
||||
handle.refresh_free_mb().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Background auto-recovery task (#17). Drains poisoned model ids and
|
||||
/// rebuilds each via [`CandleHarness::recover_one`]. Holds a `Weak` so a
|
||||
/// shutting-down harness lets the task exit; processes one id at a time,
|
||||
@@ -1595,6 +1659,11 @@ impl CandleHarness {
|
||||
if tokio::runtime::Handle::try_current().is_ok() {
|
||||
let weak = Arc::downgrade(&this);
|
||||
tokio::spawn(recovery_loop(weak, recovery_rx));
|
||||
// Control-plane VRAM cache refresher (#53): keeps each loaded
|
||||
// model's `last_free_mb` current off the request path, so
|
||||
// `derived_limit` / `GET /models` never query the device worker
|
||||
// (which is saturated during inference) and never stall.
|
||||
tokio::spawn(vram_cache_refresh_loop(Arc::downgrade(&this)));
|
||||
}
|
||||
this
|
||||
}
|
||||
@@ -2959,7 +3028,7 @@ impl Harness for CandleHarness {
|
||||
// physics + live free VRAM + measured prefill rate. `None`
|
||||
// for arches without a context profile. `cost` stays
|
||||
// operator-set in the catalogue, filled by the gateway.
|
||||
let limit = h.derived_limit(&self.context_limit_cfg).await;
|
||||
let limit = h.derived_limit(&self.context_limit_cfg);
|
||||
out.push(ModelInfo {
|
||||
id: h.model_id().into(),
|
||||
harness: "candle".into(),
|
||||
@@ -3209,6 +3278,7 @@ impl Harness for CandleHarness {
|
||||
context_profile,
|
||||
prefill_rate: super::context_limit::PrefillRateEma::new(),
|
||||
derived_input_cap: AtomicUsize::new(0),
|
||||
last_free_mb: AtomicU64::new(0),
|
||||
});
|
||||
if loaded.prefix_cache.is_some() {
|
||||
tracing::info!(
|
||||
@@ -3219,6 +3289,14 @@ impl Harness for CandleHarness {
|
||||
);
|
||||
}
|
||||
|
||||
// Seed the control-plane VRAM cache (#53) while the worker is idle
|
||||
// (load just finished), so `/models` has a value before the
|
||||
// background refresher's first tick and never queries the worker.
|
||||
let (free_mb, _) = loaded.query_vram().await;
|
||||
if free_mb > 0 {
|
||||
loaded.last_free_mb.store(free_mb, Ordering::Release);
|
||||
}
|
||||
|
||||
let mut models = self.models.write().await;
|
||||
models.insert(spec.model_id.clone(), LoadedHandle::Single(loaded));
|
||||
tracing::info!(model = %spec.model_id, "model loaded");
|
||||
@@ -3469,6 +3547,7 @@ impl CandleHarness {
|
||||
),
|
||||
prefill_rate: super::context_limit::PrefillRateEma::new(),
|
||||
derived_input_cap: AtomicUsize::new(0),
|
||||
last_free_mb: AtomicU64::new(0),
|
||||
next_snapshot_id: std::sync::atomic::AtomicU64::new(1),
|
||||
});
|
||||
if tp_loaded.prefix_cache.is_some() {
|
||||
@@ -3480,6 +3559,14 @@ impl CandleHarness {
|
||||
);
|
||||
}
|
||||
|
||||
// Seed the control-plane VRAM cache (#53) — tightest free across
|
||||
// ranks, while the workers are idle post-load — so `/models` never
|
||||
// fans a query out to the inference-busy TP workers.
|
||||
let free_mb = tp_loaded.query_vram_tightest_free_mb().await;
|
||||
if free_mb > 0 {
|
||||
tp_loaded.last_free_mb.store(free_mb, Ordering::Release);
|
||||
}
|
||||
|
||||
let mut models = self.models.write().await;
|
||||
models.insert(spec.model_id.clone(), LoadedHandle::Tp(tp_loaded));
|
||||
tracing::info!(
|
||||
|
||||
Reference in New Issue
Block a user