diff --git a/crates/cortex-core/src/lib.rs b/crates/cortex-core/src/lib.rs index d721fd1..d9fc5a1 100644 --- a/crates/cortex-core/src/lib.rs +++ b/crates/cortex-core/src/lib.rs @@ -7,4 +7,5 @@ pub mod metrics; pub mod node; pub mod openai; pub mod responses; +pub mod source; pub mod translate; diff --git a/crates/cortex-core/src/source.rs b/crates/cortex-core/src/source.rs new file mode 100644 index 0000000..5621766 --- /dev/null +++ b/crates/cortex-core/src/source.rs @@ -0,0 +1,267 @@ +//! Scheme-qualified model identifiers. +//! +//! cortex/neuron historically resolves every model id through hf-hub +//! against `https://huggingface.co`. Helexa is adding an EU-hosted +//! registry (`registry.helexa.ai`) alongside HF — both speak the same +//! HF-compatible wire format, but the bytes, jurisdiction, and trust +//! root differ. Model ids therefore need a scheme: +//! +//! - `huggingface:Qwen/Qwen3.6-27B` — HF-hosted bytes +//! - `helexa:Qwen/Qwen3.6-27B-Uncensored` — helexa registry bytes +//! - `helexa:SomeOperator/CustomFinetune` — operator publishing +//! under the helexa namespace; same scheme handles all `org/name` +//! pairs hosted in that registry. +//! +//! Bare `org/name` parses with an empty scheme; the caller (typically +//! a harness) substitutes its configured default scheme so existing +//! configs keep working through the transition. + +use serde::{Deserialize, Serialize}; +use std::fmt; +use std::str::FromStr; + +/// Parsed `scheme:org/name`. Bare `org/name` produces an empty scheme +/// — call `with_default_scheme` (or check `is_scheme_unset`) to +/// resolve before using. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct ModelSourceId { + pub scheme: String, + pub org: String, + pub name: String, +} + +/// Errors from `ModelSourceId::from_str`. Carries the offending input +/// so log lines / API errors can echo what the operator typed. +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +pub enum ParseError { + #[error("empty model id")] + Empty, + #[error("model id '{0}' is missing the '/' between org and name")] + MissingSlash(String), + #[error("model id '{0}' has an empty scheme before ':'")] + EmptyScheme(String), + #[error("model id '{0}' has an empty org")] + EmptyOrg(String), + #[error("model id '{0}' has an empty name")] + EmptyName(String), + #[error("model id '{0}' has a scheme containing '/' which is reserved for org/name")] + SchemeContainsSlash(String), + #[error("model id '{0}' has a name containing ':' which is reserved for the scheme prefix")] + NameContainsColon(String), +} + +impl ModelSourceId { + /// Construct directly from already-validated parts. Used by tests + /// and call sites that have the fields separately; the public API + /// for parsing user input is `FromStr`. + pub fn new(scheme: impl Into, org: impl Into, name: impl Into) -> Self { + Self { + scheme: scheme.into(), + org: org.into(), + name: name.into(), + } + } + + /// True when this id parsed from a bare `org/name` (no scheme + /// prefix). The harness substitutes its configured default in + /// `with_default_scheme` before resolving against a registry. + pub fn is_scheme_unset(&self) -> bool { + self.scheme.is_empty() + } + + /// Substitute `default` for an empty scheme. No-op when the scheme + /// is already set. Returns self by value so it composes neatly: + /// `id.parse::()?.with_default_scheme("huggingface")`. + pub fn with_default_scheme(mut self, default: &str) -> Self { + if self.scheme.is_empty() { + self.scheme = default.to_string(); + } + self + } + + /// The `org/name` half — what an hf-hub `Api::model(...)` call + /// expects regardless of which scheme/endpoint we're hitting. + pub fn repo_path(&self) -> String { + format!("{}/{}", self.org, self.name) + } +} + +impl fmt::Display for ModelSourceId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.scheme.is_empty() { + write!(f, "{}/{}", self.org, self.name) + } else { + write!(f, "{}:{}/{}", self.scheme, self.org, self.name) + } + } +} + +impl FromStr for ModelSourceId { + type Err = ParseError; + + fn from_str(s: &str) -> Result { + if s.is_empty() { + return Err(ParseError::Empty); + } + // Scheme split. Only the *first* colon counts — anything after + // belongs to org/name (and would be rejected separately because + // `:` isn't allowed there). + let (scheme, rest) = match s.split_once(':') { + Some((scheme, rest)) => { + if scheme.is_empty() { + return Err(ParseError::EmptyScheme(s.to_string())); + } + if scheme.contains('/') { + return Err(ParseError::SchemeContainsSlash(s.to_string())); + } + (scheme.to_string(), rest) + } + None => (String::new(), s), + }; + let (org, name) = rest + .split_once('/') + .ok_or_else(|| ParseError::MissingSlash(s.to_string()))?; + if org.is_empty() { + return Err(ParseError::EmptyOrg(s.to_string())); + } + if name.is_empty() { + return Err(ParseError::EmptyName(s.to_string())); + } + if name.contains(':') { + return Err(ParseError::NameContainsColon(s.to_string())); + } + Ok(Self { + scheme, + org: org.to_string(), + name: name.to_string(), + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_qualified() { + let id: ModelSourceId = "huggingface:Qwen/Qwen3.6-27B".parse().unwrap(); + assert_eq!(id.scheme, "huggingface"); + assert_eq!(id.org, "Qwen"); + assert_eq!(id.name, "Qwen3.6-27B"); + assert_eq!(id.repo_path(), "Qwen/Qwen3.6-27B"); + assert!(!id.is_scheme_unset()); + } + + #[test] + fn parses_helexa_scheme() { + let id: ModelSourceId = "helexa:SomeOperator/Qwen3.6-27B-Uncensored" + .parse() + .unwrap(); + assert_eq!(id.scheme, "helexa"); + assert_eq!(id.org, "SomeOperator"); + assert_eq!(id.name, "Qwen3.6-27B-Uncensored"); + } + + #[test] + fn parses_bare_id_with_empty_scheme() { + let id: ModelSourceId = "Qwen/Qwen3-30B-A3B-Instruct".parse().unwrap(); + assert_eq!(id.scheme, ""); + assert_eq!(id.org, "Qwen"); + assert_eq!(id.name, "Qwen3-30B-A3B-Instruct"); + assert!(id.is_scheme_unset()); + } + + #[test] + fn substitutes_default_scheme_only_when_unset() { + let id: ModelSourceId = "Qwen/Q3".parse().unwrap(); + assert_eq!(id.with_default_scheme("huggingface").scheme, "huggingface"); + + let id: ModelSourceId = "helexa:Qwen/Q3".parse().unwrap(); + assert_eq!( + id.with_default_scheme("huggingface").scheme, + "helexa", + "default substitution must not override an explicit scheme" + ); + } + + #[test] + fn display_roundtrips_qualified_id() { + let s = "helexa:Helexa/Qwen3.6-27B"; + let id: ModelSourceId = s.parse().unwrap(); + assert_eq!(id.to_string(), s); + } + + #[test] + fn display_roundtrips_bare_id() { + let s = "Qwen/Q3"; + let id: ModelSourceId = s.parse().unwrap(); + assert_eq!(id.to_string(), s); + } + + #[test] + fn rejects_empty() { + assert_eq!("".parse::().unwrap_err(), ParseError::Empty); + } + + #[test] + fn rejects_missing_slash() { + match "Qwen".parse::().unwrap_err() { + ParseError::MissingSlash(s) => assert_eq!(s, "Qwen"), + other => panic!("expected MissingSlash, got {other:?}"), + } + match "huggingface:Qwen".parse::().unwrap_err() { + ParseError::MissingSlash(s) => assert_eq!(s, "huggingface:Qwen"), + other => panic!("expected MissingSlash, got {other:?}"), + } + } + + #[test] + fn rejects_empty_scheme() { + match ":Qwen/Q3".parse::().unwrap_err() { + ParseError::EmptyScheme(s) => assert_eq!(s, ":Qwen/Q3"), + other => panic!("expected EmptyScheme, got {other:?}"), + } + } + + #[test] + fn rejects_scheme_with_slash() { + match "hugg/ingface:Q/N".parse::().unwrap_err() { + ParseError::SchemeContainsSlash(s) => assert_eq!(s, "hugg/ingface:Q/N"), + other => panic!("expected SchemeContainsSlash, got {other:?}"), + } + } + + #[test] + fn rejects_empty_org_or_name() { + match "huggingface:/N".parse::().unwrap_err() { + ParseError::EmptyOrg(_) => {} + other => panic!("expected EmptyOrg, got {other:?}"), + } + match "huggingface:Q/".parse::().unwrap_err() { + ParseError::EmptyName(_) => {} + other => panic!("expected EmptyName, got {other:?}"), + } + } + + #[test] + fn rejects_name_with_colon() { + match "huggingface:Q/N:weird" + .parse::() + .unwrap_err() + { + ParseError::NameContainsColon(s) => assert_eq!(s, "huggingface:Q/N:weird"), + other => panic!("expected NameContainsColon, got {other:?}"), + } + } + + #[test] + fn serde_roundtrips_via_struct() { + // We serialize as a struct (scheme/org/name fields) so the + // shape is self-describing in API payloads. Callers that want + // the compact `scheme:org/name` string use `Display`/`FromStr`. + let id = ModelSourceId::new("helexa", "Helexa", "Qwen3.6-27B"); + let json = serde_json::to_string(&id).unwrap(); + let back: ModelSourceId = serde_json::from_str(&json).unwrap(); + assert_eq!(back, id); + } +} diff --git a/crates/neuron/src/config.rs b/crates/neuron/src/config.rs index a9e62f4..4b21a35 100644 --- a/crates/neuron/src/config.rs +++ b/crates/neuron/src/config.rs @@ -6,8 +6,18 @@ use figment::{ providers::{Env, Format, Toml}, }; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::path::{Path, PathBuf}; +/// Default scheme name applied to bare `org/name` model ids when no +/// `[harness.candle.default_source]` is set. Keeps existing operator +/// configs (which know nothing about schemes) working unchanged. +pub const DEFAULT_SOURCE_SCHEME: &str = "huggingface"; + +/// Endpoint URL for the default huggingface source, used when no +/// `[harness.candle.sources.huggingface]` is configured. +pub const DEFAULT_HF_ENDPOINT: &str = "https://huggingface.co"; + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NeuronConfig { #[serde(default = "default_port")] @@ -37,8 +47,88 @@ pub struct HarnessSettings { pub struct CandleHarnessConfig { /// HuggingFace cache directory for model weights. /// When unset, defers to hf-hub's default (~/.cache/huggingface). + /// + /// Retained for back-compat — operators with existing + /// `hf_cache = "..."` configs continue to work. Treated as the + /// `huggingface` source's cache_dir when a sources table isn't + /// provided. #[serde(default)] pub hf_cache: Option, + + /// Default source scheme applied to bare `org/name` model ids + /// (those without an explicit `scheme:` prefix). When unset, falls + /// back to `DEFAULT_SOURCE_SCHEME` ("huggingface"). + #[serde(default)] + pub default_source: Option, + + /// Per-scheme source endpoints. Each entry maps a scheme name + /// (`huggingface`, `helexa`, an operator's mirror tag, …) to its + /// endpoint URL, optional auth env var, and optional cache + /// directory. + /// + /// When absent or missing the `huggingface` key, the loader + /// synthesises a `huggingface` entry pointing at + /// `https://huggingface.co` with `hf_cache` (above) as its + /// cache_dir. This keeps single-source configs ergonomic. + #[serde(default)] + pub sources: HashMap, +} + +/// Per-scheme source configuration. Mirrors the shape `hf_hub::ApiBuilder` +/// needs: endpoint URL, optional auth token (read from an env var so +/// secrets stay out of the config file), and optional cache directory +/// disambiguated per source to prevent mirror-vs-canonical collisions. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct SourceConfig { + /// Base URL of the registry. Must speak the HF-compatible wire + /// format (siblings listing at + /// `/api/models/{org}/{name}[/revision/{rev}]`, blob fetch at + /// `/{org}/{name}/resolve/{rev}/{path}`). + pub endpoint: String, + + /// Environment variable name to read for the bearer token used + /// against this source. `None` = anonymous. Reading from env + /// (vs. literal token in the config) keeps secrets out of TOML. + #[serde(default)] + pub auth_env: Option, + + /// Cache directory for this source. The hf-hub + /// `models--{org}--{name}/snapshots/...` tree lives directly + /// under this path, so distinct sources serving the same + /// `org/name` cannot collide on disk. + /// + /// `None` means "share the harness `hf_cache` directory" — only + /// safe when the operator has exactly one source configured. + #[serde(default)] + pub cache_dir: Option, +} + +impl CandleHarnessConfig { + /// Resolve the effective sources map for this config, synthesising + /// a `huggingface` entry from legacy fields (`hf_cache`) when the + /// operator hasn't supplied a sources table. Idempotent. + /// + /// Returns a fresh map rather than mutating self so the original + /// (operator-typed) config can still be serialized back to TOML + /// for diagnostics. + pub fn effective_sources(&self) -> HashMap { + let mut out = self.sources.clone(); + out.entry(DEFAULT_SOURCE_SCHEME.to_string()) + .or_insert_with(|| SourceConfig { + endpoint: DEFAULT_HF_ENDPOINT.to_string(), + auth_env: Some("HF_TOKEN".to_string()), + cache_dir: self.hf_cache.clone(), + }); + out + } + + /// Effective default scheme. Falls back to `DEFAULT_SOURCE_SCHEME` + /// when the operator hasn't pinned one. + pub fn effective_default_source(&self) -> &str { + self.default_source + .as_deref() + .unwrap_or(DEFAULT_SOURCE_SCHEME) + } } fn default_port() -> u16 { @@ -65,3 +155,109 @@ impl Default for NeuronConfig { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn effective_sources_synthesises_huggingface_when_absent() { + let cfg = CandleHarnessConfig::default(); + let sources = cfg.effective_sources(); + assert!(sources.contains_key("huggingface")); + let hf = &sources["huggingface"]; + assert_eq!(hf.endpoint, DEFAULT_HF_ENDPOINT); + assert_eq!(hf.auth_env.as_deref(), Some("HF_TOKEN")); + assert!(hf.cache_dir.is_none()); + } + + #[test] + fn effective_sources_carries_legacy_hf_cache_into_synth_entry() { + // Existing operator configs only set `hf_cache = "/archive3/..."` + // — the synth must pick that up so the loader keeps using the + // operator's storage. + let cfg = CandleHarnessConfig { + hf_cache: Some(PathBuf::from("/archive3/llm-cache")), + ..Default::default() + }; + let sources = cfg.effective_sources(); + assert_eq!( + sources["huggingface"].cache_dir.as_deref(), + Some(Path::new("/archive3/llm-cache")) + ); + } + + #[test] + fn effective_sources_preserves_explicit_huggingface_entry() { + // When an operator types out `[harness.candle.sources.huggingface]` + // explicitly, we must not clobber it with the synth defaults. + let mut sources = HashMap::new(); + sources.insert( + "huggingface".to_string(), + SourceConfig { + endpoint: "https://huggingface.example.org".into(), + auth_env: Some("MY_TOKEN".into()), + cache_dir: Some(PathBuf::from("/operator-cache")), + }, + ); + let cfg = CandleHarnessConfig { + hf_cache: Some(PathBuf::from("/legacy-cache")), + sources, + ..Default::default() + }; + let effective = cfg.effective_sources(); + assert_eq!( + effective["huggingface"].endpoint, + "https://huggingface.example.org" + ); + assert_eq!( + effective["huggingface"].auth_env.as_deref(), + Some("MY_TOKEN") + ); + assert_eq!( + effective["huggingface"].cache_dir.as_deref(), + Some(Path::new("/operator-cache")) + ); + } + + #[test] + fn effective_sources_includes_helexa_alongside_synth_huggingface() { + let mut sources = HashMap::new(); + sources.insert( + "helexa".to_string(), + SourceConfig { + endpoint: "https://registry.helexa.ai".into(), + auth_env: Some("HELEXA_TOKEN".into()), + cache_dir: Some(PathBuf::from("/archive3/llm-cache/helexa")), + }, + ); + let cfg = CandleHarnessConfig { + hf_cache: Some(PathBuf::from("/archive3/llm-cache/huggingface")), + sources, + ..Default::default() + }; + let effective = cfg.effective_sources(); + assert_eq!(effective.len(), 2); + assert_eq!(effective["helexa"].endpoint, "https://registry.helexa.ai"); + // huggingface still gets synth-derived from legacy hf_cache. + assert_eq!( + effective["huggingface"].cache_dir.as_deref(), + Some(Path::new("/archive3/llm-cache/huggingface")) + ); + } + + #[test] + fn effective_default_source_falls_back() { + let cfg = CandleHarnessConfig::default(); + assert_eq!(cfg.effective_default_source(), DEFAULT_SOURCE_SCHEME); + } + + #[test] + fn effective_default_source_honours_explicit() { + let cfg = CandleHarnessConfig { + default_source: Some("helexa".into()), + ..Default::default() + }; + assert_eq!(cfg.effective_default_source(), "helexa"); + } +} diff --git a/crates/neuron/src/harness/candle.rs b/crates/neuron/src/harness/candle.rs index d456ef8..d1ea49e 100644 --- a/crates/neuron/src/harness/candle.rs +++ b/crates/neuron/src/harness/candle.rs @@ -44,7 +44,13 @@ use tracing::Instrument; /// In-process candle harness. Owns the loaded model registry. pub struct CandleHarness { models: Arc>>, - hf_cache: Option, + /// Post-resolution source map: scheme → endpoint/token/cache. Built + /// in `new()` from the operator's `CandleHarnessConfig`; auth tokens + /// are read from their configured env vars at startup so secrets + /// don't leak through the config file. + sources: HashMap, + /// Scheme to substitute for bare `org/name` model ids. + default_source: String, bind_url: String, /// One worker thread per CUDA device index that owns its /// `CudaContext` for the daemon's lifetime. Populated lazily by @@ -968,20 +974,87 @@ async fn chunked_prefill_tp( last_logits.ok_or_else(|| anyhow::anyhow!("chunked_prefill_tp: no chunks produced")) } +/// Per-scheme source after env-var resolution. The auth token is the +/// already-read env-var value (or None for anonymous access), and the +/// cache dir is the post-`resolve_hf_cache` path for the huggingface +/// scheme and the operator's literal value for everything else. +#[derive(Debug, Clone)] +struct ResolvedSource { + endpoint: String, + auth_token: Option, + cache_dir: Option, +} + impl CandleHarness { - pub fn new(bind_url: String, hf_cache: Option) -> Self { - let hf_cache = resolve_hf_cache(hf_cache); - if let Some(p) = &hf_cache { - tracing::info!(path = %p.display(), "candle harness using HuggingFace cache"); + /// Construct a new harness for `bind_url` using `config`. Resolves + /// every configured source's auth env var and cache dir up front so + /// the hot load path (`hf_api_for`) is a pure HashMap lookup. + pub fn new(bind_url: String, config: &crate::config::CandleHarnessConfig) -> Self { + let raw_sources = config.effective_sources(); + let default_source = config.effective_default_source().to_string(); + let mut sources = HashMap::with_capacity(raw_sources.len()); + for (scheme, src) in raw_sources.into_iter() { + // Only the huggingface source gets the legacy + // HF_HUB_CACHE/HF_HOME env-var fallback chain — other + // schemes resolve to whatever the operator typed. + let cache_dir = if scheme == crate::config::DEFAULT_SOURCE_SCHEME { + resolve_hf_cache(src.cache_dir.clone()) + } else { + src.cache_dir.clone() + }; + let auth_token = src + .auth_env + .as_deref() + .and_then(|var| std::env::var(var).ok()) + .filter(|v| !v.is_empty()); + if let Some(p) = &cache_dir { + tracing::info!( + scheme = %scheme, + endpoint = %src.endpoint, + cache = %p.display(), + auth = auth_token.is_some(), + "candle harness source resolved" + ); + } else { + tracing::info!( + scheme = %scheme, + endpoint = %src.endpoint, + auth = auth_token.is_some(), + "candle harness source resolved (no cache dir; using hf-hub default)" + ); + } + sources.insert( + scheme, + ResolvedSource { + endpoint: src.endpoint, + auth_token, + cache_dir, + }, + ); + } + if !sources.contains_key(&default_source) { + tracing::warn!( + default_source, + "configured default_source has no matching [harness.candle.sources.*] entry; \ + bare model ids will fail to resolve until this is fixed" + ); } Self { models: Arc::new(RwLock::new(HashMap::new())), - hf_cache, + sources, + default_source, bind_url, device_workers: Arc::new(RwLock::new(HashMap::new())), } } + /// Scheme to substitute for bare `org/name` model ids. Mirrors the + /// effective default from the operator's config, exposed for the + /// load path's `ModelSourceId::with_default_scheme`. + pub(crate) fn default_source_scheme(&self) -> &str { + &self.default_source + } + /// Pick a candle `Device` for the requested indices. Without the /// `cuda` feature, or if CUDA initialisation fails, falls back to CPU. fn pick_device(devices: &[u32]) -> Result { @@ -1033,14 +1106,33 @@ impl CandleHarness { Ok(handle) } - /// Build an hf-hub API client pre-configured with the harness's - /// `hf_cache` (when one is set). - fn hf_api(&self) -> Result { - let mut builder = hf_hub::api::tokio::ApiBuilder::new(); - if let Some(cache) = &self.hf_cache { + /// Build an hf-hub API client for the given scheme. The scheme + /// must be present in the operator's configured `sources` table + /// (the synth `huggingface` entry counts). Each source carries its + /// own endpoint, optional bearer token, and cache directory, so + /// the same `org/name` served by two registries cannot collide on + /// disk. + pub(crate) fn hf_api_for(&self, scheme: &str) -> Result { + let src = self.sources.get(scheme).ok_or_else(|| { + let mut configured: Vec<&str> = self.sources.keys().map(String::as_str).collect(); + configured.sort(); + anyhow::anyhow!( + "no source configured for scheme '{scheme}'; \ + configured: {configured:?}. Add a \ + [harness.candle.sources.{scheme}] block to neuron.toml \ + with endpoint = '...'." + ) + })?; + let mut builder = hf_hub::api::tokio::ApiBuilder::new().with_endpoint(src.endpoint.clone()); + if let Some(cache) = &src.cache_dir { builder = builder.with_cache_dir(cache.clone()); } - builder.build().context("build hf-hub API") + if let Some(token) = &src.auth_token { + builder = builder.with_token(Some(token.clone())); + } + builder + .build() + .with_context(|| format!("build hf-hub API for scheme '{scheme}'")) } /// Resolve a dense (bf16/fp16 safetensors) model to its local file @@ -1053,18 +1145,21 @@ impl CandleHarness { async fn resolve_dense_files( &self, spec: &ModelSpec, + source_id: &cortex_core::source::ModelSourceId, ) -> Result<(PathBuf, PathBuf, Vec)> { - let api = self.hf_api()?; - let repo = api.model(spec.model_id.clone()); + let api = self.hf_api_for(&source_id.scheme)?; + let repo = api.model(source_id.repo_path()); + let display_id = source_id.to_string(); + let _ = spec; // reserved for future use (quant-aware filtering) let config_path = repo .get("config.json") .await - .with_context(|| format!("fetch config.json from {}", spec.model_id))?; + .with_context(|| format!("fetch config.json from {display_id}"))?; let tokenizer_path = repo .get("tokenizer.json") .await - .with_context(|| format!("fetch tokenizer.json from {}", spec.model_id))?; + .with_context(|| format!("fetch tokenizer.json from {display_id}"))?; // Prefer the sharded layout (most HF dense models > 5B ship it). let safetensors_paths = match repo.get("model.safetensors.index.json").await { @@ -1111,9 +1206,10 @@ impl CandleHarness { async fn load_arch_gguf( &self, spec: &ModelSpec, + source_id: &cortex_core::source::ModelSourceId, device: &Device, ) -> Result<(PathBuf, ModelArch)> { - let (gguf_path, tokenizer_path) = self.resolve_files(spec).await?; + let (gguf_path, tokenizer_path) = self.resolve_files(spec, source_id).await?; let device_for_load = device.clone(); let gguf_path_for_load = gguf_path.clone(); let model_id_for_log = spec.model_id.clone(); @@ -1176,10 +1272,11 @@ impl CandleHarness { async fn load_arch_dense( &self, spec: &ModelSpec, + source_id: &cortex_core::source::ModelSourceId, device: &Device, ) -> Result<(PathBuf, ModelArch)> { let (config_path, tokenizer_path, safetensors_paths) = - self.resolve_dense_files(spec).await?; + self.resolve_dense_files(spec, source_id).await?; let device_for_load = device.clone(); let model_id_for_log = spec.model_id.clone(); @@ -1290,14 +1387,20 @@ impl CandleHarness { /// Resolve a model spec to local GGUF and tokenizer file paths via /// hf-hub. Downloads on first use; subsequent calls are cached. - async fn resolve_files(&self, spec: &ModelSpec) -> Result<(PathBuf, PathBuf)> { - let api = self.hf_api()?; - let repo = api.model(spec.model_id.clone()); + async fn resolve_files( + &self, + spec: &ModelSpec, + source_id: &cortex_core::source::ModelSourceId, + ) -> Result<(PathBuf, PathBuf)> { + let api = self.hf_api_for(&source_id.scheme)?; + let repo_path = source_id.repo_path(); + let repo = api.model(repo_path.clone()); + let display_id = source_id.to_string(); let info = repo .info() .await - .with_context(|| format!("fetch HF repo info for {}", spec.model_id))?; + .with_context(|| format!("fetch HF repo info for {display_id}"))?; let quant = spec.quant.as_deref().unwrap_or(""); let quant_lc = quant.to_lowercase(); @@ -1309,15 +1412,14 @@ impl CandleHarness { .find(|name| quant_lc.is_empty() || name.to_lowercase().contains(&quant_lc)) .ok_or_else(|| { anyhow::anyhow!( - "no GGUF file matching quant {:?} in repo {}", + "no GGUF file matching quant {:?} in repo {display_id}", spec.quant, - spec.model_id ) })? .to_string(); tracing::info!( - model = %spec.model_id, + model = %display_id, file = %gguf_filename, "resolving GGUF (may be cached)" ); @@ -1331,27 +1433,28 @@ impl CandleHarness { // tokenizer.json lives in the base non-GGUF repo. Derive the // base repo id by stripping a `-GGUF` / `-gguf` suffix; if // there's no such suffix the same repo is used (works for - // non-GGUF model_ids). - let tokenizer_repo_id = spec - .model_id + // non-GGUF model_ids). Stripping happens on the repo_path + // (scheme already accounted for) so this composes cleanly with + // helexa-scheme GGUF repos too. + let tokenizer_repo_path = repo_path .strip_suffix("-GGUF") - .or_else(|| spec.model_id.strip_suffix("-gguf")) - .unwrap_or(spec.model_id.as_str()) + .or_else(|| repo_path.strip_suffix("-gguf")) + .unwrap_or(&repo_path) .to_string(); - let tokenizer_repo = if tokenizer_repo_id == spec.model_id { + let tokenizer_repo = if tokenizer_repo_path == repo_path { repo } else { tracing::debug!( - from = %spec.model_id, - to = %tokenizer_repo_id, + from = %repo_path, + to = %tokenizer_repo_path, "tokenizer.json sourced from base repo (GGUF suffix stripped)" ); - api.model(tokenizer_repo_id.clone()) + api.model(tokenizer_repo_path.clone()) }; let tokenizer_path = tokenizer_repo .get("tokenizer.json") .await - .with_context(|| format!("fetch tokenizer.json from {tokenizer_repo_id}"))?; + .with_context(|| format!("fetch tokenizer.json from {tokenizer_repo_path}"))?; Ok((gguf_path, tokenizer_path)) } @@ -2002,6 +2105,16 @@ impl Harness for CandleHarness { } } + // Parse the model id, substituting the harness's default + // source for bare `org/name` entries so existing operator + // configs keep working unchanged. Stored on the request-local + // path so downstream resolve_* can ask the right registry. + let source_id = spec + .model_id + .parse::() + .with_context(|| format!("parse model id '{}' as scheme:org/name", spec.model_id))? + .with_default_scheme(self.default_source_scheme()); + // Preflight: classify the source repo and apply the // tp/quant/source feasibility table before any device // allocation, NCCL handshake, or weight fetch. Failures bubble @@ -2011,8 +2124,8 @@ impl Harness for CandleHarness { // dispatch — downstream `resolve_files` / `resolve_dense_files` // re-run their own substring match — but the structured error // surface is the main payoff. - let api = self.hf_api()?; - super::preflight::preflight(&api, spec) + let api = self.hf_api_for(&source_id.scheme)?; + super::preflight::preflight(&api, &source_id, spec) .await .map_err(anyhow::Error::new)?; @@ -2020,7 +2133,7 @@ impl Harness for CandleHarness { if tp_size > 1 { #[cfg(feature = "cuda")] { - return self.load_tp(spec, tp_size).await; + return self.load_tp(spec, &source_id, tp_size).await; } #[cfg(not(feature = "cuda"))] { @@ -2048,7 +2161,7 @@ impl Harness for CandleHarness { let (tokenizer_path, arch_local, arch_handle) = if let Some(w) = &worker { // CUDA path: resolve, then load in the worker. if spec.quant.is_some() { - let (gguf_path, tokenizer_path) = self.resolve_files(spec).await?; + let (gguf_path, tokenizer_path) = self.resolve_files(spec, &source_id).await?; let handle = w .load_gguf(gguf_path, spec.model_id.clone()) .await @@ -2056,7 +2169,7 @@ impl Harness for CandleHarness { (tokenizer_path, None, Some(handle)) } else { let (config_path, tokenizer_path, safetensors_paths) = - self.resolve_dense_files(spec).await?; + self.resolve_dense_files(spec, &source_id).await?; let handle = w .load_dense(config_path, safetensors_paths, spec.model_id.clone()) .await @@ -2066,9 +2179,9 @@ impl Harness for CandleHarness { } else { // CPU path: legacy spawn_blocking + Arc>. let (tokenizer_path, arch) = if spec.quant.is_some() { - self.load_arch_gguf(spec, &device).await? + self.load_arch_gguf(spec, &source_id, &device).await? } else { - self.load_arch_dense(spec, &device).await? + self.load_arch_dense(spec, &source_id, &device).await? }; (tokenizer_path, Some(Arc::new(Mutex::new(arch))), None) }; @@ -2226,7 +2339,12 @@ impl CandleHarness { /// `spec.devices` carries the per-rank CUDA device indices (one /// entry per rank, in rank order); defaults to `0..tp_size`. #[cfg(feature = "cuda")] - async fn load_tp(&self, spec: &ModelSpec, tp_size: u32) -> Result<()> { + async fn load_tp( + &self, + spec: &ModelSpec, + source_id: &cortex_core::source::ModelSourceId, + tp_size: u32, + ) -> Result<()> { use std::sync::Arc as StdArc; use tokio::sync::Mutex as TMutex; @@ -2251,7 +2369,7 @@ impl CandleHarness { // 1. Resolve config + tokenizer + safetensors via hf-hub. let (config_path, tokenizer_path, safetensors_paths) = - self.resolve_dense_files(spec).await?; + self.resolve_dense_files(spec, source_id).await?; let config_json = std::fs::read_to_string(&config_path).context("read config.json")?; // Reject unsupported architectures *before* spawning the worker // pool and fanning out NCCL — otherwise we'd burn the pool @@ -4023,4 +4141,75 @@ mod tests { "DriverError(CUDA_ERROR_ILLEGAL_ADDRESS, \"an illegal memory access was encountered\")" )); } + + /// Phase 1 of plan-source-aware-loader: harness must resolve each + /// configured scheme to its own endpoint+cache, and reject schemes + /// the operator hasn't configured with a useful error. + #[test] + fn hf_api_for_routes_per_scheme() { + use crate::config::{CandleHarnessConfig, SourceConfig}; + use std::collections::HashMap; + + let mut sources = HashMap::new(); + sources.insert( + "huggingface".to_string(), + SourceConfig { + endpoint: "https://huggingface.example.org".into(), + auth_env: None, + cache_dir: Some(std::path::PathBuf::from("/tmp/hf-cache")), + }, + ); + sources.insert( + "helexa".to_string(), + SourceConfig { + endpoint: "https://registry.helexa.example.ai".into(), + auth_env: None, + cache_dir: Some(std::path::PathBuf::from("/tmp/helexa-cache")), + }, + ); + let cfg = CandleHarnessConfig { + sources, + default_source: Some("huggingface".into()), + ..Default::default() + }; + let harness = CandleHarness::new("http://localhost:13131".into(), &cfg); + + // Both configured schemes build cleanly. + harness + .hf_api_for("huggingface") + .expect("huggingface scheme should build"); + harness + .hf_api_for("helexa") + .expect("helexa scheme should build"); + + // Unknown scheme errors with a message that names the configured + // set so the operator can act on it. + let err = harness + .hf_api_for("does-not-exist") + .expect_err("unknown scheme should error"); + let msg = format!("{err:#}"); + assert!( + msg.contains("does-not-exist") && msg.contains("huggingface") && msg.contains("helexa"), + "error must list configured schemes: {msg}" + ); + + assert_eq!(harness.default_source_scheme(), "huggingface"); + } + + /// Operator with only `hf_cache` set (no `sources` table) still + /// gets a working `huggingface` source pointed at HF. + #[test] + fn hf_api_for_synthesises_huggingface_from_legacy_hf_cache() { + use crate::config::CandleHarnessConfig; + + let cfg = CandleHarnessConfig { + hf_cache: Some(std::path::PathBuf::from("/archive3/llm-cache")), + ..Default::default() + }; + let harness = CandleHarness::new("http://localhost:13131".into(), &cfg); + harness + .hf_api_for("huggingface") + .expect("synth huggingface source should build"); + assert_eq!(harness.default_source_scheme(), "huggingface"); + } } diff --git a/crates/neuron/src/harness/mod.rs b/crates/neuron/src/harness/mod.rs index e9570ab..abedc5e 100644 --- a/crates/neuron/src/harness/mod.rs +++ b/crates/neuron/src/harness/mod.rs @@ -115,7 +115,7 @@ impl HarnessRegistry { "candle" => { let harness = Arc::new(candle::CandleHarness::new( bind_url.to_string(), - settings.candle.hf_cache.clone(), + &settings.candle, )); registry.candle = Some(Arc::clone(&harness)); registry.harnesses.insert("candle".into(), harness); diff --git a/crates/neuron/src/harness/preflight.rs b/crates/neuron/src/harness/preflight.rs index d80fee0..0e36143 100644 --- a/crates/neuron/src/harness/preflight.rs +++ b/crates/neuron/src/harness/preflight.rs @@ -22,6 +22,7 @@ //! cleanly when Phase 1 lands. use cortex_core::harness::ModelSpec; +use cortex_core::source::ModelSourceId; use hf_hub::api::tokio::Api; use serde::Serialize; @@ -115,13 +116,22 @@ pub enum PreflightError { /// One network round-trip (`repo.info()`); no blob fetches. Returns /// `Ok(PlacementPlan)` when the requested combination is feasible, or /// a structured `PreflightError` describing what's wrong. -pub async fn preflight(api: &Api, spec: &ModelSpec) -> Result { - let repo = api.model(spec.model_id.clone()); +/// +/// `api` must already be configured for the scheme `source_id` belongs +/// to — caller (typically `CandleHarness::load_model`) builds it via +/// `hf_api_for(&source_id.scheme)`. Only the `org/name` portion of the +/// id is sent to the registry. +pub async fn preflight( + api: &Api, + source_id: &ModelSourceId, + spec: &ModelSpec, +) -> Result { + let repo = api.model(source_id.repo_path()); let info = repo .info() .await .map_err(|e| PreflightError::RepoFetchFailed { - model_id: spec.model_id.clone(), + model_id: source_id.to_string(), cause: format!("{e}"), })?; @@ -132,13 +142,13 @@ pub async fn preflight(api: &Api, spec: &ModelSpec) -> Result Err(PreflightError::EmptyRepo { - model_id: spec.model_id.clone(), + model_id: source_id.to_string(), }), // GGUF-only + TP: not supported. Today's HauhauCS failure. (SourceFormat::Gguf { quants }, tp, _) if tp > 1 => { Err(PreflightError::TpRequiresSafetensors { - model_id: spec.model_id.clone(), + model_id: source_id.to_string(), tp_size: tp, gguf_quants: quants.clone(), suggestion: format!( @@ -154,13 +164,13 @@ pub async fn preflight(api: &Api, spec: &ModelSpec) -> Result Ok(PlacementPlan { - model_id: spec.model_id.clone(), + model_id: source_id.to_string(), format: format.clone(), tp_size, picked_quant_file: Some(fname), }), None => Err(PreflightError::QuantNotFound { - model_id: spec.model_id.clone(), + model_id: source_id.to_string(), requested: requested.unwrap_or("").to_string(), available: quants.clone(), nearest: nearest_quant(requested.unwrap_or(""), quants), @@ -174,7 +184,7 @@ pub async fn preflight(api: &Api, spec: &ModelSpec) -> Result { Ok(PlacementPlan { - model_id: spec.model_id.clone(), + model_id: source_id.to_string(), format: format.clone(), tp_size, picked_quant_file: None, @@ -431,14 +441,20 @@ mod tests { format: &SourceFormat, filenames: &[&str], ) -> Result { + // Tests parse spec.model_id with the default scheme so the + // assertions can keep comparing against bare "org/name". + let source_id: ModelSourceId = spec + .model_id + .parse::() + .expect("test spec.model_id must parse"); let tp_size = spec.tensor_parallel.unwrap_or(1); match (format, tp_size, spec.quant.as_deref()) { (SourceFormat::Empty, _, _) => Err(PreflightError::EmptyRepo { - model_id: spec.model_id.clone(), + model_id: source_id.to_string(), }), (SourceFormat::Gguf { quants }, tp, _) if tp > 1 => { Err(PreflightError::TpRequiresSafetensors { - model_id: spec.model_id.clone(), + model_id: source_id.to_string(), tp_size: tp, gguf_quants: quants.clone(), suggestion: format!( @@ -451,13 +467,13 @@ mod tests { let picked = pick_gguf_file(filenames, requested.unwrap_or("")); match picked { Some(fname) => Ok(PlacementPlan { - model_id: spec.model_id.clone(), + model_id: source_id.to_string(), format: format.clone(), tp_size, picked_quant_file: Some(fname), }), None => Err(PreflightError::QuantNotFound { - model_id: spec.model_id.clone(), + model_id: source_id.to_string(), requested: requested.unwrap_or("").to_string(), available: quants.clone(), nearest: nearest_quant(requested.unwrap_or(""), quants), @@ -466,7 +482,7 @@ mod tests { } (SourceFormat::DenseSafetensors { .. } | SourceFormat::Mixed { .. }, _, _) => { Ok(PlacementPlan { - model_id: spec.model_id.clone(), + model_id: source_id.to_string(), format: format.clone(), tp_size, picked_quant_file: None, diff --git a/crates/neuron/tests/preflight.rs b/crates/neuron/tests/preflight.rs index 2ecdf21..a7ad106 100644 --- a/crates/neuron/tests/preflight.rs +++ b/crates/neuron/tests/preflight.rs @@ -12,6 +12,7 @@ use axum::http::StatusCode; use axum::response::{IntoResponse, Json}; use axum::routing::get; use cortex_core::harness::ModelSpec; +use cortex_core::source::ModelSourceId; use neuron::harness::preflight::{PreflightError, SourceFormat, preflight}; use serde_json::{Value, json}; use std::sync::Arc; @@ -89,6 +90,15 @@ fn spec(model_id: &str, tp: Option, quant: Option<&str>) -> ModelSpec { } } +/// Build a `ModelSourceId` from a bare `org/name` test input, +/// substituting the default scheme so the mock route key matches. +fn sid(model_id: &str) -> ModelSourceId { + model_id + .parse::() + .expect("test model_id parses") + .with_default_scheme("huggingface") +} + #[tokio::test] async fn preflight_gguf_tp_rejected_over_http() { let cache = tempfile::tempdir().expect("tempdir"); @@ -107,7 +117,7 @@ async fn preflight_gguf_tp_rejected_over_http() { let api = build_api(&endpoint, cache.path()); let s = spec("HauhauCS/Qwen3.6", Some(2), Some("q6k")); - let err = preflight(&api, &s).await.unwrap_err(); + let err = preflight(&api, &sid(&s.model_id), &s).await.unwrap_err(); match err { PreflightError::TpRequiresSafetensors { model_id, @@ -115,7 +125,9 @@ async fn preflight_gguf_tp_rejected_over_http() { gguf_quants, .. } => { - assert_eq!(model_id, "HauhauCS/Qwen3.6"); + // Scheme prefix surfaces in error display now that + // preflight is source-aware. + assert_eq!(model_id, "huggingface:HauhauCS/Qwen3.6"); assert_eq!(tp_size, 2); assert_eq!(gguf_quants.len(), 3); } @@ -140,7 +152,7 @@ async fn preflight_gguf_quant_suggestion_over_http() { let api = build_api(&endpoint, cache.path()); let s = spec("HauhauCS/Qwen3.6", Some(1), Some("q6k")); - let err = preflight(&api, &s).await.unwrap_err(); + let err = preflight(&api, &sid(&s.model_id), &s).await.unwrap_err(); match err { PreflightError::QuantNotFound { requested, @@ -176,7 +188,9 @@ async fn preflight_dense_safetensors_tp_ok() { let api = build_api(&endpoint, cache.path()); let s = spec("Qwen/Q3-30B", Some(2), Some("q5k")); - let plan = preflight(&api, &s).await.expect("dense+tp should succeed"); + let plan = preflight(&api, &sid(&s.model_id), &s) + .await + .expect("dense+tp should succeed"); assert_eq!(plan.tp_size, 2); assert!(plan.picked_quant_file.is_none()); assert!(matches!( @@ -197,7 +211,7 @@ async fn preflight_gguf_single_gpu_good_quant() { let api = build_api(&endpoint, cache.path()); let s = spec("HauhauCS/Qwen3.6", Some(1), Some("q6_k_p")); - let plan = preflight(&api, &s) + let plan = preflight(&api, &sid(&s.model_id), &s) .await .expect("good quant should succeed"); assert_eq!(plan.tp_size, 1); @@ -219,7 +233,7 @@ async fn preflight_repo_fetch_failed_on_404() { let api = build_api(&endpoint, cache.path()); let s = spec("DoesNot/Exist", Some(1), None); - let err = preflight(&api, &s).await.unwrap_err(); + let err = preflight(&api, &sid(&s.model_id), &s).await.unwrap_err(); assert!( matches!(err, PreflightError::RepoFetchFailed { .. }), "expected RepoFetchFailed, got {err:?}" @@ -238,7 +252,7 @@ async fn preflight_empty_repo_rejected() { let api = build_api(&endpoint, cache.path()); let s = spec("Empty/Repo", Some(1), None); - let err = preflight(&api, &s).await.unwrap_err(); + let err = preflight(&api, &sid(&s.model_id), &s).await.unwrap_err(); assert!( matches!(err, PreflightError::EmptyRepo { .. }), "expected EmptyRepo, got {err:?}" @@ -264,6 +278,8 @@ async fn preflight_mixed_repo_prefers_safetensors() { // TP=2 + quant should succeed via the dense path even though a // GGUF is present — the dense path handles ISQ. let s = spec("Mixed/Repo", Some(2), Some("q5k")); - let plan = preflight(&api, &s).await.expect("mixed should succeed"); + let plan = preflight(&api, &sid(&s.model_id), &s) + .await + .expect("mixed should succeed"); assert!(matches!(plan.format, SourceFormat::Mixed { .. })); } diff --git a/neuron.example.toml b/neuron.example.toml index a842f5b..152a5d3 100644 --- a/neuron.example.toml +++ b/neuron.example.toml @@ -22,7 +22,9 @@ name = "candle" # HuggingFace cache directory for model weights. # # Resolution order (first hit wins): -# 1. `hf_cache` here in this file. +# 1. `hf_cache` here in this file (applies to the synth `huggingface` +# source only — see [harness.candle.sources.*] below for explicit +# per-source paths). # 2. `HF_HUB_CACHE` env var — same convention as the Python # `huggingface_hub` library, so an existing cache directory shared # with other tooling can be reused without per-tool config. @@ -36,6 +38,32 @@ name = "candle" # Environment=HF_HUB_CACHE=/archive/hf-cache # hf_cache = "/var/lib/neuron/hf-cache" +# Default scheme applied to bare `org/name` model ids (those without a +# `scheme:` prefix). Defaults to "huggingface" when unset. Set to +# "helexa" to make `default_models = [{ model_id = "Helexa/Foo" }]` +# resolve via the helexa registry without prefixing every entry. +# default_source = "huggingface" + +# Per-scheme source endpoints. Each scheme maps to an HF-compatible +# registry. The `huggingface` source is auto-synthesised pointing at +# `https://huggingface.co` when omitted; declare it explicitly here to +# override the endpoint, auth env, or cache dir. +# +# [harness.candle.sources.huggingface] +# endpoint = "https://huggingface.co" +# auth_env = "HF_TOKEN" # optional bearer token via env var +# cache_dir = "/archive3/llm-cache/huggingface" +# +# Add helexa (or any operator-run mirror speaking the HF-compatible +# wire format) by adding another sources entry. Caches are +# disambiguated per scheme so a mirror serving the same `org/name` as +# HF cannot collide on disk. +# +# [harness.candle.sources.helexa] +# endpoint = "https://registry.helexa.ai" +# auth_env = "HELEXA_TOKEN" +# cache_dir = "/archive3/llm-cache/helexa" + # -- Default models ---------------------------------------------------------- # Models listed here are loaded automatically when the neuron service # activates. Loading is sequential — a slow or failing entry doesn't