refactor(neuron): cut mistralrs/llamacpp, scaffold candle harness

Stage 1 of the candle-native pivot. Replaces the external-process
harness model (mistralrs over HTTP, llamacpp placeholder) with an
in-process Harness trait whose sole implementation is candle. The
trait keeps its shape so future engines slot in additively, but
start/stop default to no-ops and HarnessConfig drops endpoint and
systemd_unit since no harness needs external supervision.

Behaviour is unchanged on the wire: load_model returns a "not
implemented yet (Stage 2)" error and list_models is empty. The
gateway-side proxy, poller, and router are untouched.

CLAUDE.md Phase 11 (llama.cpp) and Phase 12 (mistral.rs COPR) are
marked superseded; the staged plan lives in
~/.claude/plans/create-a-more-aggressive-calm-naur.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-18 15:53:04 +03:00
parent 7f797b0265
commit 3cccc2c56b
19 changed files with 203 additions and 401 deletions

View File

@@ -2,7 +2,7 @@
//!
//! These mirror the `/v1/messages` format used by the Anthropic API.
//! The gateway accepts these, translates to OpenAI format, proxies to
//! mistral.rs, then translates the response back.
//! the inference backend (neuron), then translates the response back.
use serde::{Deserialize, Serialize};
use serde_json::Value;

View File

