feat(neuron,cortex-core): source-aware loader (scheme:org/name)
All checks were successful
CI / CUDA type-check (push) Successful in 46s
CI / Format (push) Successful in 32s
build-prerelease / Resolve version stamps (push) Successful in 42s
CI / Clippy (push) Successful in 2m40s
build-prerelease / Build cortex binary (push) Successful in 4m23s
CI / Test (push) Successful in 5m28s
CI / Build cortex SRPM (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Build neuron-blackwell (push) Successful in 5m39s
build-prerelease / Package cortex RPM (push) Successful in 1m19s
build-prerelease / Build neuron-ampere (push) Successful in 7m53s
build-prerelease / Build neuron-ada (push) Successful in 5m18s
build-prerelease / Package helexa-neuron-ampere RPM (push) Successful in 2m59s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 3m6s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Successful in 3m44s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 1m2s
All checks were successful
CI / CUDA type-check (push) Successful in 46s
CI / Format (push) Successful in 32s
build-prerelease / Resolve version stamps (push) Successful in 42s
CI / Clippy (push) Successful in 2m40s
build-prerelease / Build cortex binary (push) Successful in 4m23s
CI / Test (push) Successful in 5m28s
CI / Build cortex SRPM (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Build neuron-blackwell (push) Successful in 5m39s
build-prerelease / Package cortex RPM (push) Successful in 1m19s
build-prerelease / Build neuron-ampere (push) Successful in 7m53s
build-prerelease / Build neuron-ada (push) Successful in 5m18s
build-prerelease / Package helexa-neuron-ampere RPM (push) Successful in 2m59s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 3m6s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Successful in 3m44s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 1m2s
Phase 1 of plan-source-aware-loader-preflight. Makes neuron's
loader treat `huggingface:org/name` and `helexa:org/name` as
first-class distinct sources with per-source endpoint + cache,
while staying backwards-compatible with bare `org/name` ids.
Zero behavior change for existing operator configs.
Motivation: helexa is adding an EU-hosted registry
(`registry.helexa.ai`) alongside HF. Both speak HF-compatible
wire format, but the bytes, jurisdiction, trust root, and cache
namespace are distinct. The loader needs to disambiguate which
registry serves a given model id, and to keep their caches from
colliding on disk when both happen to host the same `org/name`.
What lands:
- `cortex-core::source` — new module. `ModelSourceId { scheme,
org, name }` with `FromStr` accepting both `scheme:org/name`
and bare `org/name`. `Display` round-trips. `repo_path()`
emits the `org/name` half for the hf-hub `Api::model(...)`
call regardless of which scheme/endpoint we're hitting.
Rejects malformed input with typed `ParseError` variants
(empty scheme, missing slash, scheme with `/`, name with
`:`, etc.).
- `neuron::config::CandleHarnessConfig` gains
`default_source: Option<String>` and
`sources: HashMap<String, SourceConfig>`. `SourceConfig`
mirrors what `hf_hub::ApiBuilder` consumes: endpoint URL,
optional `auth_env` (env var name read at startup so secrets
stay out of TOML), and optional cache_dir. Defaults
synthesise a `huggingface` entry pointing at
`https://huggingface.co` with the legacy `hf_cache` field as
its cache_dir — so existing configs that only set `hf_cache`
keep working unchanged.
- `CandleHarness::new(bind_url, &CandleHarnessConfig)` replaces
`CandleHarness::new(bind_url, hf_cache)`. Resolves every
configured source's auth env var and cache dir up front so
`hf_api_for(scheme)` is a pure HashMap lookup on the hot
load path. Only the `huggingface` scheme gets the legacy
`HF_HUB_CACHE`/`HF_HOME` env-var fallback chain; other
schemes resolve to whatever the operator typed.
- `hf_api()` -> `hf_api_for(scheme)`. Builds an
`hf_hub::Api` with the source's endpoint, cache_dir, and
auth token. Errors with a useful message naming the
configured schemes when an unknown scheme is requested.
- `CandleHarness::load_model` parses `spec.model_id` into a
`ModelSourceId`, substitutes `default_source` for bare ids,
and threads the parsed source through `preflight`,
`resolve_files`, `resolve_dense_files`, `load_arch_gguf`,
`load_arch_dense`, and `load_tp`. The hf-hub `Api::model()`
call now uses `source_id.repo_path()` so registry calls hit
the right URL shape regardless of scheme.
- `preflight()` signature gains a `&ModelSourceId` parameter
(it's the canonical id for log lines and error display);
`RepoFetchFailed.model_id` etc. now carry the
scheme-qualified form so operator-visible errors echo
exactly what was configured.
- `neuron.example.toml` documents the new
`[harness.candle.sources.*]` table with commented-out
examples for `huggingface` (explicit override) and `helexa`.
Tests:
- 13 new unit tests in `cortex-core::source` covering parse /
display round-trip, default-scheme substitution semantics,
and every `ParseError` variant.
- 6 new unit tests in `neuron::config` covering the
`effective_sources` synth (legacy `hf_cache` carry-through,
explicit override preservation, helexa-alongside-huggingface)
and `effective_default_source` fallback.
- 2 new unit tests in `harness::candle::tests` covering
multi-scheme `hf_api_for` routing, including the
"unknown scheme" error path naming configured schemes.
- Preflight integration tests updated to construct
`ModelSourceId` and assert against the scheme-qualified
error form.
CI gate: cargo fmt --check, cargo clippy --workspace
--all-targets -- -D warnings, cargo test --workspace (all 24
test groups ok, zero failures).
Out of scope (Phase 3):
- Cortex catalogue `source` field — independent of Phase 1+2,
ships when the registry comes online.
- `helexa` source endpoint itself — separate project; this
PR adds the client-side rails only.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -7,4 +7,5 @@ pub mod metrics;
|
||||
pub mod node;
|
||||
pub mod openai;
|
||||
pub mod responses;
|
||||
pub mod source;
|
||||
pub mod translate;
|
||||
|
||||
267
crates/cortex-core/src/source.rs
Normal file
267
crates/cortex-core/src/source.rs
Normal file
@@ -0,0 +1,267 @@
|
||||
//! Scheme-qualified model identifiers.
|
||||
//!
|
||||
//! cortex/neuron historically resolves every model id through hf-hub
|
||||
//! against `https://huggingface.co`. Helexa is adding an EU-hosted
|
||||
//! registry (`registry.helexa.ai`) alongside HF — both speak the same
|
||||
//! HF-compatible wire format, but the bytes, jurisdiction, and trust
|
||||
//! root differ. Model ids therefore need a scheme:
|
||||
//!
|
||||
//! - `huggingface:Qwen/Qwen3.6-27B` — HF-hosted bytes
|
||||
//! - `helexa:Qwen/Qwen3.6-27B-Uncensored` — helexa registry bytes
|
||||
//! - `helexa:SomeOperator/CustomFinetune` — operator publishing
|
||||
//! under the helexa namespace; same scheme handles all `org/name`
|
||||
//! pairs hosted in that registry.
|
||||
//!
|
||||
//! Bare `org/name` parses with an empty scheme; the caller (typically
|
||||
//! a harness) substitutes its configured default scheme so existing
|
||||
//! configs keep working through the transition.
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
use std::str::FromStr;
|
||||
|
||||
/// Parsed `scheme:org/name`. Bare `org/name` produces an empty scheme
|
||||
/// — call `with_default_scheme` (or check `is_scheme_unset`) to
|
||||
/// resolve before using.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub struct ModelSourceId {
|
||||
pub scheme: String,
|
||||
pub org: String,
|
||||
pub name: String,
|
||||
}
|
||||
|
||||
/// Errors from `ModelSourceId::from_str`. Carries the offending input
|
||||
/// so log lines / API errors can echo what the operator typed.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
|
||||
pub enum ParseError {
|
||||
#[error("empty model id")]
|
||||
Empty,
|
||||
#[error("model id '{0}' is missing the '/' between org and name")]
|
||||
MissingSlash(String),
|
||||
#[error("model id '{0}' has an empty scheme before ':'")]
|
||||
EmptyScheme(String),
|
||||
#[error("model id '{0}' has an empty org")]
|
||||
EmptyOrg(String),
|
||||
#[error("model id '{0}' has an empty name")]
|
||||
EmptyName(String),
|
||||
#[error("model id '{0}' has a scheme containing '/' which is reserved for org/name")]
|
||||
SchemeContainsSlash(String),
|
||||
#[error("model id '{0}' has a name containing ':' which is reserved for the scheme prefix")]
|
||||
NameContainsColon(String),
|
||||
}
|
||||
|
||||
impl ModelSourceId {
|
||||
/// Construct directly from already-validated parts. Used by tests
|
||||
/// and call sites that have the fields separately; the public API
|
||||
/// for parsing user input is `FromStr`.
|
||||
pub fn new(scheme: impl Into<String>, org: impl Into<String>, name: impl Into<String>) -> Self {
|
||||
Self {
|
||||
scheme: scheme.into(),
|
||||
org: org.into(),
|
||||
name: name.into(),
|
||||
}
|
||||
}
|
||||
|
||||
/// True when this id parsed from a bare `org/name` (no scheme
|
||||
/// prefix). The harness substitutes its configured default in
|
||||
/// `with_default_scheme` before resolving against a registry.
|
||||
pub fn is_scheme_unset(&self) -> bool {
|
||||
self.scheme.is_empty()
|
||||
}
|
||||
|
||||
/// Substitute `default` for an empty scheme. No-op when the scheme
|
||||
/// is already set. Returns self by value so it composes neatly:
|
||||
/// `id.parse::<ModelSourceId>()?.with_default_scheme("huggingface")`.
|
||||
pub fn with_default_scheme(mut self, default: &str) -> Self {
|
||||
if self.scheme.is_empty() {
|
||||
self.scheme = default.to_string();
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// The `org/name` half — what an hf-hub `Api::model(...)` call
|
||||
/// expects regardless of which scheme/endpoint we're hitting.
|
||||
pub fn repo_path(&self) -> String {
|
||||
format!("{}/{}", self.org, self.name)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ModelSourceId {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
if self.scheme.is_empty() {
|
||||
write!(f, "{}/{}", self.org, self.name)
|
||||
} else {
|
||||
write!(f, "{}:{}/{}", self.scheme, self.org, self.name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for ModelSourceId {
|
||||
type Err = ParseError;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
if s.is_empty() {
|
||||
return Err(ParseError::Empty);
|
||||
}
|
||||
// Scheme split. Only the *first* colon counts — anything after
|
||||
// belongs to org/name (and would be rejected separately because
|
||||
// `:` isn't allowed there).
|
||||
let (scheme, rest) = match s.split_once(':') {
|
||||
Some((scheme, rest)) => {
|
||||
if scheme.is_empty() {
|
||||
return Err(ParseError::EmptyScheme(s.to_string()));
|
||||
}
|
||||
if scheme.contains('/') {
|
||||
return Err(ParseError::SchemeContainsSlash(s.to_string()));
|
||||
}
|
||||
(scheme.to_string(), rest)
|
||||
}
|
||||
None => (String::new(), s),
|
||||
};
|
||||
let (org, name) = rest
|
||||
.split_once('/')
|
||||
.ok_or_else(|| ParseError::MissingSlash(s.to_string()))?;
|
||||
if org.is_empty() {
|
||||
return Err(ParseError::EmptyOrg(s.to_string()));
|
||||
}
|
||||
if name.is_empty() {
|
||||
return Err(ParseError::EmptyName(s.to_string()));
|
||||
}
|
||||
if name.contains(':') {
|
||||
return Err(ParseError::NameContainsColon(s.to_string()));
|
||||
}
|
||||
Ok(Self {
|
||||
scheme,
|
||||
org: org.to_string(),
|
||||
name: name.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn parses_qualified() {
|
||||
let id: ModelSourceId = "huggingface:Qwen/Qwen3.6-27B".parse().unwrap();
|
||||
assert_eq!(id.scheme, "huggingface");
|
||||
assert_eq!(id.org, "Qwen");
|
||||
assert_eq!(id.name, "Qwen3.6-27B");
|
||||
assert_eq!(id.repo_path(), "Qwen/Qwen3.6-27B");
|
||||
assert!(!id.is_scheme_unset());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parses_helexa_scheme() {
|
||||
let id: ModelSourceId = "helexa:SomeOperator/Qwen3.6-27B-Uncensored"
|
||||
.parse()
|
||||
.unwrap();
|
||||
assert_eq!(id.scheme, "helexa");
|
||||
assert_eq!(id.org, "SomeOperator");
|
||||
assert_eq!(id.name, "Qwen3.6-27B-Uncensored");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parses_bare_id_with_empty_scheme() {
|
||||
let id: ModelSourceId = "Qwen/Qwen3-30B-A3B-Instruct".parse().unwrap();
|
||||
assert_eq!(id.scheme, "");
|
||||
assert_eq!(id.org, "Qwen");
|
||||
assert_eq!(id.name, "Qwen3-30B-A3B-Instruct");
|
||||
assert!(id.is_scheme_unset());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn substitutes_default_scheme_only_when_unset() {
|
||||
let id: ModelSourceId = "Qwen/Q3".parse().unwrap();
|
||||
assert_eq!(id.with_default_scheme("huggingface").scheme, "huggingface");
|
||||
|
||||
let id: ModelSourceId = "helexa:Qwen/Q3".parse().unwrap();
|
||||
assert_eq!(
|
||||
id.with_default_scheme("huggingface").scheme,
|
||||
"helexa",
|
||||
"default substitution must not override an explicit scheme"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn display_roundtrips_qualified_id() {
|
||||
let s = "helexa:Helexa/Qwen3.6-27B";
|
||||
let id: ModelSourceId = s.parse().unwrap();
|
||||
assert_eq!(id.to_string(), s);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn display_roundtrips_bare_id() {
|
||||
let s = "Qwen/Q3";
|
||||
let id: ModelSourceId = s.parse().unwrap();
|
||||
assert_eq!(id.to_string(), s);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rejects_empty() {
|
||||
assert_eq!("".parse::<ModelSourceId>().unwrap_err(), ParseError::Empty);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rejects_missing_slash() {
|
||||
match "Qwen".parse::<ModelSourceId>().unwrap_err() {
|
||||
ParseError::MissingSlash(s) => assert_eq!(s, "Qwen"),
|
||||
other => panic!("expected MissingSlash, got {other:?}"),
|
||||
}
|
||||
match "huggingface:Qwen".parse::<ModelSourceId>().unwrap_err() {
|
||||
ParseError::MissingSlash(s) => assert_eq!(s, "huggingface:Qwen"),
|
||||
other => panic!("expected MissingSlash, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rejects_empty_scheme() {
|
||||
match ":Qwen/Q3".parse::<ModelSourceId>().unwrap_err() {
|
||||
ParseError::EmptyScheme(s) => assert_eq!(s, ":Qwen/Q3"),
|
||||
other => panic!("expected EmptyScheme, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rejects_scheme_with_slash() {
|
||||
match "hugg/ingface:Q/N".parse::<ModelSourceId>().unwrap_err() {
|
||||
ParseError::SchemeContainsSlash(s) => assert_eq!(s, "hugg/ingface:Q/N"),
|
||||
other => panic!("expected SchemeContainsSlash, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rejects_empty_org_or_name() {
|
||||
match "huggingface:/N".parse::<ModelSourceId>().unwrap_err() {
|
||||
ParseError::EmptyOrg(_) => {}
|
||||
other => panic!("expected EmptyOrg, got {other:?}"),
|
||||
}
|
||||
match "huggingface:Q/".parse::<ModelSourceId>().unwrap_err() {
|
||||
ParseError::EmptyName(_) => {}
|
||||
other => panic!("expected EmptyName, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rejects_name_with_colon() {
|
||||
match "huggingface:Q/N:weird"
|
||||
.parse::<ModelSourceId>()
|
||||
.unwrap_err()
|
||||
{
|
||||
ParseError::NameContainsColon(s) => assert_eq!(s, "huggingface:Q/N:weird"),
|
||||
other => panic!("expected NameContainsColon, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_roundtrips_via_struct() {
|
||||
// We serialize as a struct (scheme/org/name fields) so the
|
||||
// shape is self-describing in API payloads. Callers that want
|
||||
// the compact `scheme:org/name` string use `Display`/`FromStr`.
|
||||
let id = ModelSourceId::new("helexa", "Helexa", "Qwen3.6-27B");
|
||||
let json = serde_json::to_string(&id).unwrap();
|
||||
let back: ModelSourceId = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(back, id);
|
||||
}
|
||||
}
|
||||
@@ -6,8 +6,18 @@ use figment::{
|
||||
providers::{Env, Format, Toml},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
/// Default scheme name applied to bare `org/name` model ids when no
|
||||
/// `[harness.candle.default_source]` is set. Keeps existing operator
|
||||
/// configs (which know nothing about schemes) working unchanged.
|
||||
pub const DEFAULT_SOURCE_SCHEME: &str = "huggingface";
|
||||
|
||||
/// Endpoint URL for the default huggingface source, used when no
|
||||
/// `[harness.candle.sources.huggingface]` is configured.
|
||||
pub const DEFAULT_HF_ENDPOINT: &str = "https://huggingface.co";
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct NeuronConfig {
|
||||
#[serde(default = "default_port")]
|
||||
@@ -37,8 +47,88 @@ pub struct HarnessSettings {
|
||||
pub struct CandleHarnessConfig {
|
||||
/// HuggingFace cache directory for model weights.
|
||||
/// When unset, defers to hf-hub's default (~/.cache/huggingface).
|
||||
///
|
||||
/// Retained for back-compat — operators with existing
|
||||
/// `hf_cache = "..."` configs continue to work. Treated as the
|
||||
/// `huggingface` source's cache_dir when a sources table isn't
|
||||
/// provided.
|
||||
#[serde(default)]
|
||||
pub hf_cache: Option<PathBuf>,
|
||||
|
||||
/// Default source scheme applied to bare `org/name` model ids
|
||||
/// (those without an explicit `scheme:` prefix). When unset, falls
|
||||
/// back to `DEFAULT_SOURCE_SCHEME` ("huggingface").
|
||||
#[serde(default)]
|
||||
pub default_source: Option<String>,
|
||||
|
||||
/// Per-scheme source endpoints. Each entry maps a scheme name
|
||||
/// (`huggingface`, `helexa`, an operator's mirror tag, …) to its
|
||||
/// endpoint URL, optional auth env var, and optional cache
|
||||
/// directory.
|
||||
///
|
||||
/// When absent or missing the `huggingface` key, the loader
|
||||
/// synthesises a `huggingface` entry pointing at
|
||||
/// `https://huggingface.co` with `hf_cache` (above) as its
|
||||
/// cache_dir. This keeps single-source configs ergonomic.
|
||||
#[serde(default)]
|
||||
pub sources: HashMap<String, SourceConfig>,
|
||||
}
|
||||
|
||||
/// Per-scheme source configuration. Mirrors the shape `hf_hub::ApiBuilder`
|
||||
/// needs: endpoint URL, optional auth token (read from an env var so
|
||||
/// secrets stay out of the config file), and optional cache directory
|
||||
/// disambiguated per source to prevent mirror-vs-canonical collisions.
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
pub struct SourceConfig {
|
||||
/// Base URL of the registry. Must speak the HF-compatible wire
|
||||
/// format (siblings listing at
|
||||
/// `/api/models/{org}/{name}[/revision/{rev}]`, blob fetch at
|
||||
/// `/{org}/{name}/resolve/{rev}/{path}`).
|
||||
pub endpoint: String,
|
||||
|
||||
/// Environment variable name to read for the bearer token used
|
||||
/// against this source. `None` = anonymous. Reading from env
|
||||
/// (vs. literal token in the config) keeps secrets out of TOML.
|
||||
#[serde(default)]
|
||||
pub auth_env: Option<String>,
|
||||
|
||||
/// Cache directory for this source. The hf-hub
|
||||
/// `models--{org}--{name}/snapshots/...` tree lives directly
|
||||
/// under this path, so distinct sources serving the same
|
||||
/// `org/name` cannot collide on disk.
|
||||
///
|
||||
/// `None` means "share the harness `hf_cache` directory" — only
|
||||
/// safe when the operator has exactly one source configured.
|
||||
#[serde(default)]
|
||||
pub cache_dir: Option<PathBuf>,
|
||||
}
|
||||
|
||||
impl CandleHarnessConfig {
|
||||
/// Resolve the effective sources map for this config, synthesising
|
||||
/// a `huggingface` entry from legacy fields (`hf_cache`) when the
|
||||
/// operator hasn't supplied a sources table. Idempotent.
|
||||
///
|
||||
/// Returns a fresh map rather than mutating self so the original
|
||||
/// (operator-typed) config can still be serialized back to TOML
|
||||
/// for diagnostics.
|
||||
pub fn effective_sources(&self) -> HashMap<String, SourceConfig> {
|
||||
let mut out = self.sources.clone();
|
||||
out.entry(DEFAULT_SOURCE_SCHEME.to_string())
|
||||
.or_insert_with(|| SourceConfig {
|
||||
endpoint: DEFAULT_HF_ENDPOINT.to_string(),
|
||||
auth_env: Some("HF_TOKEN".to_string()),
|
||||
cache_dir: self.hf_cache.clone(),
|
||||
});
|
||||
out
|
||||
}
|
||||
|
||||
/// Effective default scheme. Falls back to `DEFAULT_SOURCE_SCHEME`
|
||||
/// when the operator hasn't pinned one.
|
||||
pub fn effective_default_source(&self) -> &str {
|
||||
self.default_source
|
||||
.as_deref()
|
||||
.unwrap_or(DEFAULT_SOURCE_SCHEME)
|
||||
}
|
||||
}
|
||||
|
||||
fn default_port() -> u16 {
|
||||
@@ -65,3 +155,109 @@ impl Default for NeuronConfig {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn effective_sources_synthesises_huggingface_when_absent() {
|
||||
let cfg = CandleHarnessConfig::default();
|
||||
let sources = cfg.effective_sources();
|
||||
assert!(sources.contains_key("huggingface"));
|
||||
let hf = &sources["huggingface"];
|
||||
assert_eq!(hf.endpoint, DEFAULT_HF_ENDPOINT);
|
||||
assert_eq!(hf.auth_env.as_deref(), Some("HF_TOKEN"));
|
||||
assert!(hf.cache_dir.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn effective_sources_carries_legacy_hf_cache_into_synth_entry() {
|
||||
// Existing operator configs only set `hf_cache = "/archive3/..."`
|
||||
// — the synth must pick that up so the loader keeps using the
|
||||
// operator's storage.
|
||||
let cfg = CandleHarnessConfig {
|
||||
hf_cache: Some(PathBuf::from("/archive3/llm-cache")),
|
||||
..Default::default()
|
||||
};
|
||||
let sources = cfg.effective_sources();
|
||||
assert_eq!(
|
||||
sources["huggingface"].cache_dir.as_deref(),
|
||||
Some(Path::new("/archive3/llm-cache"))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn effective_sources_preserves_explicit_huggingface_entry() {
|
||||
// When an operator types out `[harness.candle.sources.huggingface]`
|
||||
// explicitly, we must not clobber it with the synth defaults.
|
||||
let mut sources = HashMap::new();
|
||||
sources.insert(
|
||||
"huggingface".to_string(),
|
||||
SourceConfig {
|
||||
endpoint: "https://huggingface.example.org".into(),
|
||||
auth_env: Some("MY_TOKEN".into()),
|
||||
cache_dir: Some(PathBuf::from("/operator-cache")),
|
||||
},
|
||||
);
|
||||
let cfg = CandleHarnessConfig {
|
||||
hf_cache: Some(PathBuf::from("/legacy-cache")),
|
||||
sources,
|
||||
..Default::default()
|
||||
};
|
||||
let effective = cfg.effective_sources();
|
||||
assert_eq!(
|
||||
effective["huggingface"].endpoint,
|
||||
"https://huggingface.example.org"
|
||||
);
|
||||
assert_eq!(
|
||||
effective["huggingface"].auth_env.as_deref(),
|
||||
Some("MY_TOKEN")
|
||||
);
|
||||
assert_eq!(
|
||||
effective["huggingface"].cache_dir.as_deref(),
|
||||
Some(Path::new("/operator-cache"))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn effective_sources_includes_helexa_alongside_synth_huggingface() {
|
||||
let mut sources = HashMap::new();
|
||||
sources.insert(
|
||||
"helexa".to_string(),
|
||||
SourceConfig {
|
||||
endpoint: "https://registry.helexa.ai".into(),
|
||||
auth_env: Some("HELEXA_TOKEN".into()),
|
||||
cache_dir: Some(PathBuf::from("/archive3/llm-cache/helexa")),
|
||||
},
|
||||
);
|
||||
let cfg = CandleHarnessConfig {
|
||||
hf_cache: Some(PathBuf::from("/archive3/llm-cache/huggingface")),
|
||||
sources,
|
||||
..Default::default()
|
||||
};
|
||||
let effective = cfg.effective_sources();
|
||||
assert_eq!(effective.len(), 2);
|
||||
assert_eq!(effective["helexa"].endpoint, "https://registry.helexa.ai");
|
||||
// huggingface still gets synth-derived from legacy hf_cache.
|
||||
assert_eq!(
|
||||
effective["huggingface"].cache_dir.as_deref(),
|
||||
Some(Path::new("/archive3/llm-cache/huggingface"))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn effective_default_source_falls_back() {
|
||||
let cfg = CandleHarnessConfig::default();
|
||||
assert_eq!(cfg.effective_default_source(), DEFAULT_SOURCE_SCHEME);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn effective_default_source_honours_explicit() {
|
||||
let cfg = CandleHarnessConfig {
|
||||
default_source: Some("helexa".into()),
|
||||
..Default::default()
|
||||
};
|
||||
assert_eq!(cfg.effective_default_source(), "helexa");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,7 +44,13 @@ use tracing::Instrument;
|
||||
/// In-process candle harness. Owns the loaded model registry.
|
||||
pub struct CandleHarness {
|
||||
models: Arc<RwLock<HashMap<String, LoadedHandle>>>,
|
||||
hf_cache: Option<PathBuf>,
|
||||
/// Post-resolution source map: scheme → endpoint/token/cache. Built
|
||||
/// in `new()` from the operator's `CandleHarnessConfig`; auth tokens
|
||||
/// are read from their configured env vars at startup so secrets
|
||||
/// don't leak through the config file.
|
||||
sources: HashMap<String, ResolvedSource>,
|
||||
/// Scheme to substitute for bare `org/name` model ids.
|
||||
default_source: String,
|
||||
bind_url: String,
|
||||
/// One worker thread per CUDA device index that owns its
|
||||
/// `CudaContext` for the daemon's lifetime. Populated lazily by
|
||||
@@ -968,20 +974,87 @@ async fn chunked_prefill_tp(
|
||||
last_logits.ok_or_else(|| anyhow::anyhow!("chunked_prefill_tp: no chunks produced"))
|
||||
}
|
||||
|
||||
/// Per-scheme source after env-var resolution. The auth token is the
|
||||
/// already-read env-var value (or None for anonymous access), and the
|
||||
/// cache dir is the post-`resolve_hf_cache` path for the huggingface
|
||||
/// scheme and the operator's literal value for everything else.
|
||||
#[derive(Debug, Clone)]
|
||||
struct ResolvedSource {
|
||||
endpoint: String,
|
||||
auth_token: Option<String>,
|
||||
cache_dir: Option<PathBuf>,
|
||||
}
|
||||
|
||||
impl CandleHarness {
|
||||
pub fn new(bind_url: String, hf_cache: Option<PathBuf>) -> Self {
|
||||
let hf_cache = resolve_hf_cache(hf_cache);
|
||||
if let Some(p) = &hf_cache {
|
||||
tracing::info!(path = %p.display(), "candle harness using HuggingFace cache");
|
||||
/// Construct a new harness for `bind_url` using `config`. Resolves
|
||||
/// every configured source's auth env var and cache dir up front so
|
||||
/// the hot load path (`hf_api_for`) is a pure HashMap lookup.
|
||||
pub fn new(bind_url: String, config: &crate::config::CandleHarnessConfig) -> Self {
|
||||
let raw_sources = config.effective_sources();
|
||||
let default_source = config.effective_default_source().to_string();
|
||||
let mut sources = HashMap::with_capacity(raw_sources.len());
|
||||
for (scheme, src) in raw_sources.into_iter() {
|
||||
// Only the huggingface source gets the legacy
|
||||
// HF_HUB_CACHE/HF_HOME env-var fallback chain — other
|
||||
// schemes resolve to whatever the operator typed.
|
||||
let cache_dir = if scheme == crate::config::DEFAULT_SOURCE_SCHEME {
|
||||
resolve_hf_cache(src.cache_dir.clone())
|
||||
} else {
|
||||
src.cache_dir.clone()
|
||||
};
|
||||
let auth_token = src
|
||||
.auth_env
|
||||
.as_deref()
|
||||
.and_then(|var| std::env::var(var).ok())
|
||||
.filter(|v| !v.is_empty());
|
||||
if let Some(p) = &cache_dir {
|
||||
tracing::info!(
|
||||
scheme = %scheme,
|
||||
endpoint = %src.endpoint,
|
||||
cache = %p.display(),
|
||||
auth = auth_token.is_some(),
|
||||
"candle harness source resolved"
|
||||
);
|
||||
} else {
|
||||
tracing::info!(
|
||||
scheme = %scheme,
|
||||
endpoint = %src.endpoint,
|
||||
auth = auth_token.is_some(),
|
||||
"candle harness source resolved (no cache dir; using hf-hub default)"
|
||||
);
|
||||
}
|
||||
sources.insert(
|
||||
scheme,
|
||||
ResolvedSource {
|
||||
endpoint: src.endpoint,
|
||||
auth_token,
|
||||
cache_dir,
|
||||
},
|
||||
);
|
||||
}
|
||||
if !sources.contains_key(&default_source) {
|
||||
tracing::warn!(
|
||||
default_source,
|
||||
"configured default_source has no matching [harness.candle.sources.*] entry; \
|
||||
bare model ids will fail to resolve until this is fixed"
|
||||
);
|
||||
}
|
||||
Self {
|
||||
models: Arc::new(RwLock::new(HashMap::new())),
|
||||
hf_cache,
|
||||
sources,
|
||||
default_source,
|
||||
bind_url,
|
||||
device_workers: Arc::new(RwLock::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Scheme to substitute for bare `org/name` model ids. Mirrors the
|
||||
/// effective default from the operator's config, exposed for the
|
||||
/// load path's `ModelSourceId::with_default_scheme`.
|
||||
pub(crate) fn default_source_scheme(&self) -> &str {
|
||||
&self.default_source
|
||||
}
|
||||
|
||||
/// Pick a candle `Device` for the requested indices. Without the
|
||||
/// `cuda` feature, or if CUDA initialisation fails, falls back to CPU.
|
||||
fn pick_device(devices: &[u32]) -> Result<Device> {
|
||||
@@ -1033,14 +1106,33 @@ impl CandleHarness {
|
||||
Ok(handle)
|
||||
}
|
||||
|
||||
/// Build an hf-hub API client pre-configured with the harness's
|
||||
/// `hf_cache` (when one is set).
|
||||
fn hf_api(&self) -> Result<hf_hub::api::tokio::Api> {
|
||||
let mut builder = hf_hub::api::tokio::ApiBuilder::new();
|
||||
if let Some(cache) = &self.hf_cache {
|
||||
/// Build an hf-hub API client for the given scheme. The scheme
|
||||
/// must be present in the operator's configured `sources` table
|
||||
/// (the synth `huggingface` entry counts). Each source carries its
|
||||
/// own endpoint, optional bearer token, and cache directory, so
|
||||
/// the same `org/name` served by two registries cannot collide on
|
||||
/// disk.
|
||||
pub(crate) fn hf_api_for(&self, scheme: &str) -> Result<hf_hub::api::tokio::Api> {
|
||||
let src = self.sources.get(scheme).ok_or_else(|| {
|
||||
let mut configured: Vec<&str> = self.sources.keys().map(String::as_str).collect();
|
||||
configured.sort();
|
||||
anyhow::anyhow!(
|
||||
"no source configured for scheme '{scheme}'; \
|
||||
configured: {configured:?}. Add a \
|
||||
[harness.candle.sources.{scheme}] block to neuron.toml \
|
||||
with endpoint = '...'."
|
||||
)
|
||||
})?;
|
||||
let mut builder = hf_hub::api::tokio::ApiBuilder::new().with_endpoint(src.endpoint.clone());
|
||||
if let Some(cache) = &src.cache_dir {
|
||||
builder = builder.with_cache_dir(cache.clone());
|
||||
}
|
||||
builder.build().context("build hf-hub API")
|
||||
if let Some(token) = &src.auth_token {
|
||||
builder = builder.with_token(Some(token.clone()));
|
||||
}
|
||||
builder
|
||||
.build()
|
||||
.with_context(|| format!("build hf-hub API for scheme '{scheme}'"))
|
||||
}
|
||||
|
||||
/// Resolve a dense (bf16/fp16 safetensors) model to its local file
|
||||
@@ -1053,18 +1145,21 @@ impl CandleHarness {
|
||||
async fn resolve_dense_files(
|
||||
&self,
|
||||
spec: &ModelSpec,
|
||||
source_id: &cortex_core::source::ModelSourceId,
|
||||
) -> Result<(PathBuf, PathBuf, Vec<PathBuf>)> {
|
||||
let api = self.hf_api()?;
|
||||
let repo = api.model(spec.model_id.clone());
|
||||
let api = self.hf_api_for(&source_id.scheme)?;
|
||||
let repo = api.model(source_id.repo_path());
|
||||
let display_id = source_id.to_string();
|
||||
let _ = spec; // reserved for future use (quant-aware filtering)
|
||||
|
||||
let config_path = repo
|
||||
.get("config.json")
|
||||
.await
|
||||
.with_context(|| format!("fetch config.json from {}", spec.model_id))?;
|
||||
.with_context(|| format!("fetch config.json from {display_id}"))?;
|
||||
let tokenizer_path = repo
|
||||
.get("tokenizer.json")
|
||||
.await
|
||||
.with_context(|| format!("fetch tokenizer.json from {}", spec.model_id))?;
|
||||
.with_context(|| format!("fetch tokenizer.json from {display_id}"))?;
|
||||
|
||||
// Prefer the sharded layout (most HF dense models > 5B ship it).
|
||||
let safetensors_paths = match repo.get("model.safetensors.index.json").await {
|
||||
@@ -1111,9 +1206,10 @@ impl CandleHarness {
|
||||
async fn load_arch_gguf(
|
||||
&self,
|
||||
spec: &ModelSpec,
|
||||
source_id: &cortex_core::source::ModelSourceId,
|
||||
device: &Device,
|
||||
) -> Result<(PathBuf, ModelArch)> {
|
||||
let (gguf_path, tokenizer_path) = self.resolve_files(spec).await?;
|
||||
let (gguf_path, tokenizer_path) = self.resolve_files(spec, source_id).await?;
|
||||
let device_for_load = device.clone();
|
||||
let gguf_path_for_load = gguf_path.clone();
|
||||
let model_id_for_log = spec.model_id.clone();
|
||||
@@ -1176,10 +1272,11 @@ impl CandleHarness {
|
||||
async fn load_arch_dense(
|
||||
&self,
|
||||
spec: &ModelSpec,
|
||||
source_id: &cortex_core::source::ModelSourceId,
|
||||
device: &Device,
|
||||
) -> Result<(PathBuf, ModelArch)> {
|
||||
let (config_path, tokenizer_path, safetensors_paths) =
|
||||
self.resolve_dense_files(spec).await?;
|
||||
self.resolve_dense_files(spec, source_id).await?;
|
||||
let device_for_load = device.clone();
|
||||
let model_id_for_log = spec.model_id.clone();
|
||||
|
||||
@@ -1290,14 +1387,20 @@ impl CandleHarness {
|
||||
|
||||
/// Resolve a model spec to local GGUF and tokenizer file paths via
|
||||
/// hf-hub. Downloads on first use; subsequent calls are cached.
|
||||
async fn resolve_files(&self, spec: &ModelSpec) -> Result<(PathBuf, PathBuf)> {
|
||||
let api = self.hf_api()?;
|
||||
let repo = api.model(spec.model_id.clone());
|
||||
async fn resolve_files(
|
||||
&self,
|
||||
spec: &ModelSpec,
|
||||
source_id: &cortex_core::source::ModelSourceId,
|
||||
) -> Result<(PathBuf, PathBuf)> {
|
||||
let api = self.hf_api_for(&source_id.scheme)?;
|
||||
let repo_path = source_id.repo_path();
|
||||
let repo = api.model(repo_path.clone());
|
||||
let display_id = source_id.to_string();
|
||||
|
||||
let info = repo
|
||||
.info()
|
||||
.await
|
||||
.with_context(|| format!("fetch HF repo info for {}", spec.model_id))?;
|
||||
.with_context(|| format!("fetch HF repo info for {display_id}"))?;
|
||||
|
||||
let quant = spec.quant.as_deref().unwrap_or("");
|
||||
let quant_lc = quant.to_lowercase();
|
||||
@@ -1309,15 +1412,14 @@ impl CandleHarness {
|
||||
.find(|name| quant_lc.is_empty() || name.to_lowercase().contains(&quant_lc))
|
||||
.ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"no GGUF file matching quant {:?} in repo {}",
|
||||
"no GGUF file matching quant {:?} in repo {display_id}",
|
||||
spec.quant,
|
||||
spec.model_id
|
||||
)
|
||||
})?
|
||||
.to_string();
|
||||
|
||||
tracing::info!(
|
||||
model = %spec.model_id,
|
||||
model = %display_id,
|
||||
file = %gguf_filename,
|
||||
"resolving GGUF (may be cached)"
|
||||
);
|
||||
@@ -1331,27 +1433,28 @@ impl CandleHarness {
|
||||
// tokenizer.json lives in the base non-GGUF repo. Derive the
|
||||
// base repo id by stripping a `-GGUF` / `-gguf` suffix; if
|
||||
// there's no such suffix the same repo is used (works for
|
||||
// non-GGUF model_ids).
|
||||
let tokenizer_repo_id = spec
|
||||
.model_id
|
||||
// non-GGUF model_ids). Stripping happens on the repo_path
|
||||
// (scheme already accounted for) so this composes cleanly with
|
||||
// helexa-scheme GGUF repos too.
|
||||
let tokenizer_repo_path = repo_path
|
||||
.strip_suffix("-GGUF")
|
||||
.or_else(|| spec.model_id.strip_suffix("-gguf"))
|
||||
.unwrap_or(spec.model_id.as_str())
|
||||
.or_else(|| repo_path.strip_suffix("-gguf"))
|
||||
.unwrap_or(&repo_path)
|
||||
.to_string();
|
||||
let tokenizer_repo = if tokenizer_repo_id == spec.model_id {
|
||||
let tokenizer_repo = if tokenizer_repo_path == repo_path {
|
||||
repo
|
||||
} else {
|
||||
tracing::debug!(
|
||||
from = %spec.model_id,
|
||||
to = %tokenizer_repo_id,
|
||||
from = %repo_path,
|
||||
to = %tokenizer_repo_path,
|
||||
"tokenizer.json sourced from base repo (GGUF suffix stripped)"
|
||||
);
|
||||
api.model(tokenizer_repo_id.clone())
|
||||
api.model(tokenizer_repo_path.clone())
|
||||
};
|
||||
let tokenizer_path = tokenizer_repo
|
||||
.get("tokenizer.json")
|
||||
.await
|
||||
.with_context(|| format!("fetch tokenizer.json from {tokenizer_repo_id}"))?;
|
||||
.with_context(|| format!("fetch tokenizer.json from {tokenizer_repo_path}"))?;
|
||||
Ok((gguf_path, tokenizer_path))
|
||||
}
|
||||
|
||||
@@ -2002,6 +2105,16 @@ impl Harness for CandleHarness {
|
||||
}
|
||||
}
|
||||
|
||||
// Parse the model id, substituting the harness's default
|
||||
// source for bare `org/name` entries so existing operator
|
||||
// configs keep working unchanged. Stored on the request-local
|
||||
// path so downstream resolve_* can ask the right registry.
|
||||
let source_id = spec
|
||||
.model_id
|
||||
.parse::<cortex_core::source::ModelSourceId>()
|
||||
.with_context(|| format!("parse model id '{}' as scheme:org/name", spec.model_id))?
|
||||
.with_default_scheme(self.default_source_scheme());
|
||||
|
||||
// Preflight: classify the source repo and apply the
|
||||
// tp/quant/source feasibility table before any device
|
||||
// allocation, NCCL handshake, or weight fetch. Failures bubble
|
||||
@@ -2011,8 +2124,8 @@ impl Harness for CandleHarness {
|
||||
// dispatch — downstream `resolve_files` / `resolve_dense_files`
|
||||
// re-run their own substring match — but the structured error
|
||||
// surface is the main payoff.
|
||||
let api = self.hf_api()?;
|
||||
super::preflight::preflight(&api, spec)
|
||||
let api = self.hf_api_for(&source_id.scheme)?;
|
||||
super::preflight::preflight(&api, &source_id, spec)
|
||||
.await
|
||||
.map_err(anyhow::Error::new)?;
|
||||
|
||||
@@ -2020,7 +2133,7 @@ impl Harness for CandleHarness {
|
||||
if tp_size > 1 {
|
||||
#[cfg(feature = "cuda")]
|
||||
{
|
||||
return self.load_tp(spec, tp_size).await;
|
||||
return self.load_tp(spec, &source_id, tp_size).await;
|
||||
}
|
||||
#[cfg(not(feature = "cuda"))]
|
||||
{
|
||||
@@ -2048,7 +2161,7 @@ impl Harness for CandleHarness {
|
||||
let (tokenizer_path, arch_local, arch_handle) = if let Some(w) = &worker {
|
||||
// CUDA path: resolve, then load in the worker.
|
||||
if spec.quant.is_some() {
|
||||
let (gguf_path, tokenizer_path) = self.resolve_files(spec).await?;
|
||||
let (gguf_path, tokenizer_path) = self.resolve_files(spec, &source_id).await?;
|
||||
let handle = w
|
||||
.load_gguf(gguf_path, spec.model_id.clone())
|
||||
.await
|
||||
@@ -2056,7 +2169,7 @@ impl Harness for CandleHarness {
|
||||
(tokenizer_path, None, Some(handle))
|
||||
} else {
|
||||
let (config_path, tokenizer_path, safetensors_paths) =
|
||||
self.resolve_dense_files(spec).await?;
|
||||
self.resolve_dense_files(spec, &source_id).await?;
|
||||
let handle = w
|
||||
.load_dense(config_path, safetensors_paths, spec.model_id.clone())
|
||||
.await
|
||||
@@ -2066,9 +2179,9 @@ impl Harness for CandleHarness {
|
||||
} else {
|
||||
// CPU path: legacy spawn_blocking + Arc<Mutex<ModelArch>>.
|
||||
let (tokenizer_path, arch) = if spec.quant.is_some() {
|
||||
self.load_arch_gguf(spec, &device).await?
|
||||
self.load_arch_gguf(spec, &source_id, &device).await?
|
||||
} else {
|
||||
self.load_arch_dense(spec, &device).await?
|
||||
self.load_arch_dense(spec, &source_id, &device).await?
|
||||
};
|
||||
(tokenizer_path, Some(Arc::new(Mutex::new(arch))), None)
|
||||
};
|
||||
@@ -2226,7 +2339,12 @@ impl CandleHarness {
|
||||
/// `spec.devices` carries the per-rank CUDA device indices (one
|
||||
/// entry per rank, in rank order); defaults to `0..tp_size`.
|
||||
#[cfg(feature = "cuda")]
|
||||
async fn load_tp(&self, spec: &ModelSpec, tp_size: u32) -> Result<()> {
|
||||
async fn load_tp(
|
||||
&self,
|
||||
spec: &ModelSpec,
|
||||
source_id: &cortex_core::source::ModelSourceId,
|
||||
tp_size: u32,
|
||||
) -> Result<()> {
|
||||
use std::sync::Arc as StdArc;
|
||||
use tokio::sync::Mutex as TMutex;
|
||||
|
||||
@@ -2251,7 +2369,7 @@ impl CandleHarness {
|
||||
|
||||
// 1. Resolve config + tokenizer + safetensors via hf-hub.
|
||||
let (config_path, tokenizer_path, safetensors_paths) =
|
||||
self.resolve_dense_files(spec).await?;
|
||||
self.resolve_dense_files(spec, source_id).await?;
|
||||
let config_json = std::fs::read_to_string(&config_path).context("read config.json")?;
|
||||
// Reject unsupported architectures *before* spawning the worker
|
||||
// pool and fanning out NCCL — otherwise we'd burn the pool
|
||||
@@ -4023,4 +4141,75 @@ mod tests {
|
||||
"DriverError(CUDA_ERROR_ILLEGAL_ADDRESS, \"an illegal memory access was encountered\")"
|
||||
));
|
||||
}
|
||||
|
||||
/// Phase 1 of plan-source-aware-loader: harness must resolve each
|
||||
/// configured scheme to its own endpoint+cache, and reject schemes
|
||||
/// the operator hasn't configured with a useful error.
|
||||
#[test]
|
||||
fn hf_api_for_routes_per_scheme() {
|
||||
use crate::config::{CandleHarnessConfig, SourceConfig};
|
||||
use std::collections::HashMap;
|
||||
|
||||
let mut sources = HashMap::new();
|
||||
sources.insert(
|
||||
"huggingface".to_string(),
|
||||
SourceConfig {
|
||||
endpoint: "https://huggingface.example.org".into(),
|
||||
auth_env: None,
|
||||
cache_dir: Some(std::path::PathBuf::from("/tmp/hf-cache")),
|
||||
},
|
||||
);
|
||||
sources.insert(
|
||||
"helexa".to_string(),
|
||||
SourceConfig {
|
||||
endpoint: "https://registry.helexa.example.ai".into(),
|
||||
auth_env: None,
|
||||
cache_dir: Some(std::path::PathBuf::from("/tmp/helexa-cache")),
|
||||
},
|
||||
);
|
||||
let cfg = CandleHarnessConfig {
|
||||
sources,
|
||||
default_source: Some("huggingface".into()),
|
||||
..Default::default()
|
||||
};
|
||||
let harness = CandleHarness::new("http://localhost:13131".into(), &cfg);
|
||||
|
||||
// Both configured schemes build cleanly.
|
||||
harness
|
||||
.hf_api_for("huggingface")
|
||||
.expect("huggingface scheme should build");
|
||||
harness
|
||||
.hf_api_for("helexa")
|
||||
.expect("helexa scheme should build");
|
||||
|
||||
// Unknown scheme errors with a message that names the configured
|
||||
// set so the operator can act on it.
|
||||
let err = harness
|
||||
.hf_api_for("does-not-exist")
|
||||
.expect_err("unknown scheme should error");
|
||||
let msg = format!("{err:#}");
|
||||
assert!(
|
||||
msg.contains("does-not-exist") && msg.contains("huggingface") && msg.contains("helexa"),
|
||||
"error must list configured schemes: {msg}"
|
||||
);
|
||||
|
||||
assert_eq!(harness.default_source_scheme(), "huggingface");
|
||||
}
|
||||
|
||||
/// Operator with only `hf_cache` set (no `sources` table) still
|
||||
/// gets a working `huggingface` source pointed at HF.
|
||||
#[test]
|
||||
fn hf_api_for_synthesises_huggingface_from_legacy_hf_cache() {
|
||||
use crate::config::CandleHarnessConfig;
|
||||
|
||||
let cfg = CandleHarnessConfig {
|
||||
hf_cache: Some(std::path::PathBuf::from("/archive3/llm-cache")),
|
||||
..Default::default()
|
||||
};
|
||||
let harness = CandleHarness::new("http://localhost:13131".into(), &cfg);
|
||||
harness
|
||||
.hf_api_for("huggingface")
|
||||
.expect("synth huggingface source should build");
|
||||
assert_eq!(harness.default_source_scheme(), "huggingface");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,7 +115,7 @@ impl HarnessRegistry {
|
||||
"candle" => {
|
||||
let harness = Arc::new(candle::CandleHarness::new(
|
||||
bind_url.to_string(),
|
||||
settings.candle.hf_cache.clone(),
|
||||
&settings.candle,
|
||||
));
|
||||
registry.candle = Some(Arc::clone(&harness));
|
||||
registry.harnesses.insert("candle".into(), harness);
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
//! cleanly when Phase 1 lands.
|
||||
|
||||
use cortex_core::harness::ModelSpec;
|
||||
use cortex_core::source::ModelSourceId;
|
||||
use hf_hub::api::tokio::Api;
|
||||
use serde::Serialize;
|
||||
|
||||
@@ -115,13 +116,22 @@ pub enum PreflightError {
|
||||
/// One network round-trip (`repo.info()`); no blob fetches. Returns
|
||||
/// `Ok(PlacementPlan)` when the requested combination is feasible, or
|
||||
/// a structured `PreflightError` describing what's wrong.
|
||||
pub async fn preflight(api: &Api, spec: &ModelSpec) -> Result<PlacementPlan, PreflightError> {
|
||||
let repo = api.model(spec.model_id.clone());
|
||||
///
|
||||
/// `api` must already be configured for the scheme `source_id` belongs
|
||||
/// to — caller (typically `CandleHarness::load_model`) builds it via
|
||||
/// `hf_api_for(&source_id.scheme)`. Only the `org/name` portion of the
|
||||
/// id is sent to the registry.
|
||||
pub async fn preflight(
|
||||
api: &Api,
|
||||
source_id: &ModelSourceId,
|
||||
spec: &ModelSpec,
|
||||
) -> Result<PlacementPlan, PreflightError> {
|
||||
let repo = api.model(source_id.repo_path());
|
||||
let info = repo
|
||||
.info()
|
||||
.await
|
||||
.map_err(|e| PreflightError::RepoFetchFailed {
|
||||
model_id: spec.model_id.clone(),
|
||||
model_id: source_id.to_string(),
|
||||
cause: format!("{e}"),
|
||||
})?;
|
||||
|
||||
@@ -132,13 +142,13 @@ pub async fn preflight(api: &Api, spec: &ModelSpec) -> Result<PlacementPlan, Pre
|
||||
match (&format, tp_size, spec.quant.as_deref()) {
|
||||
// No weights at all — nothing to do.
|
||||
(SourceFormat::Empty, _, _) => Err(PreflightError::EmptyRepo {
|
||||
model_id: spec.model_id.clone(),
|
||||
model_id: source_id.to_string(),
|
||||
}),
|
||||
|
||||
// GGUF-only + TP: not supported. Today's HauhauCS failure.
|
||||
(SourceFormat::Gguf { quants }, tp, _) if tp > 1 => {
|
||||
Err(PreflightError::TpRequiresSafetensors {
|
||||
model_id: spec.model_id.clone(),
|
||||
model_id: source_id.to_string(),
|
||||
tp_size: tp,
|
||||
gguf_quants: quants.clone(),
|
||||
suggestion: format!(
|
||||
@@ -154,13 +164,13 @@ pub async fn preflight(api: &Api, spec: &ModelSpec) -> Result<PlacementPlan, Pre
|
||||
let picked = pick_gguf_file(&filenames, requested.unwrap_or(""));
|
||||
match picked {
|
||||
Some(fname) => Ok(PlacementPlan {
|
||||
model_id: spec.model_id.clone(),
|
||||
model_id: source_id.to_string(),
|
||||
format: format.clone(),
|
||||
tp_size,
|
||||
picked_quant_file: Some(fname),
|
||||
}),
|
||||
None => Err(PreflightError::QuantNotFound {
|
||||
model_id: spec.model_id.clone(),
|
||||
model_id: source_id.to_string(),
|
||||
requested: requested.unwrap_or("").to_string(),
|
||||
available: quants.clone(),
|
||||
nearest: nearest_quant(requested.unwrap_or(""), quants),
|
||||
@@ -174,7 +184,7 @@ pub async fn preflight(api: &Api, spec: &ModelSpec) -> Result<PlacementPlan, Pre
|
||||
// on disk, since it needs the parsed JSON.
|
||||
(SourceFormat::DenseSafetensors { .. } | SourceFormat::Mixed { .. }, _, _) => {
|
||||
Ok(PlacementPlan {
|
||||
model_id: spec.model_id.clone(),
|
||||
model_id: source_id.to_string(),
|
||||
format: format.clone(),
|
||||
tp_size,
|
||||
picked_quant_file: None,
|
||||
@@ -431,14 +441,20 @@ mod tests {
|
||||
format: &SourceFormat,
|
||||
filenames: &[&str],
|
||||
) -> Result<PlacementPlan, PreflightError> {
|
||||
// Tests parse spec.model_id with the default scheme so the
|
||||
// assertions can keep comparing against bare "org/name".
|
||||
let source_id: ModelSourceId = spec
|
||||
.model_id
|
||||
.parse::<ModelSourceId>()
|
||||
.expect("test spec.model_id must parse");
|
||||
let tp_size = spec.tensor_parallel.unwrap_or(1);
|
||||
match (format, tp_size, spec.quant.as_deref()) {
|
||||
(SourceFormat::Empty, _, _) => Err(PreflightError::EmptyRepo {
|
||||
model_id: spec.model_id.clone(),
|
||||
model_id: source_id.to_string(),
|
||||
}),
|
||||
(SourceFormat::Gguf { quants }, tp, _) if tp > 1 => {
|
||||
Err(PreflightError::TpRequiresSafetensors {
|
||||
model_id: spec.model_id.clone(),
|
||||
model_id: source_id.to_string(),
|
||||
tp_size: tp,
|
||||
gguf_quants: quants.clone(),
|
||||
suggestion: format!(
|
||||
@@ -451,13 +467,13 @@ mod tests {
|
||||
let picked = pick_gguf_file(filenames, requested.unwrap_or(""));
|
||||
match picked {
|
||||
Some(fname) => Ok(PlacementPlan {
|
||||
model_id: spec.model_id.clone(),
|
||||
model_id: source_id.to_string(),
|
||||
format: format.clone(),
|
||||
tp_size,
|
||||
picked_quant_file: Some(fname),
|
||||
}),
|
||||
None => Err(PreflightError::QuantNotFound {
|
||||
model_id: spec.model_id.clone(),
|
||||
model_id: source_id.to_string(),
|
||||
requested: requested.unwrap_or("").to_string(),
|
||||
available: quants.clone(),
|
||||
nearest: nearest_quant(requested.unwrap_or(""), quants),
|
||||
@@ -466,7 +482,7 @@ mod tests {
|
||||
}
|
||||
(SourceFormat::DenseSafetensors { .. } | SourceFormat::Mixed { .. }, _, _) => {
|
||||
Ok(PlacementPlan {
|
||||
model_id: spec.model_id.clone(),
|
||||
model_id: source_id.to_string(),
|
||||
format: format.clone(),
|
||||
tp_size,
|
||||
picked_quant_file: None,
|
||||
|
||||
@@ -12,6 +12,7 @@ use axum::http::StatusCode;
|
||||
use axum::response::{IntoResponse, Json};
|
||||
use axum::routing::get;
|
||||
use cortex_core::harness::ModelSpec;
|
||||
use cortex_core::source::ModelSourceId;
|
||||
use neuron::harness::preflight::{PreflightError, SourceFormat, preflight};
|
||||
use serde_json::{Value, json};
|
||||
use std::sync::Arc;
|
||||
@@ -89,6 +90,15 @@ fn spec(model_id: &str, tp: Option<u32>, quant: Option<&str>) -> ModelSpec {
|
||||
}
|
||||
}
|
||||
|
||||
/// Build a `ModelSourceId` from a bare `org/name` test input,
|
||||
/// substituting the default scheme so the mock route key matches.
|
||||
fn sid(model_id: &str) -> ModelSourceId {
|
||||
model_id
|
||||
.parse::<ModelSourceId>()
|
||||
.expect("test model_id parses")
|
||||
.with_default_scheme("huggingface")
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn preflight_gguf_tp_rejected_over_http() {
|
||||
let cache = tempfile::tempdir().expect("tempdir");
|
||||
@@ -107,7 +117,7 @@ async fn preflight_gguf_tp_rejected_over_http() {
|
||||
|
||||
let api = build_api(&endpoint, cache.path());
|
||||
let s = spec("HauhauCS/Qwen3.6", Some(2), Some("q6k"));
|
||||
let err = preflight(&api, &s).await.unwrap_err();
|
||||
let err = preflight(&api, &sid(&s.model_id), &s).await.unwrap_err();
|
||||
match err {
|
||||
PreflightError::TpRequiresSafetensors {
|
||||
model_id,
|
||||
@@ -115,7 +125,9 @@ async fn preflight_gguf_tp_rejected_over_http() {
|
||||
gguf_quants,
|
||||
..
|
||||
} => {
|
||||
assert_eq!(model_id, "HauhauCS/Qwen3.6");
|
||||
// Scheme prefix surfaces in error display now that
|
||||
// preflight is source-aware.
|
||||
assert_eq!(model_id, "huggingface:HauhauCS/Qwen3.6");
|
||||
assert_eq!(tp_size, 2);
|
||||
assert_eq!(gguf_quants.len(), 3);
|
||||
}
|
||||
@@ -140,7 +152,7 @@ async fn preflight_gguf_quant_suggestion_over_http() {
|
||||
|
||||
let api = build_api(&endpoint, cache.path());
|
||||
let s = spec("HauhauCS/Qwen3.6", Some(1), Some("q6k"));
|
||||
let err = preflight(&api, &s).await.unwrap_err();
|
||||
let err = preflight(&api, &sid(&s.model_id), &s).await.unwrap_err();
|
||||
match err {
|
||||
PreflightError::QuantNotFound {
|
||||
requested,
|
||||
@@ -176,7 +188,9 @@ async fn preflight_dense_safetensors_tp_ok() {
|
||||
|
||||
let api = build_api(&endpoint, cache.path());
|
||||
let s = spec("Qwen/Q3-30B", Some(2), Some("q5k"));
|
||||
let plan = preflight(&api, &s).await.expect("dense+tp should succeed");
|
||||
let plan = preflight(&api, &sid(&s.model_id), &s)
|
||||
.await
|
||||
.expect("dense+tp should succeed");
|
||||
assert_eq!(plan.tp_size, 2);
|
||||
assert!(plan.picked_quant_file.is_none());
|
||||
assert!(matches!(
|
||||
@@ -197,7 +211,7 @@ async fn preflight_gguf_single_gpu_good_quant() {
|
||||
|
||||
let api = build_api(&endpoint, cache.path());
|
||||
let s = spec("HauhauCS/Qwen3.6", Some(1), Some("q6_k_p"));
|
||||
let plan = preflight(&api, &s)
|
||||
let plan = preflight(&api, &sid(&s.model_id), &s)
|
||||
.await
|
||||
.expect("good quant should succeed");
|
||||
assert_eq!(plan.tp_size, 1);
|
||||
@@ -219,7 +233,7 @@ async fn preflight_repo_fetch_failed_on_404() {
|
||||
|
||||
let api = build_api(&endpoint, cache.path());
|
||||
let s = spec("DoesNot/Exist", Some(1), None);
|
||||
let err = preflight(&api, &s).await.unwrap_err();
|
||||
let err = preflight(&api, &sid(&s.model_id), &s).await.unwrap_err();
|
||||
assert!(
|
||||
matches!(err, PreflightError::RepoFetchFailed { .. }),
|
||||
"expected RepoFetchFailed, got {err:?}"
|
||||
@@ -238,7 +252,7 @@ async fn preflight_empty_repo_rejected() {
|
||||
|
||||
let api = build_api(&endpoint, cache.path());
|
||||
let s = spec("Empty/Repo", Some(1), None);
|
||||
let err = preflight(&api, &s).await.unwrap_err();
|
||||
let err = preflight(&api, &sid(&s.model_id), &s).await.unwrap_err();
|
||||
assert!(
|
||||
matches!(err, PreflightError::EmptyRepo { .. }),
|
||||
"expected EmptyRepo, got {err:?}"
|
||||
@@ -264,6 +278,8 @@ async fn preflight_mixed_repo_prefers_safetensors() {
|
||||
// TP=2 + quant should succeed via the dense path even though a
|
||||
// GGUF is present — the dense path handles ISQ.
|
||||
let s = spec("Mixed/Repo", Some(2), Some("q5k"));
|
||||
let plan = preflight(&api, &s).await.expect("mixed should succeed");
|
||||
let plan = preflight(&api, &sid(&s.model_id), &s)
|
||||
.await
|
||||
.expect("mixed should succeed");
|
||||
assert!(matches!(plan.format, SourceFormat::Mixed { .. }));
|
||||
}
|
||||
|
||||
@@ -22,7 +22,9 @@ name = "candle"
|
||||
# HuggingFace cache directory for model weights.
|
||||
#
|
||||
# Resolution order (first hit wins):
|
||||
# 1. `hf_cache` here in this file.
|
||||
# 1. `hf_cache` here in this file (applies to the synth `huggingface`
|
||||
# source only — see [harness.candle.sources.*] below for explicit
|
||||
# per-source paths).
|
||||
# 2. `HF_HUB_CACHE` env var — same convention as the Python
|
||||
# `huggingface_hub` library, so an existing cache directory shared
|
||||
# with other tooling can be reused without per-tool config.
|
||||
@@ -36,6 +38,32 @@ name = "candle"
|
||||
# Environment=HF_HUB_CACHE=/archive/hf-cache
|
||||
# hf_cache = "/var/lib/neuron/hf-cache"
|
||||
|
||||
# Default scheme applied to bare `org/name` model ids (those without a
|
||||
# `scheme:` prefix). Defaults to "huggingface" when unset. Set to
|
||||
# "helexa" to make `default_models = [{ model_id = "Helexa/Foo" }]`
|
||||
# resolve via the helexa registry without prefixing every entry.
|
||||
# default_source = "huggingface"
|
||||
|
||||
# Per-scheme source endpoints. Each scheme maps to an HF-compatible
|
||||
# registry. The `huggingface` source is auto-synthesised pointing at
|
||||
# `https://huggingface.co` when omitted; declare it explicitly here to
|
||||
# override the endpoint, auth env, or cache dir.
|
||||
#
|
||||
# [harness.candle.sources.huggingface]
|
||||
# endpoint = "https://huggingface.co"
|
||||
# auth_env = "HF_TOKEN" # optional bearer token via env var
|
||||
# cache_dir = "/archive3/llm-cache/huggingface"
|
||||
#
|
||||
# Add helexa (or any operator-run mirror speaking the HF-compatible
|
||||
# wire format) by adding another sources entry. Caches are
|
||||
# disambiguated per scheme so a mirror serving the same `org/name` as
|
||||
# HF cannot collide on disk.
|
||||
#
|
||||
# [harness.candle.sources.helexa]
|
||||
# endpoint = "https://registry.helexa.ai"
|
||||
# auth_env = "HELEXA_TOKEN"
|
||||
# cache_dir = "/archive3/llm-cache/helexa"
|
||||
|
||||
# -- Default models ----------------------------------------------------------
|
||||
# Models listed here are loaded automatically when the neuron service
|
||||
# activates. Loading is sequential — a slow or failing entry doesn't
|
||||
|
||||
Reference in New Issue
Block a user