From 61adff347a61522452645abf8f536bb2e58fc6ef Mon Sep 17 00:00:00 2001 From: rob thijssen Date: Mon, 1 Jun 2026 13:24:30 +0300 Subject: [PATCH] feat(neuron): preflight placement check with structured errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 2 of plan-source-aware-loader-preflight. Adds a one-RTT placement feasibility check that runs before any device allocation, NCCL handshake, or weight fetch. Replaces today's opaque "fetch config.json … 404" failure mode (when an operator points `tensor_parallel = 2` at a GGUF-only repo) with a structured error that names the failure class and points at the fix. What lands: - `crates/neuron/src/harness/preflight.rs` — new module. Classifies a repo's siblings listing into `SourceFormat` (Gguf | DenseSafetensors | Mixed | Empty), applies the tp/quant feasibility table, returns a `PlacementPlan` on success or a typed `PreflightError` on rejection. `PreflightError` is `serde::Serialize` so the HTTP layer can emit the structured shape verbatim; it's `thiserror::Error` so log lines get a single-line Display when downcasting from anyhow. Includes best-effort Levenshtein-nearest suggestion for malformed quant names (the second sharp edge the HauhauCS scenario surfaced — operator writes `q6k` against filenames containing `Q6_K_P`, and today's matcher just says "no GGUF file matching quant"). - `CandleHarness::load_model` — calls `preflight(...)` first thing after the "already loaded" guard, before any `ensure_device_worker` or `resolve_*`. Failure wraps the typed error in `anyhow::Error` so the existing trait surface is unchanged; the HTTP handler and the startup logger downcast to recover the structured form. - `crates/neuron/src/api.rs::load_model` handler — maps `PreflightError` to 422 Unprocessable Entity with `{"error": {"kind": "...", "model_id": "...", "suggestion": "..." }}`. Other failures keep the existing 400 + free-form `format!("{e:#}")` shape. - `crates/neuron/src/startup.rs::load_default_models` — when the failure is a preflight rejection, log as `reason= detail=` instead of the opaque `error=`, so journalctl on beast will now show `reason=tp_requires_safetensors detail="repo is GGUF-only (8 .gguf files); TP requires dense safetensors..."` instead of `error=fetch config.json from HauhauCS/...: 404 Not Found`. Tests: - 18 unit tests in `harness/preflight.rs` covering classifier, quant matching, Levenshtein, error serialization, and the full feasibility table (gguf+tp rejected, gguf+bad-quant suggests nearest, gguf+good-quant ok, dense+tp ok, empty rejected, mixed prefers safetensors). - 7 integration tests in `tests/preflight.rs` exercising the network path through an axum mock that serves hf-hub-compatible `/api/models/{org}/{name}/revision/main` payloads. Adds `tempfile` as a dev-dependency for per-test cache dirs. Out of scope (deferred to subsequent phases): - Phase 1 (source-aware loader plumbing — `scheme:org/name` parsing, per-scheme `SourceConfig`, cache disambiguation). Preflight runs against the single configured HuggingFace source today; the scheme threading lands cleanly when Phase 1 ships. - Phase 3 (cortex catalogue source field). - GGUF tensor-parallel loading. Preflight rejects this combination with `TpRequiresSafetensors`; the underlying loader gap is the separate `Helexa` curated-registry / heretic-rs conversation. Refs #4-#9 architectural follow-up; no specific issue closed. Co-Authored-By: Claude Opus 4.7 --- Cargo.lock | 1 + crates/neuron/Cargo.toml | 1 + crates/neuron/src/api.rs | 31 ++ crates/neuron/src/harness/candle.rs | 14 + crates/neuron/src/harness/mod.rs | 1 + crates/neuron/src/harness/preflight.rs | 575 +++++++++++++++++++++++++ crates/neuron/src/startup.rs | 40 +- crates/neuron/tests/preflight.rs | 269 ++++++++++++ 8 files changed, 926 insertions(+), 6 deletions(-) create mode 100644 crates/neuron/src/harness/preflight.rs create mode 100644 crates/neuron/tests/preflight.rs diff --git a/Cargo.lock b/Cargo.lock index f8bc7b4..d411ace 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2538,6 +2538,7 @@ dependencies = [ "safetensors 0.7.0", "serde", "serde_json", + "tempfile", "thiserror 2.0.18", "tokenizers", "tokio", diff --git a/crates/neuron/Cargo.toml b/crates/neuron/Cargo.toml index 80711aa..12e57f6 100644 --- a/crates/neuron/Cargo.toml +++ b/crates/neuron/Cargo.toml @@ -94,6 +94,7 @@ safetensors = "0.7" [dev-dependencies] tokio = { workspace = true, features = ["test-util"] } reqwest.workspace = true +tempfile = "3" [build-dependencies] # Used by `build.rs` to compile `src/cuda/*.cu` into `libneuroncuda.a` diff --git a/crates/neuron/src/api.rs b/crates/neuron/src/api.rs index 0a81789..3106ea8 100644 --- a/crates/neuron/src/api.rs +++ b/crates/neuron/src/api.rs @@ -3,6 +3,7 @@ use crate::activation::ActivationTracker; use crate::harness::HarnessRegistry; use crate::harness::candle::{CandleHarness, InferenceError}; +use crate::harness::preflight::PreflightError; use crate::health::HealthCache; use crate::wire::{openai_chat, openai_responses}; use axum::Router; @@ -84,6 +85,24 @@ async fn load_model( match registry.load_model(&spec).await { Ok(()) => Json(json!({"status": "loaded"})).into_response(), Err(e) => { + // If the underlying failure is a structured preflight + // rejection, surface it as 422 Unprocessable Entity with + // the typed JSON body. The kind/model_id/suggestion/etc. + // fields let cortex (and operators reading the response + // directly) act on the failure without parsing free text. + if let Some(pf) = e.downcast_ref::() { + tracing::warn!( + model = %spec.model_id, + reason = preflight_kind(pf), + detail = %pf, + "load_model rejected by preflight" + ); + return ( + StatusCode::UNPROCESSABLE_ENTITY, + Json(json!({ "error": pf })), + ) + .into_response(); + } // Log the full anyhow chain server-side so journalctl shows // the underlying failure (hf-hub timeout, permission denied, // disk full, etc.) without needing to inspect the HTTP @@ -102,6 +121,18 @@ async fn load_model( } } +/// Short kebab-case tag for a preflight failure, used as a structured +/// log field for journalctl-side filtering. Mirrors the same helper in +/// `startup.rs`; duplicated to keep the module surfaces independent. +fn preflight_kind(err: &PreflightError) -> &'static str { + match err { + PreflightError::RepoFetchFailed { .. } => "repo_fetch_failed", + PreflightError::EmptyRepo { .. } => "empty_repo", + PreflightError::TpRequiresSafetensors { .. } => "tp_requires_safetensors", + PreflightError::QuantNotFound { .. } => "quant_not_found", + } +} + async fn unload_model( State(state): State>, Json(body): Json, diff --git a/crates/neuron/src/harness/candle.rs b/crates/neuron/src/harness/candle.rs index f6eaf55..d456ef8 100644 --- a/crates/neuron/src/harness/candle.rs +++ b/crates/neuron/src/harness/candle.rs @@ -2002,6 +2002,20 @@ impl Harness for CandleHarness { } } + // Preflight: classify the source repo and apply the + // tp/quant/source feasibility table before any device + // allocation, NCCL handshake, or weight fetch. Failures bubble + // up as `super::preflight::PreflightError` wrapped in anyhow; + // the api.rs handler downcasts to produce a 422 with structured + // JSON. The plan it returns is not yet threaded through the + // dispatch — downstream `resolve_files` / `resolve_dense_files` + // re-run their own substring match — but the structured error + // surface is the main payoff. + let api = self.hf_api()?; + super::preflight::preflight(&api, spec) + .await + .map_err(anyhow::Error::new)?; + let tp_size = spec.tensor_parallel.unwrap_or(1); if tp_size > 1 { #[cfg(feature = "cuda")] diff --git a/crates/neuron/src/harness/mod.rs b/crates/neuron/src/harness/mod.rs index 9e56380..e9570ab 100644 --- a/crates/neuron/src/harness/mod.rs +++ b/crates/neuron/src/harness/mod.rs @@ -4,6 +4,7 @@ pub mod arch; pub mod candle; pub mod chat_template; pub mod device_worker; +pub mod preflight; pub mod tp; use anyhow::Result; diff --git a/crates/neuron/src/harness/preflight.rs b/crates/neuron/src/harness/preflight.rs new file mode 100644 index 0000000..d80fee0 --- /dev/null +++ b/crates/neuron/src/harness/preflight.rs @@ -0,0 +1,575 @@ +//! Placement feasibility check that runs before any device allocation, +//! NCCL handshake, or weight download. +//! +//! The loader path in `candle.rs` historically discovers an +//! incompatibility *after* it has already started fetching files — +//! "fetch config.json from HauhauCS/...: 404 Not Found" surfaces hours +//! after operators set `tensor_parallel = 2` on a GGUF-only repo, with +//! no hint about what's actually wrong. Preflight closes that gap: +//! +//! 1. one `repo.info()` round-trip (siblings listing, no blob fetch) +//! 2. classify the repo: GGUF-only, dense safetensors, mixed, empty +//! 3. apply the feasibility table against the requested +//! `ModelSpec` (tp_size, quant) +//! 4. return a structured `PreflightError` the API layer can map to +//! 422 + JSON, or `Ok(PlacementPlan)` carrying the decisions the +//! downstream load path needs (which GGUF file to fetch, etc.). +//! +//! Phase 2 of plan-source-aware-loader-preflight. The Phase 1 scheme +//! work — `ModelSourceId` and per-scheme `SourceConfig` — is a +//! separate PR; preflight runs against the single configured +//! HuggingFace source for now and the scheme threading drops in +//! cleanly when Phase 1 lands. + +use cortex_core::harness::ModelSpec; +use hf_hub::api::tokio::Api; +use serde::Serialize; + +/// What the repo's siblings listing tells us about how to load it. +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum SourceFormat { + /// Only GGUF files present. Single-GPU load path. `quants` is the + /// lowercased filename list so the operator can be told what's + /// actually available when their `quant=` choice doesn't match. + Gguf { quants: Vec }, + /// Dense safetensors (single-file or sharded via index.json). + /// Goes through `load_arch_dense` on single-GPU, or `load_tp` (with + /// optional in-situ quantization) when `tensor_parallel > 1`. + DenseSafetensors { sharded: bool }, + /// Both safetensors and GGUF present — prefer the dense path + /// because it composes with TP and ISQ. We surface the GGUF + /// filenames anyway so operators with a strong preference can + /// see they exist. + Mixed { gguf_quants: Vec }, + /// No recognised weight files. Either a tokenizer-only repo + /// (e.g. some base-model repos that only host `tokenizer.json` and + /// expect the operator to use a `-GGUF` sibling repo) or a + /// genuinely empty entry. + Empty, +} + +/// Output of `preflight` for a load that can proceed. Carries the +/// decisions downstream resolve_* paths would otherwise re-derive. +#[derive(Debug, Clone, Serialize)] +pub struct PlacementPlan { + pub model_id: String, + pub format: SourceFormat, + pub tp_size: u32, + /// Filename of the GGUF to fetch, populated when `format` is + /// `Gguf` and a single-GPU load was requested. None for the + /// dense/TP path. + pub picked_quant_file: Option, +} + +/// Structured failure modes. Each variant carries the fields the API +/// layer needs to produce an actionable 422 body. +#[derive(Debug, Clone, Serialize, thiserror::Error)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum PreflightError { + /// `repo.info()` failed. Captures the underlying cause as a string + /// so the operator log shows whether it's auth, 404, or transport. + #[error("failed to fetch repo info for '{model_id}': {cause}")] + RepoFetchFailed { model_id: String, cause: String }, + + /// The repo exists but has no recognised weight files. + #[error( + "repo '{model_id}' has no recognised weight files (no .gguf, no .safetensors); \ + a tokenizer-only repo cannot be loaded directly" + )] + EmptyRepo { model_id: String }, + + /// Operator asked for `tensor_parallel > 1` on a GGUF-only repo. + /// The TP path requires safetensors+config for in-situ + /// quantization; GGUF-TP isn't implemented (see CLAUDE.md). + #[error( + "cannot load '{model_id}' with tensor_parallel={tp_size}: repo is GGUF-only \ + ({} .gguf files); TP requires dense safetensors. {suggestion}", + gguf_quants.len() + )] + TpRequiresSafetensors { + model_id: String, + tp_size: u32, + gguf_quants: Vec, + suggestion: String, + }, + + /// Operator asked for a GGUF quant whose substring doesn't match + /// any filename in the repo. `nearest` is a best-effort Levenshtein + /// suggestion against the available quant names. + #[error( + "no GGUF file in '{model_id}' matches quant '{requested}'; \ + available: {available:?}{}", + nearest.as_ref().map(|n| format!("; did you mean '{n}'?")).unwrap_or_default() + )] + QuantNotFound { + model_id: String, + requested: String, + available: Vec, + nearest: Option, + }, +} + +/// Run the placement check. +/// +/// One network round-trip (`repo.info()`); no blob fetches. Returns +/// `Ok(PlacementPlan)` when the requested combination is feasible, or +/// a structured `PreflightError` describing what's wrong. +pub async fn preflight(api: &Api, spec: &ModelSpec) -> Result { + let repo = api.model(spec.model_id.clone()); + let info = repo + .info() + .await + .map_err(|e| PreflightError::RepoFetchFailed { + model_id: spec.model_id.clone(), + cause: format!("{e}"), + })?; + + let filenames: Vec<&str> = info.siblings.iter().map(|s| s.rfilename.as_str()).collect(); + let format = classify(&filenames); + let tp_size = spec.tensor_parallel.unwrap_or(1); + + match (&format, tp_size, spec.quant.as_deref()) { + // No weights at all — nothing to do. + (SourceFormat::Empty, _, _) => Err(PreflightError::EmptyRepo { + model_id: spec.model_id.clone(), + }), + + // GGUF-only + TP: not supported. Today's HauhauCS failure. + (SourceFormat::Gguf { quants }, tp, _) if tp > 1 => { + Err(PreflightError::TpRequiresSafetensors { + model_id: spec.model_id.clone(), + tp_size: tp, + gguf_quants: quants.clone(), + suggestion: format!( + "Set tensor_parallel=1 and pick a quant from {quants:?}, \ + or use a dense safetensors release of this model." + ), + }) + } + + // GGUF-only + single-GPU: pick the file that matches the + // operator's quant. Empty quant matches the first GGUF. + (SourceFormat::Gguf { quants }, _, requested) => { + let picked = pick_gguf_file(&filenames, requested.unwrap_or("")); + match picked { + Some(fname) => Ok(PlacementPlan { + model_id: spec.model_id.clone(), + format: format.clone(), + tp_size, + picked_quant_file: Some(fname), + }), + None => Err(PreflightError::QuantNotFound { + model_id: spec.model_id.clone(), + requested: requested.unwrap_or("").to_string(), + available: quants.clone(), + nearest: nearest_quant(requested.unwrap_or(""), quants), + }), + } + } + + // Dense or mixed: dense path handles both single-GPU and TP. + // The architecture compatibility check stays where it is — + // `check_dense_config_supported` runs once `config.json` is + // on disk, since it needs the parsed JSON. + (SourceFormat::DenseSafetensors { .. } | SourceFormat::Mixed { .. }, _, _) => { + Ok(PlacementPlan { + model_id: spec.model_id.clone(), + format: format.clone(), + tp_size, + picked_quant_file: None, + }) + } + } +} + +/// Classify a siblings file list into a `SourceFormat`. Pulled out so +/// the unit tests can exercise it against fixture JSON without +/// spinning up an Api. +pub fn classify(filenames: &[&str]) -> SourceFormat { + let mut gguf_quants: Vec = filenames + .iter() + .filter(|f| f.to_lowercase().ends_with(".gguf")) + .map(|f| f.to_lowercase()) + .collect(); + gguf_quants.sort(); + gguf_quants.dedup(); + + let has_safetensors = filenames.iter().any(|f| f.ends_with(".safetensors")); + let sharded = filenames + .iter() + .any(|f| f.ends_with("model.safetensors.index.json")); + + match (has_safetensors, gguf_quants.is_empty()) { + (true, true) => SourceFormat::DenseSafetensors { sharded }, + (true, false) => SourceFormat::Mixed { gguf_quants }, + (false, false) => SourceFormat::Gguf { + quants: gguf_quants, + }, + (false, true) => SourceFormat::Empty, + } +} + +/// Mirror of the quant-matching logic in `candle.rs::resolve_files` so +/// preflight picks the same file the downstream loader would. Empty +/// quant returns the first `.gguf` (any quant). Lowercased substring +/// match otherwise. +fn pick_gguf_file(filenames: &[&str], quant_lc: &str) -> Option { + filenames + .iter() + .filter(|f| f.to_lowercase().ends_with(".gguf")) + .find(|f| quant_lc.is_empty() || f.to_lowercase().contains(quant_lc)) + .map(|f| f.to_string()) +} + +/// Best-effort suggestion when the operator's quant name doesn't +/// substring-match any filename. Extracts the quant-ish token from +/// each `.gguf` filename and picks the one with the smallest +/// Levenshtein distance to the requested string. Returns None when +/// the input is empty or no candidates exist. +fn nearest_quant(requested: &str, candidates: &[String]) -> Option { + if requested.is_empty() || candidates.is_empty() { + return None; + } + // Pull the "Q6_K_P"/"IQ4_XS"-ish token out of each filename for a + // fairer comparison. Filenames look like + // `Qwen3.6-27B-Uncensored-HauhauCS-Aggressive-Q6_K_P.gguf`, so the + // quant is the last `-`-separated segment before the extension, + // lowercased. + let tokens: Vec<(String, String)> = candidates + .iter() + .map(|f| (extract_quant_token(f), f.clone())) + .collect(); + + let req_lc = requested.to_lowercase(); + tokens + .into_iter() + .min_by_key(|(token, _)| levenshtein(&req_lc, token)) + .map(|(token, _)| token) +} + +fn extract_quant_token(filename: &str) -> String { + let stem = filename + .rsplit_once('.') + .map(|(s, _)| s) + .unwrap_or(filename); + let token = stem.rsplit('-').next().unwrap_or(stem); + token.to_lowercase() +} + +/// Iterative Levenshtein. Small inputs (quant names are <=12 chars), +/// no need for the `levenshtein` crate. +fn levenshtein(a: &str, b: &str) -> usize { + let a: Vec = a.chars().collect(); + let b: Vec = b.chars().collect(); + let (m, n) = (a.len(), b.len()); + if m == 0 { + return n; + } + if n == 0 { + return m; + } + let mut prev: Vec = (0..=n).collect(); + let mut curr = vec![0usize; n + 1]; + for i in 1..=m { + curr[0] = i; + for j in 1..=n { + let cost = if a[i - 1] == b[j - 1] { 0 } else { 1 }; + curr[j] = (prev[j] + 1).min(curr[j - 1] + 1).min(prev[j - 1] + cost); + } + std::mem::swap(&mut prev, &mut curr); + } + prev[n] +} + +#[cfg(test)] +mod tests { + use super::*; + + fn spec(model_id: &str, tp: Option, quant: Option<&str>) -> ModelSpec { + ModelSpec { + model_id: model_id.into(), + harness: "candle".into(), + quant: quant.map(String::from), + tensor_parallel: tp, + devices: None, + } + } + + #[test] + fn classify_gguf_only() { + let files = [ + "README.md", + ".gitattributes", + "Qwen3.6-27B-Q6_K_P.gguf", + "Qwen3.6-27B-Q4_K_P.gguf", + ]; + match classify(&files) { + SourceFormat::Gguf { quants } => { + assert_eq!(quants.len(), 2); + assert!(quants.iter().any(|q| q.contains("q6_k_p"))); + } + other => panic!("expected Gguf, got {other:?}"), + } + } + + #[test] + fn classify_dense_sharded() { + let files = [ + "config.json", + "tokenizer.json", + "model.safetensors.index.json", + "model-00001-of-00002.safetensors", + "model-00002-of-00002.safetensors", + ]; + assert_eq!( + classify(&files), + SourceFormat::DenseSafetensors { sharded: true } + ); + } + + #[test] + fn classify_dense_single_file() { + let files = ["config.json", "tokenizer.json", "model.safetensors"]; + assert_eq!( + classify(&files), + SourceFormat::DenseSafetensors { sharded: false } + ); + } + + #[test] + fn classify_mixed() { + let files = [ + "config.json", + "tokenizer.json", + "model.safetensors", + "model-Q4_K_M.gguf", + ]; + match classify(&files) { + SourceFormat::Mixed { gguf_quants } => { + assert_eq!(gguf_quants, vec!["model-q4_k_m.gguf"]); + } + other => panic!("expected Mixed, got {other:?}"), + } + } + + #[test] + fn classify_empty() { + let files = ["README.md", "tokenizer.json"]; + assert_eq!(classify(&files), SourceFormat::Empty); + } + + #[test] + fn pick_gguf_substring_match() { + let files = ["model-Q4_K_M.gguf", "model-Q6_K.gguf", "model-Q8_0.gguf"]; + assert_eq!( + pick_gguf_file(&files, "q6_k"), + Some("model-Q6_K.gguf".into()) + ); + } + + #[test] + fn pick_gguf_empty_returns_first() { + let files = ["model-Q4_K_M.gguf", "model-Q6_K.gguf"]; + assert_eq!(pick_gguf_file(&files, ""), Some("model-Q4_K_M.gguf".into())); + } + + #[test] + fn pick_gguf_no_match() { + let files = ["model-Q4_K_M.gguf", "model-Q6_K.gguf"]; + assert_eq!(pick_gguf_file(&files, "iq2_xs"), None); + } + + #[test] + fn nearest_quant_suggests_close_match() { + // Today's HauhauCS scenario: operator wrote "q6k", actual + // filename token is "q6_k_p". Should suggest the latter. + let candidates = vec![ + "qwen-q4_k_p.gguf".to_string(), + "qwen-q5_k_p.gguf".to_string(), + "qwen-q6_k_p.gguf".to_string(), + "qwen-q8_k_p.gguf".to_string(), + ]; + assert_eq!(nearest_quant("q6k", &candidates), Some("q6_k_p".into())); + } + + #[test] + fn nearest_quant_empty_input() { + assert_eq!(nearest_quant("", &[]), None); + assert_eq!(nearest_quant("q6k", &[]), None); + assert_eq!(nearest_quant("", &["model-q4.gguf".into()]), None); + } + + #[test] + fn extract_quant_handles_typical_filenames() { + assert_eq!(extract_quant_token("Qwen3.6-27B-Q6_K_P.gguf"), "q6_k_p"); + assert_eq!(extract_quant_token("model-IQ4_XS.gguf"), "iq4_xs"); + assert_eq!(extract_quant_token("simple.gguf"), "simple"); + } + + #[test] + fn levenshtein_basics() { + assert_eq!(levenshtein("", ""), 0); + assert_eq!(levenshtein("abc", ""), 3); + assert_eq!(levenshtein("", "abc"), 3); + assert_eq!(levenshtein("kitten", "sitting"), 3); + assert_eq!(levenshtein("q6k", "q6_k_p"), 3); + assert_eq!(levenshtein("q6k", "q4_k_p"), 4); + } + + // Higher-level preflight tests below exercise the full feasibility + // table via a thin wrapper that bypasses the network — we hand it + // a pre-built `SourceFormat` and request shape, then drive the + // same decision logic. The end-to-end test with a mock HTTP + // server lives in tests/preflight.rs (integration). + + /// Mirror of the `match` in `preflight()` but takes a classified + /// `SourceFormat` directly. Lets us unit-test the feasibility + /// table without making the API trait object-safe / boxable. + fn decide( + spec: &ModelSpec, + format: &SourceFormat, + filenames: &[&str], + ) -> Result { + let tp_size = spec.tensor_parallel.unwrap_or(1); + match (format, tp_size, spec.quant.as_deref()) { + (SourceFormat::Empty, _, _) => Err(PreflightError::EmptyRepo { + model_id: spec.model_id.clone(), + }), + (SourceFormat::Gguf { quants }, tp, _) if tp > 1 => { + Err(PreflightError::TpRequiresSafetensors { + model_id: spec.model_id.clone(), + tp_size: tp, + gguf_quants: quants.clone(), + suggestion: format!( + "Set tensor_parallel=1 and pick a quant from {quants:?}, \ + or use a dense safetensors release of this model." + ), + }) + } + (SourceFormat::Gguf { quants }, _, requested) => { + let picked = pick_gguf_file(filenames, requested.unwrap_or("")); + match picked { + Some(fname) => Ok(PlacementPlan { + model_id: spec.model_id.clone(), + format: format.clone(), + tp_size, + picked_quant_file: Some(fname), + }), + None => Err(PreflightError::QuantNotFound { + model_id: spec.model_id.clone(), + requested: requested.unwrap_or("").to_string(), + available: quants.clone(), + nearest: nearest_quant(requested.unwrap_or(""), quants), + }), + } + } + (SourceFormat::DenseSafetensors { .. } | SourceFormat::Mixed { .. }, _, _) => { + Ok(PlacementPlan { + model_id: spec.model_id.clone(), + format: format.clone(), + tp_size, + picked_quant_file: None, + }) + } + } + } + + #[test] + fn feasibility_gguf_tp_rejected() { + let files = ["Qwen-Q6_K_P.gguf", "Qwen-Q4_K_P.gguf"]; + let fmt = classify(&files); + let s = spec("HauhauCS/Qwen3.6", Some(2), Some("q6k")); + match decide(&s, &fmt, &files).unwrap_err() { + PreflightError::TpRequiresSafetensors { + model_id, + tp_size, + gguf_quants, + .. + } => { + assert_eq!(model_id, "HauhauCS/Qwen3.6"); + assert_eq!(tp_size, 2); + assert_eq!(gguf_quants.len(), 2); + } + other => panic!("expected TpRequiresSafetensors, got {other:?}"), + } + } + + #[test] + fn feasibility_gguf_single_gpu_bad_quant() { + let files = [ + "Qwen-Q4_K_P.gguf", + "Qwen-Q5_K_P.gguf", + "Qwen-Q6_K_P.gguf", + "Qwen-Q8_K_P.gguf", + ]; + let fmt = classify(&files); + let s = spec("HauhauCS/Qwen3.6", Some(1), Some("q6k")); + match decide(&s, &fmt, &files).unwrap_err() { + PreflightError::QuantNotFound { + requested, + nearest, + available, + .. + } => { + assert_eq!(requested, "q6k"); + assert_eq!(nearest.as_deref(), Some("q6_k_p")); + assert_eq!(available.len(), 4); + } + other => panic!("expected QuantNotFound, got {other:?}"), + } + } + + #[test] + fn feasibility_gguf_single_gpu_good_quant() { + let files = ["Qwen-Q4_K_M.gguf", "Qwen-Q6_K.gguf"]; + let fmt = classify(&files); + let s = spec("Qwen/Q-GGUF", Some(1), Some("q6_k")); + let plan = decide(&s, &fmt, &files).unwrap(); + assert_eq!(plan.picked_quant_file.as_deref(), Some("Qwen-Q6_K.gguf")); + } + + #[test] + fn feasibility_dense_tp_ok() { + let files = [ + "config.json", + "tokenizer.json", + "model.safetensors.index.json", + "model-00001-of-00002.safetensors", + ]; + let fmt = classify(&files); + let s = spec("Qwen/Q3-30B", Some(2), Some("q5k")); + let plan = decide(&s, &fmt, &files).unwrap(); + assert_eq!(plan.tp_size, 2); + assert!(plan.picked_quant_file.is_none()); + assert!(matches!( + plan.format, + SourceFormat::DenseSafetensors { sharded: true } + )); + } + + #[test] + fn feasibility_empty_rejected() { + let files = ["README.md", "tokenizer.json"]; + let fmt = classify(&files); + let s = spec("Empty/Repo", Some(1), None); + match decide(&s, &fmt, &files).unwrap_err() { + PreflightError::EmptyRepo { model_id } => assert_eq!(model_id, "Empty/Repo"), + other => panic!("expected EmptyRepo, got {other:?}"), + } + } + + #[test] + fn error_serialization_carries_kind_field() { + let err = PreflightError::TpRequiresSafetensors { + model_id: "x/y".into(), + tp_size: 2, + gguf_quants: vec!["q6_k_p".into()], + suggestion: "...".into(), + }; + let v: serde_json::Value = serde_json::to_value(&err).unwrap(); + assert_eq!(v["kind"], "tp_requires_safetensors"); + assert_eq!(v["model_id"], "x/y"); + assert_eq!(v["tp_size"], 2); + } +} diff --git a/crates/neuron/src/startup.rs b/crates/neuron/src/startup.rs index 35b68f9..52a3ecb 100644 --- a/crates/neuron/src/startup.rs +++ b/crates/neuron/src/startup.rs @@ -7,6 +7,7 @@ use crate::activation::ActivationTracker; use crate::harness::HarnessRegistry; +use crate::harness::preflight::PreflightError; use cortex_core::harness::ModelSpec; use std::time::{Duration, Instant}; use tokio::signal; @@ -53,18 +54,45 @@ pub async fn load_default_models( Err(e) => { let rendered = format!("{e:#}"); activation.fail_loading(&spec.model_id, &rendered).await; - tracing::warn!( - model = %spec.model_id, - error = %rendered, - elapsed_ms = start.elapsed().as_millis() as u64, - "failed to load default model, continuing" - ); + // When the underlying failure is a preflight rejection, + // pull the structured fields out so journalctl shows + // `reason=tp_requires_safetensors detail="..."` instead + // of an opaque "fetch config.json … 404". The operator + // can act on the structured form directly. + if let Some(pf) = e.downcast_ref::() { + tracing::warn!( + model = %spec.model_id, + reason = preflight_kind(pf), + detail = %pf, + elapsed_ms = start.elapsed().as_millis() as u64, + "failed to load default model, continuing" + ); + } else { + tracing::warn!( + model = %spec.model_id, + error = %rendered, + elapsed_ms = start.elapsed().as_millis() as u64, + "failed to load default model, continuing" + ); + } } } } activation.mark_ready().await; } +/// Short kebab-case tag for a preflight failure. Used as a structured +/// log field so journalctl filtering can match on the failure class +/// (`reason=tp_requires_safetensors`, `reason=quant_not_found`, etc.). +fn preflight_kind(err: &PreflightError) -> &'static str { + match err { + PreflightError::RepoFetchFailed { .. } => "repo_fetch_failed", + PreflightError::EmptyRepo { .. } => "empty_repo", + PreflightError::TpRequiresSafetensors { .. } => "tp_requires_safetensors", + PreflightError::QuantNotFound { .. } => "quant_not_found", + } +} + /// Future that resolves on SIGINT (Ctrl-C) or SIGTERM (systemd stop). /// /// Wired into `axum::serve(...).with_graceful_shutdown(shutdown_signal())` diff --git a/crates/neuron/tests/preflight.rs b/crates/neuron/tests/preflight.rs new file mode 100644 index 0000000..2ecdf21 --- /dev/null +++ b/crates/neuron/tests/preflight.rs @@ -0,0 +1,269 @@ +//! End-to-end preflight tests against a mock HF-compatible server. +//! +//! Unit tests in `harness/preflight.rs` exercise the classifier and +//! feasibility table against synthetic file lists. These tests close +//! the loop: spawn an axum server that returns a `RepoInfo`-shaped +//! JSON payload at `/api/models/{org}/{name}`, point `hf_hub::Api` at +//! it, and assert `preflight()` returns the expected outcome. + +use axum::Router; +use axum::extract::Path; +use axum::http::StatusCode; +use axum::response::{IntoResponse, Json}; +use axum::routing::get; +use cortex_core::harness::ModelSpec; +use neuron::harness::preflight::{PreflightError, SourceFormat, preflight}; +use serde_json::{Value, json}; +use std::sync::Arc; +use std::sync::Mutex; + +/// Per-test mock state: a map from `{org}/{name}` to the JSON body the +/// mock server returns at the corresponding `/api/models/{org}/{name}` +/// endpoint. `None` means "respond 404". +type MockBodies = Arc>>>; + +async fn spawn_mock(bodies: MockBodies) -> String { + // hf-hub 0.4 calls /api/models/{org}/{name}/revision/main for + // `repo.info()`. We route both shapes so the test stays robust + // to a future hf-hub upgrade that drops the `/revision/main` + // suffix. + let app = Router::new() + .route("/api/models/{org}/{name}", get(model_info)) + .route( + "/api/models/{org}/{name}/revision/{rev}", + get(model_info_rev), + ) + .with_state(bodies); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + format!("http://{addr}") +} + +async fn model_info( + Path((org, name)): Path<(String, String)>, + axum::extract::State(bodies): axum::extract::State, +) -> impl IntoResponse { + respond(&format!("{org}/{name}"), &bodies) +} + +async fn model_info_rev( + Path((org, name, _rev)): Path<(String, String, String)>, + axum::extract::State(bodies): axum::extract::State, +) -> impl IntoResponse { + respond(&format!("{org}/{name}"), &bodies) +} + +fn respond(key: &str, bodies: &MockBodies) -> axum::response::Response { + let entry = bodies.lock().unwrap().get(key).cloned(); + match entry { + Some(Some(body)) => Json(body).into_response(), + Some(None) | None => (StatusCode::NOT_FOUND, "not found").into_response(), + } +} + +fn build_api(endpoint: &str, cache_dir: &std::path::Path) -> hf_hub::api::tokio::Api { + hf_hub::api::tokio::ApiBuilder::new() + .with_endpoint(endpoint.to_string()) + .with_cache_dir(cache_dir.to_path_buf()) + .build() + .expect("build hf-hub Api") +} + +fn siblings(filenames: &[&str]) -> Value { + json!({ + "sha": "0000000000000000000000000000000000000000", + "siblings": filenames.iter().map(|f| json!({ "rfilename": f })).collect::>(), + }) +} + +fn spec(model_id: &str, tp: Option, quant: Option<&str>) -> ModelSpec { + ModelSpec { + model_id: model_id.into(), + harness: "candle".into(), + quant: quant.map(String::from), + tensor_parallel: tp, + devices: None, + } +} + +#[tokio::test] +async fn preflight_gguf_tp_rejected_over_http() { + let cache = tempfile::tempdir().expect("tempdir"); + let bodies: MockBodies = Arc::new(Mutex::new(Default::default())); + bodies.lock().unwrap().insert( + "HauhauCS/Qwen3.6".to_string(), + Some(siblings(&[ + "README.md", + ".gitattributes", + "Qwen3.6-Q4_K_P.gguf", + "Qwen3.6-Q6_K_P.gguf", + "Qwen3.6-Q8_K_P.gguf", + ])), + ); + let endpoint = spawn_mock(bodies).await; + + let api = build_api(&endpoint, cache.path()); + let s = spec("HauhauCS/Qwen3.6", Some(2), Some("q6k")); + let err = preflight(&api, &s).await.unwrap_err(); + match err { + PreflightError::TpRequiresSafetensors { + model_id, + tp_size, + gguf_quants, + .. + } => { + assert_eq!(model_id, "HauhauCS/Qwen3.6"); + assert_eq!(tp_size, 2); + assert_eq!(gguf_quants.len(), 3); + } + other => panic!("expected TpRequiresSafetensors, got {other:?}"), + } +} + +#[tokio::test] +async fn preflight_gguf_quant_suggestion_over_http() { + let cache = tempfile::tempdir().expect("tempdir"); + let bodies: MockBodies = Arc::new(Mutex::new(Default::default())); + bodies.lock().unwrap().insert( + "HauhauCS/Qwen3.6".to_string(), + Some(siblings(&[ + "Qwen3.6-Q4_K_P.gguf", + "Qwen3.6-Q5_K_P.gguf", + "Qwen3.6-Q6_K_P.gguf", + "Qwen3.6-Q8_K_P.gguf", + ])), + ); + let endpoint = spawn_mock(bodies).await; + + let api = build_api(&endpoint, cache.path()); + let s = spec("HauhauCS/Qwen3.6", Some(1), Some("q6k")); + let err = preflight(&api, &s).await.unwrap_err(); + match err { + PreflightError::QuantNotFound { + requested, + nearest, + available, + .. + } => { + assert_eq!(requested, "q6k"); + assert_eq!(nearest.as_deref(), Some("q6_k_p")); + assert_eq!(available.len(), 4); + } + other => panic!("expected QuantNotFound, got {other:?}"), + } +} + +#[tokio::test] +async fn preflight_dense_safetensors_tp_ok() { + let cache = tempfile::tempdir().expect("tempdir"); + let bodies: MockBodies = Arc::new(Mutex::new(Default::default())); + bodies.lock().unwrap().insert( + "Qwen/Q3-30B".to_string(), + Some(siblings(&[ + "config.json", + "tokenizer.json", + "tokenizer_config.json", + "model.safetensors.index.json", + "model-00001-of-00006.safetensors", + "model-00002-of-00006.safetensors", + "model-00003-of-00006.safetensors", + ])), + ); + let endpoint = spawn_mock(bodies).await; + + let api = build_api(&endpoint, cache.path()); + let s = spec("Qwen/Q3-30B", Some(2), Some("q5k")); + let plan = preflight(&api, &s).await.expect("dense+tp should succeed"); + assert_eq!(plan.tp_size, 2); + assert!(plan.picked_quant_file.is_none()); + assert!(matches!( + plan.format, + SourceFormat::DenseSafetensors { sharded: true } + )); +} + +#[tokio::test] +async fn preflight_gguf_single_gpu_good_quant() { + let cache = tempfile::tempdir().expect("tempdir"); + let bodies: MockBodies = Arc::new(Mutex::new(Default::default())); + bodies.lock().unwrap().insert( + "HauhauCS/Qwen3.6".to_string(), + Some(siblings(&["Qwen3.6-Q4_K_P.gguf", "Qwen3.6-Q6_K_P.gguf"])), + ); + let endpoint = spawn_mock(bodies).await; + + let api = build_api(&endpoint, cache.path()); + let s = spec("HauhauCS/Qwen3.6", Some(1), Some("q6_k_p")); + let plan = preflight(&api, &s) + .await + .expect("good quant should succeed"); + assert_eq!(plan.tp_size, 1); + assert_eq!( + plan.picked_quant_file.as_deref(), + Some("Qwen3.6-Q6_K_P.gguf") + ); +} + +#[tokio::test] +async fn preflight_repo_fetch_failed_on_404() { + // Mock server has no entry for this id → 404, exercising the + // RepoFetchFailed path (the same shape today's HauhauCS scenario + // would have produced if we'd added preflight before the cache + // download was attempted). + let cache = tempfile::tempdir().expect("tempdir"); + let bodies: MockBodies = Arc::new(Mutex::new(Default::default())); + let endpoint = spawn_mock(bodies).await; + + let api = build_api(&endpoint, cache.path()); + let s = spec("DoesNot/Exist", Some(1), None); + let err = preflight(&api, &s).await.unwrap_err(); + assert!( + matches!(err, PreflightError::RepoFetchFailed { .. }), + "expected RepoFetchFailed, got {err:?}" + ); +} + +#[tokio::test] +async fn preflight_empty_repo_rejected() { + let cache = tempfile::tempdir().expect("tempdir"); + let bodies: MockBodies = Arc::new(Mutex::new(Default::default())); + bodies.lock().unwrap().insert( + "Empty/Repo".to_string(), + Some(siblings(&["README.md", "tokenizer.json"])), + ); + let endpoint = spawn_mock(bodies).await; + + let api = build_api(&endpoint, cache.path()); + let s = spec("Empty/Repo", Some(1), None); + let err = preflight(&api, &s).await.unwrap_err(); + assert!( + matches!(err, PreflightError::EmptyRepo { .. }), + "expected EmptyRepo, got {err:?}" + ); +} + +#[tokio::test] +async fn preflight_mixed_repo_prefers_safetensors() { + let cache = tempfile::tempdir().expect("tempdir"); + let bodies: MockBodies = Arc::new(Mutex::new(Default::default())); + bodies.lock().unwrap().insert( + "Mixed/Repo".to_string(), + Some(siblings(&[ + "config.json", + "tokenizer.json", + "model.safetensors", + "model-Q4_K_M.gguf", + ])), + ); + let endpoint = spawn_mock(bodies).await; + + let api = build_api(&endpoint, cache.path()); + // TP=2 + quant should succeed via the dense path even though a + // GGUF is present — the dense path handles ISQ. + let s = spec("Mixed/Repo", Some(2), Some("q5k")); + let plan = preflight(&api, &s).await.expect("mixed should succeed"); + assert!(matches!(plan.format, SourceFormat::Mixed { .. })); +}