@@ -9,13 +9,13 @@ use async_trait::async_trait;
use serde::{Deserialize, Serialize};
/// Configuration for a harness instance on a neuron.
///
/// All current harnesses are in-process (candle); per-harness tuning
/// (cache paths, device policies, etc.) lives in dedicated config
/// blocks rather than on this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HarnessConfig {
pub name: String,
/// Base URL of the harness (e.g. "http://localhost:8080" for mistral.rs).
pub endpoint: Option<String>,
/// Systemd unit name, if the harness is managed via systemd.
pub systemd_unit: Option<String>,
}
/// Health status of a harness process.
@@ -47,16 +47,24 @@ pub struct ModelInfo {
}
/// What an inference harness must do, from neuron's perspective.
///
/// All current harnesses are in-process — they share neuron's address
/// space and lifecycle. `start`/`stop` therefore default to no-ops; a
/// future process-supervising harness would override them.
#[async_trait]
pub trait Harness: Send + Sync {
/// Human-readable name (e.g. "mistralrs", "llamacpp", "comfyui").
/// Human-readable name (e.g. "candle").
fn name(&self) -> &str;
/// Start the harness process if it is not already running.
async fn start(&self, config: &HarnessConfig) -> Result<()>;
/// Start the harness. Default no-op for in-process harnesses.
async fn start(&self, _config: &HarnessConfig) -> Result<()> {
Ok(())
}
/// Stop the harness process gracefully.
async fn stop(&self) -> Result<()>;
/// Stop the harness. Default no-op for in-process harnesses.
async fn stop(&self) -> Result<()> {
Ok(())
}
/// Health check. Returns the harness process status.
async fn health(&self) -> HarnessHealth;

View File

@@ -3,7 +3,7 @@
//! These are a subset sufficient for chat completions (streaming + non-streaming).
//! Fields not relevant to proxying are captured as `serde_json::Value` via
//! `#[serde(flatten)]` so we forward them without needing to enumerate every
//! extension field mistral.rs supports.
//! extension field a backend might support.
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -22,7 +22,7 @@ pub struct ChatCompletionRequest {
pub max_tokens: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub stream: Option<bool>,
/// All other fields (tools, response_format, mistral.rs extensions, etc.)
/// All other fields (tools, response_format, backend extensions, etc.)
#[serde(flatten)]
pub extra: Value,
}

View File

@@ -22,6 +22,7 @@ use tokio::net::TcpListener;
/// - GET /models/:id/endpoint (returns the inference URL)
/// - POST /models/unload (accepts unload requests)
/// - GET /v1/chat/completions + POST /v1/chat/completions (inference)
///
/// Returns the neuron base URL.
pub async fn spawn_mock_neuron() -> String {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
@@ -54,7 +55,7 @@ pub async fn spawn_mock_neuron() -> String {
async fn mock_neuron_list_models() -> Json<Value> {
Json(json!([
{"id": "test-model", "harness": "mistralrs", "status": "loaded", "devices": [0], "vram_used_mb": 8000}
{"id": "test-model", "harness": "candle", "status": "loaded", "devices": [0], "vram_used_mb": 8000}
]))
}

View File

@@ -12,8 +12,8 @@ use std::sync::Arc;
async fn test_poller_discovers_models() {
// Mock neuron reports 2 models via /models endpoint (neuron format).
let mock_url = common::spawn_mock_neuron_with_models(json!([
{"id": "model-a", "harness": "mistralrs", "status": "loaded", "devices": [0], "vram_used_mb": 8000},
{"id": "model-b", "harness": "mistralrs", "status": "unloaded", "devices": [], "vram_used_mb": null}
{"id": "model-a", "harness": "candle", "status": "loaded", "devices": [0], "vram_used_mb": 8000},
{"id": "model-b", "harness": "candle", "status": "unloaded", "devices": [], "vram_used_mb": null}
]))
.await;
@@ -63,8 +63,8 @@ async fn test_poller_discovers_models() {
#[tokio::test]
async fn test_poller_updates_gateway_models_endpoint() {
let mock_url = common::spawn_mock_neuron_with_models(json!([
{"id": "model-x", "harness": "mistralrs", "status": "loaded", "devices": [0], "vram_used_mb": null},
{"id": "model-y", "harness": "mistralrs", "status": "loaded", "devices": [1], "vram_used_mb": null}
{"id": "model-x", "harness": "candle", "status": "loaded", "devices": [0], "vram_used_mb": null},
{"id": "model-y", "harness": "candle", "status": "loaded", "devices": [1], "vram_used_mb": null}
]))
.await;
@@ -152,8 +152,8 @@ async fn test_poller_marks_unreachable_node_unhealthy() {
#[tokio::test]
async fn test_poller_removes_stale_models() {
let mock_url = common::spawn_mock_neuron_with_models(json!([
{"id": "keep-me", "harness": "mistralrs", "status": "loaded", "devices": [0], "vram_used_mb": null},
{"id": "drop-me", "harness": "mistralrs", "status": "loaded", "devices": [0], "vram_used_mb": null}
{"id": "keep-me", "harness": "candle", "status": "loaded", "devices": [0], "vram_used_mb": null},
{"id": "drop-me", "harness": "candle", "status": "loaded", "devices": [0], "vram_used_mb": null}
]))
.await;
@@ -183,7 +183,7 @@ async fn test_poller_removes_stale_models() {
// New mock with only one model.
let new_mock_url = common::spawn_mock_neuron_with_models(json!([
{"id": "keep-me", "harness": "mistralrs", "status": "loaded", "devices": [0], "vram_used_mb": null}
{"id": "keep-me", "harness": "candle", "status": "loaded", "devices": [0], "vram_used_mb": null}
]))
.await;

View File

@@ -51,18 +51,18 @@ async fn test_streaming_sse_passthrough() {
}
assert!(
chunks.len() >= chunk_count + 1,
"expected at least {} chunks (got {}): {:?}",
chunk_count + 1,
chunks.len() > chunk_count,
"expected more than {} chunks (got {}): {:?}",
chunk_count,
chunks.len(),
chunks,
);
assert_eq!(chunks.last().unwrap(), "[DONE]");
for i in 0..chunk_count {
for (i, chunk) in chunks.iter().enumerate().take(chunk_count) {
let chunk_json: serde_json::Value =
serde_json::from_str(&chunks[i]).expect("chunk should be valid JSON");
serde_json::from_str(chunk).expect("chunk should be valid JSON");
assert_eq!(
chunk_json["choices"][0]["delta"]["content"],
format!("token{i}")

View File

@@ -0,0 +1,54 @@
//! Candle harness — in-process inference using huggingface/candle.
//!
//! This is the sole `Harness` implementation. Unlike the previous
//! mistralrs/llamacpp harnesses, candle inference runs inside the neuron
//! process itself — no external subprocess, no systemd indirection.
//!
//! Stage 1 ships this as an inert skeleton; Stage 2 wires up actual
//! model load/unload via `candle-transformers`.
use anyhow::Result;
use async_trait::async_trait;
use cortex_core::harness::{Harness, HarnessHealth, ModelInfo, ModelSpec};
pub struct CandleHarness {
/// URL where this neuron serves inference (its own bind address).
bind_url: String,
}
impl CandleHarness {
pub fn new(bind_url: String) -> Self {
Self { bind_url }
}
}
#[async_trait]
impl Harness for CandleHarness {
fn name(&self) -> &str {
"candle"
}
async fn health(&self) -> HarnessHealth {
HarnessHealth {
name: "candle".into(),
running: true,
uptime_secs: None,
}
}
async fn list_models(&self) -> Result<Vec<ModelInfo>> {
Ok(Vec::new())
}
async fn load_model(&self, _spec: &ModelSpec) -> Result<()> {
anyhow::bail!("candle harness load_model not implemented yet (Stage 2)")
}
async fn unload_model(&self, _model_id: &str) -> Result<()> {
anyhow::bail!("candle harness unload_model not implemented yet (Stage 2)")
}
async fn inference_endpoint(&self, _model_id: &str) -> Option<String> {
Some(self.bind_url.clone())
}
}

View File

@@ -1 +0,0 @@
// llama.cpp harness implementation — Phase 11.

View File

@@ -1,163 +0,0 @@
//! mistral.rs harness implementation.
//!
//! Wraps the mistral.rs HTTP API for model lifecycle management
//! and optionally manages the process via systemd.
use anyhow::Result;
use async_trait::async_trait;
use cortex_core::harness::{Harness, HarnessConfig, HarnessHealth, ModelInfo, ModelSpec};
use reqwest::Client;
use serde::Deserialize;
pub struct MistralRsHarness {
endpoint: String,
systemd_unit: Option<String>,
client: Client,
}
impl MistralRsHarness {
pub fn new(endpoint: String, systemd_unit: Option<String>) -> Self {
Self {
endpoint,
systemd_unit,
client: Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
.expect("failed to build HTTP client"),
}
}
}
/// Response from mistral.rs `GET /v1/models`.
#[derive(Debug, Deserialize)]
struct ModelsResponse {
data: Vec<ModelEntry>,
}
#[derive(Debug, Deserialize)]
struct ModelEntry {
id: String,
#[serde(default)]
status: Option<String>,
}
#[async_trait]
impl Harness for MistralRsHarness {
fn name(&self) -> &str {
"mistralrs"
}
async fn start(&self, _config: &HarnessConfig) -> Result<()> {
let Some(unit) = &self.systemd_unit else {
anyhow::bail!("no systemd unit configured for mistralrs harness");
};
let output = tokio::process::Command::new("systemctl")
.args(["start", unit])
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("systemctl start {unit} failed: {stderr}");
}
// Wait for the health endpoint to respond (up to 30s).
let url = format!("{}/health", self.endpoint);
for _ in 0..30 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
if self.client.get(&url).send().await.is_ok() {
tracing::info!(unit, "mistralrs started and healthy");
return Ok(());
}
}
anyhow::bail!("mistralrs started but health endpoint did not respond within 30s");
}
async fn stop(&self) -> Result<()> {
let Some(unit) = &self.systemd_unit else {
anyhow::bail!("no systemd unit configured for mistralrs harness");
};
let output = tokio::process::Command::new("systemctl")
.args(["stop", unit])
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("systemctl stop {unit} failed: {stderr}");
}
Ok(())
}
async fn health(&self) -> HarnessHealth {
let url = format!("{}/health", self.endpoint);
let running = self.client.get(&url).send().await.is_ok();
HarnessHealth {
name: "mistralrs".into(),
running,
uptime_secs: None,
}
}
async fn list_models(&self) -> Result<Vec<ModelInfo>> {
let url = format!("{}/v1/models", self.endpoint);
let resp = self.client.get(&url).send().await?;
if !resp.status().is_success() {
anyhow::bail!("GET /v1/models returned {}", resp.status());
}
let models_resp: ModelsResponse = resp.json().await?;
Ok(models_resp
.data
.into_iter()
.map(|m| ModelInfo {
id: m.id,
harness: "mistralrs".into(),
status: m.status.unwrap_or_else(|| "loaded".into()),
devices: vec![],
vram_used_mb: None,
})
.collect())
}
async fn load_model(&self, spec: &ModelSpec) -> Result<()> {
let url = format!("{}/v1/models/reload", self.endpoint);
let resp = self
.client
.post(&url)
.json(&serde_json::json!({ "model_id": spec.model_id }))
.send()
.await?;
if !resp.status().is_success() {
let body = resp.text().await.unwrap_or_default();
anyhow::bail!("POST /v1/models/reload failed: {body}");
}
Ok(())
}
async fn unload_model(&self, model_id: &str) -> Result<()> {
let url = format!("{}/v1/models/unload", self.endpoint);
let resp = self
.client
.post(&url)
.json(&serde_json::json!({ "model_id": model_id }))
.send()
.await?;
if !resp.status().is_success() {
let body = resp.text().await.unwrap_or_default();
anyhow::bail!("POST /v1/models/unload failed: {body}");
}
Ok(())
}
async fn inference_endpoint(&self, _model_id: &str) -> Option<String> {
// mistral.rs routes internally by model name in the request body,
// so the inference endpoint is always the base URL.
Some(self.endpoint.clone())
}
}

View File

@@ -1,7 +1,6 @@
//! Harness registry — maps harness names to trait implementations.
pub mod llamacpp;
pub mod mistralrs;
pub mod candle;
use anyhow::Result;
use cortex_core::harness::{Harness, HarnessConfig, ModelInfo, ModelSpec};
@@ -81,19 +80,16 @@ impl HarnessRegistry {
}
/// Build a registry from harness configs.
pub fn from_configs(configs: &[HarnessConfig]) -> Self {
///
/// `bind_url` is the URL where this neuron serves inference (its own
/// listen address). In-process harnesses (currently the only kind)
/// return this URL from `inference_endpoint`.
pub fn from_configs(configs: &[HarnessConfig], bind_url: &str) -> Self {
let mut registry = Self::new();
for config in configs {
match config.name.as_str() {
"mistralrs" => {
if let Some(endpoint) = &config.endpoint {
registry.register(Box::new(mistralrs::MistralRsHarness::new(
endpoint.clone(),
config.systemd_unit.clone(),
)));
} else {
tracing::warn!("mistralrs harness missing endpoint, skipping");
}
"candle" => {
registry.register(Box::new(candle::CandleHarness::new(bind_url.to_string())));
}
other => {
tracing::warn!(harness = other, "unknown harness type, skipping");

View File

@@ -37,6 +37,7 @@ async fn main() -> Result<()> {
});
let port = args.port.unwrap_or(cfg.port);
let bind_url = format!("http://localhost:{port}");
let start_time = Instant::now();
tracing::info!("running hardware discovery");
@@ -47,8 +48,10 @@ async fn main() -> Result<()> {
"discovery complete"
);
// Build harness registry from config.
let registry = HarnessRegistry::from_configs(&cfg.harnesses);
// Build harness registry from config. In-process harnesses (candle)
// need to know neuron's own bind URL so they can return it from
// inference_endpoint.
let registry = HarnessRegistry::from_configs(&cfg.harnesses, &bind_url);
discovery_result.harnesses = registry.names();
let health_cache = Arc::new(health::HealthCache::new());

View File

@@ -135,50 +135,18 @@ async fn test_models_empty_registry() {
assert!(body.as_array().unwrap().is_empty());
}
/// Spawn a mock mistral.rs backend and a neuron with the mistralrs harness
/// pointing at it, then test the full model lifecycle through neuron's API.
/// Verify the candle harness registers and the load endpoint returns a
/// "not implemented" error in Stage 1 (Stage 2 wires up actual loading).
#[tokio::test]
async fn test_models_via_mistralrs_harness() {
use axum::routing::{get, post};
use axum::{Json, Router};
async fn test_candle_harness_registers_but_load_unimplemented() {
use cortex_core::harness::HarnessConfig;
use serde_json::Value;
// Mock mistral.rs backend.
let mock_app = Router::new()
.route(
"/v1/models",
get(|| async {
Json(json!({
"data": [
{"id": "test-model", "status": "loaded"},
{"id": "other-model", "status": "unloaded"}
]
}))
}),
)
.route(
"/v1/models/unload",
post(|Json(_body): Json<Value>| async { Json(json!({"status": "ok"})) }),
)
.route(
"/v1/models/reload",
post(|Json(_body): Json<Value>| async { Json(json!({"status": "ok"})) }),
);
let mock_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let mock_addr = mock_listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(mock_listener, mock_app).await.unwrap();
});
let mock_url = format!("http://{mock_addr}");
// Build neuron with mistralrs harness pointing at mock.
let registry = HarnessRegistry::from_configs(&[HarnessConfig {
name: "mistralrs".into(),
endpoint: Some(mock_url.clone()),
systemd_unit: None,
}]);
let registry = HarnessRegistry::from_configs(
&[HarnessConfig {
name: "candle".into(),
}],
"http://localhost:13131",
);
let health_cache = Arc::new(HealthCache::new());
let state = Arc::new(NeuronState {
@@ -197,7 +165,7 @@ async fn test_models_via_mistralrs_harness() {
let client = reqwest::Client::new();
// GET /models — should return models from mock mistralrs.
// GET /models — candle harness has no models loaded yet.
let resp = client
.get(format!("{neuron_url}/models"))
.send()
@@ -205,45 +173,14 @@ async fn test_models_via_mistralrs_harness() {
.unwrap();
assert_eq!(resp.status(), 200);
let models: Vec<serde_json::Value> = resp.json().await.unwrap();
assert_eq!(models.len(), 2);
assert_eq!(models[0]["id"], "test-model");
assert_eq!(models[0]["harness"], "mistralrs");
assert_eq!(models[0]["status"], "loaded");
assert_eq!(models[1]["id"], "other-model");
assert_eq!(models[1]["status"], "unloaded");
assert!(models.is_empty());
// GET /models/test-model/endpoint — should return mock URL.
let resp = client
.get(format!("{neuron_url}/models/test-model/endpoint"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: serde_json::Value = resp.json().await.unwrap();
assert_eq!(body["url"], mock_url);
// POST /models/unload — should succeed.
let resp = client
.post(format!("{neuron_url}/models/unload"))
.json(&json!({"model_id": "test-model"}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: serde_json::Value = resp.json().await.unwrap();
assert_eq!(body["status"], "unloaded");
// POST /models/load — should succeed.
// POST /models/load — Stage 1 skeleton returns an error.
let resp = client
.post(format!("{neuron_url}/models/load"))
.json(&json!({
"model_id": "test-model",
"harness": "mistralrs"
}))
.json(&json!({"model_id": "some-model", "harness": "candle"}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: serde_json::Value = resp.json().await.unwrap();
assert_eq!(body["status"], "loaded");
assert_eq!(resp.status(), 400);
}