diff --git a/crates/neuron/src/harness/candle.rs b/crates/neuron/src/harness/candle.rs index 12ecb1d..6930fc9 100644 --- a/crates/neuron/src/harness/candle.rs +++ b/crates/neuron/src/harness/candle.rs @@ -134,6 +134,19 @@ pub struct LoadedModel { /// the worker; in that case [`Self::arch`] is `None`. The two /// fields are mutually exclusive. pub arch_handle: Option, + /// Serialises chat-completion requests against this model. Held + /// from the start of `clear_kv_cache` through the last decode + /// step, so concurrent requests can't interleave their KV-cache + /// mutations. Without this, two requests' chunked-prefill + /// `clear → forward(chunk0) → forward(chunk1) → ...` sequences + /// could end up sharing a cache between them — the device worker + /// channel serialises individual jobs, but not the sequence + /// boundary. Observed on benjy 2026-05-27 18:41 when agent-zero's + /// memorize extensions fired in parallel and produced a + /// shape-mismatch failure mid-prefill. Mirrors TpLoadedModel.pool + /// for the TP path (which already had this invariant by accident + /// because the pool lock covered the same window). + pub inference_lock: tokio::sync::Mutex<()>, } impl LoadedModel { @@ -1314,6 +1327,13 @@ impl CandleHarness { return Err(poisoned_error(&model_id)); } + // Serialise concurrent requests against this model. Holds for + // the duration of clear_kv_cache → prefill → decode so two + // requests' chunked-prefill sequences can't interleave on the + // shared KV cache (see `LoadedModel.inference_lock` for the + // observed failure mode). + let _inference_guard = loaded.inference_lock.lock().await; + let result = async { let prompt = format_qwen3_prompt(&request.messages); @@ -1624,13 +1644,21 @@ impl CandleHarness { // Routing parallel to the non-streaming chat_completion: CUDA // goes through the worker (async task), CPU keeps the - // spawn_blocking + Arc> path. + // spawn_blocking + Arc> path. Both branches + // acquire `loaded.inference_lock` from inside the spawned + // task so concurrent stream requests against the same model + // serialise at the request boundary (preventing the + // chunked-prefill KV-cache interleave failure mode). The + // role chunk was already sent above, so the client sees + // immediate "stream open" feedback even when this request + // queues behind another for the lock. if let (Some(worker), Some(handle)) = (loaded.worker.clone(), loaded.arch_handle) { #[cfg(feature = "cuda")] { let prompt_tokens = prompt_tokens.clone(); tokio::spawn( async move { + let _inference_guard = loaded_for_task.inference_lock.lock().await; match stream_inference_via_worker( worker, handle, @@ -1675,6 +1703,10 @@ impl CandleHarness { } else if let Some(arch_arc) = loaded.arch.clone() { tokio::task::spawn_blocking(move || { let _g = span_for_task.enter(); + // `blocking_lock` is safe here: spawn_blocking runs on + // a dedicated thread, not on the async runtime, so + // there's no executor to stall. + let _inference_guard = loaded_for_task.inference_lock.blocking_lock(); let mut guard = arch_arc.blocking_lock(); match run_inference_streaming( &mut guard, @@ -1831,6 +1863,7 @@ impl Harness for CandleHarness { poisoned: AtomicBool::new(false), worker, arch_handle, + inference_lock: tokio::sync::Mutex::new(()), }); let mut models = self.models.write().await;