From 0a1cfcd4d03f6210cd5e403147e59af52246caac Mon Sep 17 00:00:00 2001 From: rob thijssen Date: Tue, 26 May 2026 12:25:11 +0300 Subject: [PATCH] feat(neuron,candle): req_id spans, terminal failure logs, pool-lock warnings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every chat completion path (single-GPU + TP, streaming + non-streaming) now opens an `info_span!("chat", req_id=…, model=…)`. The fmt subscriber prefixes every event with that span so `grep req_id=…` over journalctl reconstructs one request even when dozens overlap. Every path also emits a terminal log line on both success ("done", with prompt_tokens/completion_tokens/finish_reason/total_ms) and failure ("failed", with full anyhow chain + total_ms). Failures used to vanish silently — a request that hit a CUDA OOM left "starting" in the journal and no further trace. New `acquire_pool_lock` helper replaces the bare `tp.pool.lock().await` in both TP paths. It warns at 2s ("still waiting on pool lock") and re-warns every 2s thereafter, so queued requests stuck behind a deadlocked holder are visible immediately instead of looking like idle silence. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/neuron/src/harness/candle.rs | 575 +++++++++++++++++++--------- 1 file changed, 386 insertions(+), 189 deletions(-) diff --git a/crates/neuron/src/harness/candle.rs b/crates/neuron/src/harness/candle.rs index b431542..654cf47 100644 --- a/crates/neuron/src/harness/candle.rs +++ b/crates/neuron/src/harness/candle.rs @@ -29,9 +29,12 @@ use serde_json::json; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; +#[cfg(feature = "cuda")] +use std::time::Duration; use std::time::{SystemTime, UNIX_EPOCH}; use tokenizers::Tokenizer; use tokio::sync::{Mutex, RwLock, mpsc}; +use tracing::Instrument; /// In-process candle harness. Owns the loaded model registry. pub struct CandleHarness { @@ -351,6 +354,64 @@ fn resolve_hf_cache(explicit: Option) -> Option { None } +/// A short hex tag used to group every log line emitted on behalf of +/// one chat-completion request. Six hex digits is unique enough across +/// a 4-hour journal window (24 bits ≈ 16M values, while a busy neuron +/// sees ~10³ requests/hour) and fits cleanly inside `req_id=…` in the +/// fmt subscriber's span-prefix output. +fn new_req_id() -> String { + format!("{:06x}", unix_subsec_nanos() & 0xFFFFFF) +} + +/// Threshold above which `pool.lock().await` blocking is interesting +/// enough to warn about. Healthy concurrent requests serialise behind +/// the pool in single-digit ms — anything past 2 seconds is either a +/// huge in-flight prompt or, more often, a stuck request holding the +/// lock against a poisoned CUDA context. See the 2026-05-26 4-hour +/// silence on beast where dozens of requests piled up invisibly here. +#[cfg(feature = "cuda")] +const POOL_LOCK_WARN_THRESHOLD: Duration = Duration::from_secs(2); + +/// Acquire the TP pool lock, emitting a warn-level breadcrumb if the +/// wait exceeds [`POOL_LOCK_WARN_THRESHOLD`]. Wrapped in a helper so +/// the warn happens at the call site — the request whose lock-wait is +/// slow is the one that knows its prompt_len and other context. +#[cfg(feature = "cuda")] +async fn acquire_pool_lock( + pool: &tokio::sync::Mutex, + model_id: &str, +) -> tokio::sync::MutexGuard<'_, super::tp::WorkerPool> { + let start = std::time::Instant::now(); + // Tick once at the threshold so a stuck request shows up in + // journalctl even while it's still waiting. Without this the wait + // looks like silence in the log right up until the lock is freed. + tokio::pin! { + let lock = pool.lock(); + } + loop { + tokio::select! { + guard = &mut lock => { + let elapsed = start.elapsed(); + if elapsed >= POOL_LOCK_WARN_THRESHOLD { + tracing::warn!( + model = %model_id, + waited_ms = elapsed.as_millis(), + "TP chat_completion: pool lock acquired after long wait" + ); + } + return guard; + } + _ = tokio::time::sleep(POOL_LOCK_WARN_THRESHOLD) => { + tracing::warn!( + model = %model_id, + waited_ms = start.elapsed().as_millis(), + "TP chat_completion: still waiting on pool lock" + ); + } + } + } +} + /// Apply the repetition penalty (if any) to the prediction logits and /// then sample. Centralises the prefill / generation-loop call sites /// so they share identical sampling behaviour. @@ -746,76 +807,119 @@ impl CandleHarness { } }; - let prompt = format_qwen3_prompt(&request.messages); - - let encoding = loaded - .tokenizer - .encode(prompt.as_str(), true) - .map_err(|e| InferenceError::Other(anyhow::anyhow!("tokenize: {e}")))?; - let prompt_tokens: Vec = encoding.get_ids().to_vec(); - let prompt_len = prompt_tokens.len(); - - let temperature = request.temperature.unwrap_or(0.7); - let top_p = request.top_p; - let max_new = request.max_tokens.unwrap_or(512) as usize; - let seed = unix_subsec_nanos(); - - let eos_id = loaded - .tokenizer - .token_to_id("<|im_end|>") - .or_else(|| loaded.tokenizer.token_to_id("<|endoftext|>")); - - let arch_arc = Arc::clone(&loaded.arch); - let device = loaded.device.clone(); + // Span every line of this request with a short req_id + + // model so `grep req_id=…` over the journal can reconstruct + // one request even when dozens overlap. Add a terminal log + // line on both success and failure — the single-GPU path + // used to log nothing on either side, so a failing request + // looked exactly like an idle neuron. + let req_id = new_req_id(); let model_id = request.model.clone(); + let span = tracing::info_span!("chat", req_id = %req_id, model = %model_id); + let req_start = std::time::Instant::now(); - let (generated_ids, finish_reason) = - tokio::task::spawn_blocking(move || -> Result<(Vec, String)> { - let mut guard = arch_arc.blocking_lock(); - run_inference( - &mut guard, - &device, - &prompt_tokens, - max_new, - temperature, - top_p, - seed, - eos_id, - ) - }) - .await - .map_err(|e| InferenceError::Other(anyhow::anyhow!("inference task panicked: {e}")))? - .map_err(InferenceError::Other)?; + let result = async { + let prompt = format_qwen3_prompt(&request.messages); - let completion_text = loaded - .tokenizer - .decode(&generated_ids, true) - .map_err(|e| InferenceError::Other(anyhow::anyhow!("detokenize: {e}")))?; + let encoding = loaded + .tokenizer + .encode(prompt.as_str(), true) + .map_err(|e| InferenceError::Other(anyhow::anyhow!("tokenize: {e}")))?; + let prompt_tokens: Vec = encoding.get_ids().to_vec(); + let prompt_len = prompt_tokens.len(); - let usage = Usage { - prompt_tokens: prompt_len as u64, - completion_tokens: generated_ids.len() as u64, - total_tokens: (prompt_len + generated_ids.len()) as u64, - }; + let temperature = request.temperature.unwrap_or(0.7); + let top_p = request.top_p; + let max_new = request.max_tokens.unwrap_or(512) as usize; + let seed = unix_subsec_nanos(); - Ok(ChatCompletionResponse { - id: format!("chatcmpl-{:x}", unix_subsec_nanos()), - object: "chat.completion".into(), - created: unix_now_secs(), - model: model_id, - choices: vec![ChatCompletionChoice { - index: 0, - message: ChatMessage { - role: "assistant".into(), - content: MessageContent::Text(completion_text), + let eos_id = loaded + .tokenizer + .token_to_id("<|im_end|>") + .or_else(|| loaded.tokenizer.token_to_id("<|endoftext|>")); + + tracing::info!( + prompt_len, + max_new, + temperature, + ?top_p, + ?eos_id, + "chat_completion: starting" + ); + + let arch_arc = Arc::clone(&loaded.arch); + let device = loaded.device.clone(); + + let (generated_ids, finish_reason) = + tokio::task::spawn_blocking(move || -> Result<(Vec, String)> { + let mut guard = arch_arc.blocking_lock(); + run_inference( + &mut guard, + &device, + &prompt_tokens, + max_new, + temperature, + top_p, + seed, + eos_id, + ) + }) + .await + .map_err(|e| { + InferenceError::Other(anyhow::anyhow!("inference task panicked: {e}")) + })? + .map_err(InferenceError::Other)?; + + let completion_text = loaded + .tokenizer + .decode(&generated_ids, true) + .map_err(|e| InferenceError::Other(anyhow::anyhow!("detokenize: {e}")))?; + + let usage = Usage { + prompt_tokens: prompt_len as u64, + completion_tokens: generated_ids.len() as u64, + total_tokens: (prompt_len + generated_ids.len()) as u64, + }; + + tracing::info!( + prompt_tokens = prompt_len, + completion_tokens = generated_ids.len(), + finish_reason = %finish_reason, + total_ms = req_start.elapsed().as_millis(), + "chat_completion: done" + ); + + Ok::<_, InferenceError>(ChatCompletionResponse { + id: format!("chatcmpl-{:x}", unix_subsec_nanos()), + object: "chat.completion".into(), + created: unix_now_secs(), + model: request.model.clone(), + choices: vec![ChatCompletionChoice { + index: 0, + message: ChatMessage { + role: "assistant".into(), + content: MessageContent::Text(completion_text), + extra: serde_json::Value::Object(Default::default()), + }, + finish_reason: Some(finish_reason), extra: serde_json::Value::Object(Default::default()), - }, - finish_reason: Some(finish_reason), + }], + usage: Some(usage), extra: serde_json::Value::Object(Default::default()), - }], - usage: Some(usage), - extra: serde_json::Value::Object(Default::default()), - }) + }) + } + .instrument(span.clone()) + .await; + + if let Err(ref e) = result { + let _g = span.enter(); + tracing::error!( + error = %format!("{e:#}"), + total_ms = req_start.elapsed().as_millis(), + "chat_completion: failed" + ); + } + result } /// Run a streaming chat completion against a loaded model. @@ -903,9 +1007,30 @@ impl CandleHarness { .await .map_err(|_| InferenceError::Other(anyhow::anyhow!("client disconnected")))?; + // Span context — spawn_blocking detaches from the async + // executor so we capture the span explicitly and re-enter it + // inside the closure to keep the req_id on every emitted line. + let req_id = new_req_id(); + let span = tracing::info_span!("chat_stream", req_id = %req_id, model = %model_id); + let prompt_len = prompt_tokens.len(); + let req_start = std::time::Instant::now(); + let span_for_starting = span.clone(); + let span_for_task = span.clone(); + { + let _g = span_for_starting.enter(); + tracing::info!( + prompt_len, + max_new, + temperature, + ?top_p, + ?eos_id, + "chat_completion (stream): starting" + ); + } tokio::task::spawn_blocking(move || { + let _g = span_for_task.enter(); let mut guard = arch_arc.blocking_lock(); - if let Err(e) = run_inference_streaming( + match run_inference_streaming( &mut guard, &device, &tokenizer, @@ -920,7 +1045,17 @@ impl CandleHarness { &model_id, &tx, ) { - tracing::warn!(model = %model_id, error = %e, "streaming inference failed"); + Ok(()) => tracing::info!( + prompt_tokens = prompt_len, + total_ms = req_start.elapsed().as_millis(), + "chat_completion (stream): done" + ), + Err(e) => tracing::error!( + error = %format!("{e:#}"), + prompt_tokens = prompt_len, + total_ms = req_start.elapsed().as_millis(), + "chat_completion (stream): failed" + ), } }); @@ -1187,13 +1322,32 @@ impl CandleHarness { tp: Arc, request: ChatCompletionRequest, ) -> Result { - let handle = tokio::spawn(chat_completion_tp_inner(tp, request)); - match handle.await { - Ok(result) => result, + // Tag every line of this request with a short req_id so a + // grep over journalctl reconstructs one request even when + // dozens are queued and interleaved. The span prefix is added + // by the fmt subscriber to every event emitted within the + // instrumented future, including events from `WorkerPool::*` + // since those run on the leader's task. + let req_id = new_req_id(); + let model_id = request.model.clone(); + let span = tracing::info_span!("tp_chat", req_id = %req_id, model = %model_id); + let req_start = std::time::Instant::now(); + let handle = tokio::spawn(chat_completion_tp_inner(tp, request).instrument(span.clone())); + let result = match handle.await { + Ok(r) => r, Err(join_err) => Err(InferenceError::Other(anyhow::anyhow!( "TP inference task panicked or was cancelled: {join_err}" ))), + }; + if let Err(ref e) = result { + let _g = span.enter(); + tracing::error!( + error = %format!("{e:#}"), + total_ms = req_start.elapsed().as_millis(), + "TP chat_completion: failed" + ); } + result } /// Streaming counterpart to `chat_completion_tp`. Same per-step @@ -1263,143 +1417,189 @@ impl CandleHarness { // The orchestration task. Holds the pool lock for the lifetime // of this inference; concurrent requests against the same TP // model serialise behind it. + // + // Tagged with the same req_id span as the non-streaming path + // so the journal can be reconstructed regardless of which API + // surface the client hit. + let req_id = new_req_id(); + let span = tracing::info_span!( + "tp_chat_stream", + req_id = %req_id, + model = %model_id + ); + let req_start = std::time::Instant::now(); + tracing::info!( + parent: &span, + prompt_len, + max_new, + temperature, + ?top_p, + ?eos_id, + "TP chat_completion (stream): starting" + ); let tp_for_task = Arc::clone(&tp); - tokio::spawn(async move { - let mut pool = tp_for_task.pool.lock().await; - let leader_arc = tp_for_task.leader_model.clone(); + tokio::spawn( + async move { + let mut failure: Option = None; + let mut pool = acquire_pool_lock(&tp_for_task.pool, &model_id).await; + let leader_arc = tp_for_task.leader_model.clone(); - if let Err(e) = pool.clear_kv_cache(&model_id, leader_arc.clone()).await { - tracing::warn!(model = %model_id, error = %e, "TP stream: clear_kv_cache failed"); - return; - } + let mut all_tokens: Vec = Vec::new(); + let mut decoded_prefix = String::new(); + let mut finish_reason = "length".to_string(); - let mut logits_processor = { - let sampling = if temperature <= 0.0 { - Sampling::ArgMax - } else { - match top_p { - Some(p) => Sampling::TopP { p, temperature }, - None => Sampling::All { temperature }, + 'work: { + if let Err(e) = pool.clear_kv_cache(&model_id, leader_arc.clone()).await { + failure = Some(format!("clear_kv_cache: {e:#}")); + break 'work; } - }; - LogitsProcessor::from_sampling(seed, sampling) - }; - let mut all_tokens: Vec = Vec::new(); - let mut decoded_prefix = String::new(); - let mut finish_reason = "length".to_string(); + let mut logits_processor = { + let sampling = if temperature <= 0.0 { + Sampling::ArgMax + } else { + match top_p { + Some(p) => Sampling::TopP { p, temperature }, + None => Sampling::All { temperature }, + } + }; + LogitsProcessor::from_sampling(seed, sampling) + }; - // Prefill — every rank embeds the prompt, offset = 0. - let logits = match pool - .generate_step(&model_id, leader_arc.clone(), prompt_tokens.clone(), 0) - .await - { - Ok(l) => l, - Err(e) => { - tracing::warn!(model = %model_id, error = %e, "TP stream: prefill failed"); - return; - } - }; - let mut next_token = match sample_with_penalty( - &logits, - &all_tokens, - &mut logits_processor, - ) { - Ok(t) => t, - Err(e) => { - tracing::warn!(model = %model_id, error = %e, "TP stream: prefill sample failed"); - return; - } - }; - - if Some(next_token) == eos_id { - finish_reason = "stop".into(); - } else { - all_tokens.push(next_token); - if !emit_chunk( - &all_tokens, - &mut decoded_prefix, - &tokenizer, - &tx, - &id, - created, - &model_id, - ) - .await - { - return; - } - - for index in 0..max_new.saturating_sub(1) { + // Prefill — every rank embeds the prompt, offset = 0. let logits = match pool - .generate_step( - &model_id, - leader_arc.clone(), - vec![next_token], - prompt_len + index, - ) + .generate_step(&model_id, leader_arc.clone(), prompt_tokens.clone(), 0) .await { Ok(l) => l, Err(e) => { - tracing::warn!( - model = %model_id, - error = %e, - "TP stream: decode step failed" - ); - return; + failure = Some(format!("prefill: {e:#}")); + break 'work; } }; - next_token = + let mut next_token = match sample_with_penalty(&logits, &all_tokens, &mut logits_processor) { Ok(t) => t, Err(e) => { - tracing::warn!( - model = %model_id, - error = %e, - "TP stream: decode sample failed" - ); - return; + failure = Some(format!("prefill sample: {e:#}")); + break 'work; } }; + if Some(next_token) == eos_id { finish_reason = "stop".into(); - break; - } - all_tokens.push(next_token); - if !emit_chunk( - &all_tokens, - &mut decoded_prefix, - &tokenizer, - &tx, - &id, - created, - &model_id, - ) - .await - { - return; + } else { + all_tokens.push(next_token); + if !emit_chunk( + &all_tokens, + &mut decoded_prefix, + &tokenizer, + &tx, + &id, + created, + &model_id, + ) + .await + { + // Client gone — treat as normal stream end, + // not a failure. No log spam. + break 'work; + } + + for index in 0..max_new.saturating_sub(1) { + let logits = match pool + .generate_step( + &model_id, + leader_arc.clone(), + vec![next_token], + prompt_len + index, + ) + .await + { + Ok(l) => l, + Err(e) => { + failure = Some(format!("decode step {index}: {e:#}")); + break 'work; + } + }; + next_token = match sample_with_penalty( + &logits, + &all_tokens, + &mut logits_processor, + ) { + Ok(t) => t, + Err(e) => { + failure = Some(format!("decode sample {index}: {e:#}")); + break 'work; + } + }; + if Some(next_token) == eos_id { + finish_reason = "stop".into(); + break; + } + all_tokens.push(next_token); + if !emit_chunk( + &all_tokens, + &mut decoded_prefix, + &tokenizer, + &tx, + &id, + created, + &model_id, + ) + .await + { + break 'work; + } + } } } - } - // Final chunk carrying finish_reason. - let final_chunk = ChatCompletionChunk { - id: id.clone(), - object: "chat.completion.chunk".into(), - created, - model: model_id.clone(), - choices: vec![ChunkChoice { - index: 0, - delta: serde_json::Value::Object(Default::default()), - finish_reason: Some(finish_reason), - extra: serde_json::Value::Object(Default::default()), - }], - usage: None, - extra: serde_json::Value::Object(Default::default()), - }; - let _ = tx.send(final_chunk).await; - }); + // One terminal line per request, success or failure. The + // success branch was previously implicit (the SSE final + // chunk went out and the spawned task just ended); now + // there's always a log line for the operator. + if let Some(err) = &failure { + tracing::error!( + error = %err, + completion_tokens = all_tokens.len(), + total_ms = req_start.elapsed().as_millis(), + "TP chat_completion (stream): failed" + ); + } else { + tracing::info!( + prompt_tokens = prompt_len, + completion_tokens = all_tokens.len(), + finish_reason = %finish_reason, + total_ms = req_start.elapsed().as_millis(), + "TP chat_completion (stream): done" + ); + } + + // Final chunk carrying finish_reason — only on the success + // path. On failure we drop the channel so the client sees + // the SSE stream end abruptly (matches pre-change behaviour + // when the failed-path early-returned without final chunk). + if failure.is_none() { + let final_chunk = ChatCompletionChunk { + id: id.clone(), + object: "chat.completion.chunk".into(), + created, + model: model_id.clone(), + choices: vec![ChunkChoice { + index: 0, + delta: serde_json::Value::Object(Default::default()), + finish_reason: Some(finish_reason), + extra: serde_json::Value::Object(Default::default()), + }], + usage: None, + extra: serde_json::Value::Object(Default::default()), + }; + let _ = tx.send(final_chunk).await; + } + } + .instrument(span), + ); Ok(rx) } @@ -1455,13 +1655,10 @@ async fn chat_completion_tp_inner( // leader_model's own Mutex is acquired step-by-step inside // pool.generate_step (so spawn_blocking can grab it without // holding the pool lock across the blocking_lock call). - let lock_start = std::time::Instant::now(); - let mut pool = tp.pool.lock().await; - tracing::debug!( - model = %model_id, - elapsed_ms = lock_start.elapsed().as_millis(), - "TP chat_completion: pool lock acquired" - ); + // `acquire_pool_lock` warns periodically while we wait so a + // stuck holder doesn't make the queueing requests look like + // silence in the journal. + let mut pool = acquire_pool_lock(&tp.pool, &model_id).await; let leader_arc = tp.leader_model.clone(); // Reset every rank's KV cache so this request doesn't attend