From bd04d7f580436e5b4d4a77a12a5c840b13b0c74b Mon Sep 17 00:00:00 2001 From: rob thijssen Date: Wed, 27 May 2026 18:01:24 +0300 Subject: [PATCH] fix(neuron): stream tokens via DecodeStream to avoid UTF-8 panic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When BPE byte-fallback splits a multi-byte UTF-8 char (e.g. an emoji) across multiple tokens, the previous "decode the cumulative token list, byte-slice the delta against a stored prefix" pattern would panic with 'start byte index N is not a char boundary; it is inside CHAR (bytes A..B) of `...`'. The mechanism: at step N the tokenizer renders the partial bytes as U+FFFD (3 bytes); at step N+1 it can decode the complete codepoint (e.g. 4 bytes for 🌫). `decoded_prefix.len()` from step N then lands inside the codepoint in step N+1's `full` string, and `&str[start..]` panics. Replace with tokenizers' `DecodeStream::step(id)` which maintains an internal byte buffer across token boundaries and only emits when a clean codepoint completes. Applied at all three SSE emission sites: - stream_inference_via_worker (single-GPU CUDA stream) - chat_completion_tp_stream's spawned task (TP stream) - run_inference_streaming (CPU stream) The shared emit helper splits into emit_delta (async, mpsc::send) and emit_delta_blocking (sync, mpsc::blocking_send) so each path keeps its existing send semantics. The old emit_chunk helper that did the panic-prone full-decode-and-slice is removed entirely. Observed on beast 2026-05-27 17:49:55 — model emitted 🌫 in a tool-call response after a long agent-zero session; the spawned TP stream task panicked at candle.rs:2648. The model itself stayed healthy (no CUDA fault), only the one streaming request died. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/neuron/src/harness/candle.rs | 253 ++++++++++++++-------------- 1 file changed, 131 insertions(+), 122 deletions(-) diff --git a/crates/neuron/src/harness/candle.rs b/crates/neuron/src/harness/candle.rs index 1320349..c1757e8 100644 --- a/crates/neuron/src/harness/candle.rs +++ b/crates/neuron/src/harness/candle.rs @@ -2183,7 +2183,12 @@ impl CandleHarness { let leader_handle = tp_for_task.leader_handle; let mut all_tokens: Vec = Vec::new(); - let mut decoded_prefix = String::new(); + // Incremental detokenizer. See the equivalent in + // `stream_inference_via_worker` for the why: the old + // "full decode + byte-slice delta" pattern panicked on + // UTF-8 mid-codepoint boundaries when BPE byte-fallback + // split a multi-byte char across tokens. + let mut decode_stream = tokenizer.decode_stream(true); let mut finish_reason = "length".to_string(); 'work: { @@ -2255,20 +2260,20 @@ impl CandleHarness { finish_reason = "stop".into(); } else { all_tokens.push(next_token); - if !emit_chunk( - &all_tokens, - &mut decoded_prefix, - &tokenizer, - &tx, - &id, - created, - &model_id, - ) - .await - { - // Client gone — treat as normal stream end, - // not a failure. No log spam. - break 'work; + match decode_stream.step(next_token) { + Ok(Some(delta)) => { + if !emit_delta(&delta, &tx, &id, created, &model_id).await { + // Client gone — treat as normal stream end, + // not a failure. No log spam. 
+ break 'work; + } + } + Ok(None) => {} + Err(e) => tracing::warn!( + model = %model_id, + error = %e, + "TP stream: decode_stream step failed" + ), } for index in 0..max_new.saturating_sub(1) { @@ -2333,18 +2338,18 @@ impl CandleHarness { break; } all_tokens.push(next_token); - if !emit_chunk( - &all_tokens, - &mut decoded_prefix, - &tokenizer, - &tx, - &id, - created, - &model_id, - ) - .await - { - break 'work; + match decode_stream.step(next_token) { + Ok(Some(delta)) => { + if !emit_delta(&delta, &tx, &id, created, &model_id).await { + break 'work; + } + } + Ok(None) => {} + Err(e) => tracing::warn!( + model = %model_id, + error = %e, + "TP stream: decode_stream step failed" + ), } } } @@ -2624,48 +2629,68 @@ async fn chat_completion_tp_inner( }) } -/// Decode the cumulative token list, emit the delta (substring appended -/// since the last chunk) as a `chat.completion.chunk`. Returns `false` -/// if the receiver has hung up — the caller should bail. +/// Send `delta` as a `chat.completion.chunk`. Returns `false` if the +/// receiver has hung up — the caller should bail. Empty deltas (the +/// DecodeStream is buffering an incomplete UTF-8 sequence) are a +/// no-op return-true so the caller can treat "no delta yet" and "tx +/// still live" uniformly. 
#[cfg(feature = "cuda")] -async fn emit_chunk( - all_tokens: &[u32], - decoded_prefix: &mut String, - tokenizer: &Tokenizer, +async fn emit_delta( + delta: &str, tx: &mpsc::Sender, id: &str, created: u64, model_id: &str, ) -> bool { - let full = match tokenizer.decode(all_tokens, true) { - Ok(s) => s, - Err(e) => { - tracing::warn!(error = %e, "TP stream: decode failed"); - return false; - } - }; - if full.len() > decoded_prefix.len() { - let delta = full[decoded_prefix.len()..].to_string(); - *decoded_prefix = full; - let chunk = ChatCompletionChunk { - id: id.into(), - object: "chat.completion.chunk".into(), - created, - model: model_id.into(), - choices: vec![ChunkChoice { - index: 0, - delta: json!({ "content": delta }), - finish_reason: None, - extra: serde_json::Value::Object(Default::default()), - }], - usage: None, - extra: serde_json::Value::Object(Default::default()), - }; - if tx.send(chunk).await.is_err() { - return false; - } + if delta.is_empty() { + return true; } - true + let chunk = ChatCompletionChunk { + id: id.into(), + object: "chat.completion.chunk".into(), + created, + model: model_id.into(), + choices: vec![ChunkChoice { + index: 0, + delta: json!({ "content": delta }), + finish_reason: None, + extra: serde_json::Value::Object(Default::default()), + }], + usage: None, + extra: serde_json::Value::Object(Default::default()), + }; + tx.send(chunk).await.is_ok() +} + +/// Sync counterpart of [`emit_delta`] for the CPU path's +/// `spawn_blocking` closure. Same shape, `blocking_send` instead of +/// `send`. Kept as a separate fn so the async / blocking-send choice +/// is local to one place per path. 
+fn emit_delta_blocking( + delta: &str, + tx: &mpsc::Sender, + id: &str, + created: u64, + model_id: &str, +) -> bool { + if delta.is_empty() { + return true; + } + let chunk = ChatCompletionChunk { + id: id.into(), + object: "chat.completion.chunk".into(), + created, + model: model_id.into(), + choices: vec![ChunkChoice { + index: 0, + delta: json!({ "content": delta }), + finish_reason: None, + extra: serde_json::Value::Object(Default::default()), + }], + usage: None, + extra: serde_json::Value::Object(Default::default()), + }; + tx.blocking_send(chunk).is_ok() } /// Errors returned by `CandleHarness::chat_completion`. The @@ -2848,7 +2873,13 @@ async fn stream_inference_via_worker( }; let mut all_tokens: Vec = Vec::new(); - let mut decoded_prefix = String::new(); + // Incremental detokenizer. Replaces the old "decode cumulative + // tokens, byte-slice the delta against a stored prefix" pattern + // that panicked when BPE byte-fallback split a multi-byte UTF-8 + // sequence (e.g. an emoji) across tokens. `step` returns + // `Ok(Some(delta))` only when the trailing bytes form a complete + // codepoint; `Ok(None)` while it's buffering an incomplete one. + let mut decode_stream = tokenizer.decode_stream(true); let prompt_len = prompt_tokens.len(); let mut finish_reason = "length".to_string(); @@ -2879,18 +2910,14 @@ async fn stream_inference_via_worker( finish_reason = "stop".into(); } else { all_tokens.push(next_token); - if !emit_chunk( - &all_tokens, - &mut decoded_prefix, - &tokenizer, - &tx, - &id, - created, - &model_id, - ) - .await - { - return Ok(finish_reason); // Client gone — clean stream end. 
+ match decode_stream.step(next_token) { + Ok(Some(delta)) => { + if !emit_delta(&delta, &tx, &id, created, &model_id).await { + return Ok(finish_reason); + } + } + Ok(None) => {} + Err(e) => tracing::warn!(error = %e, "decode_stream step failed"), } for index in 0..max_new.saturating_sub(1) { @@ -2916,18 +2943,14 @@ async fn stream_inference_via_worker( break; } all_tokens.push(next_token); - if !emit_chunk( - &all_tokens, - &mut decoded_prefix, - &tokenizer, - &tx, - &id, - created, - &model_id, - ) - .await - { - return Ok(finish_reason); + match decode_stream.step(next_token) { + Ok(Some(delta)) => { + if !emit_delta(&delta, &tx, &id, created, &model_id).await { + return Ok(finish_reason); + } + } + Ok(None) => {} + Err(e) => tracing::warn!(error = %e, "decode_stream step failed"), } } } @@ -3035,49 +3058,29 @@ fn run_inference_streaming( }; let mut all_tokens: Vec = Vec::new(); - let mut decoded_prefix = String::new(); + // Incremental detokenizer. See `stream_inference_via_worker` for + // the same reasoning — `tokenizer.decode_stream(true).step(id)` + // buffers incomplete multi-byte UTF-8 sequences across token + // boundaries and only emits when a clean codepoint completes. 
+ let mut decode_stream = tokenizer.decode_stream(true); let mut finish_reason = "length".to_string(); arch.clear_kv_cache()?; let logits = chunked_prefill_local(arch, device, prompt_tokens)?; let mut next_token = sample_with_penalty(&logits, &all_tokens, &mut logits_processor)?; - let emit_token = |all_tokens: &[u32], decoded_prefix: &mut String| -> Result { - let full = tokenizer - .decode(all_tokens, true) - .map_err(|e| anyhow::anyhow!("decode: {e}"))?; - if full.len() > decoded_prefix.len() { - let delta = full[decoded_prefix.len()..].to_string(); - *decoded_prefix = full; - let chunk = ChatCompletionChunk { - id: id.into(), - object: "chat.completion.chunk".into(), - created, - model: model_id.into(), - choices: vec![ChunkChoice { - index: 0, - delta: json!({ "content": delta }), - finish_reason: None, - extra: serde_json::Value::Object(Default::default()), - }], - usage: None, - extra: serde_json::Value::Object(Default::default()), - }; - // blocking_send returns Err if the consumer hung up — signal - // the caller to stop generating. - if tx.blocking_send(chunk).is_err() { - return Ok(false); - } - } - Ok(true) - }; - if Some(next_token) == eos_id { finish_reason = "stop".into(); } else { all_tokens.push(next_token); - if !emit_token(&all_tokens, &mut decoded_prefix)? { - return Ok(()); + match decode_stream.step(next_token) { + Ok(Some(delta)) => { + if !emit_delta_blocking(&delta, tx, id, created, model_id) { + return Ok(()); + } + } + Ok(None) => {} + Err(e) => tracing::warn!(error = %e, "stream: decode_stream step failed"), } for index in 0..max_new.saturating_sub(1) { @@ -3089,8 +3092,14 @@ fn run_inference_streaming( break; } all_tokens.push(next_token); - if !emit_token(&all_tokens, &mut decoded_prefix)? 
{ - return Ok(()); + match decode_stream.step(next_token) { + Ok(Some(delta)) => { + if !emit_delta_blocking(&delta, tx, id, created, model_id) { + return Ok(()); + } + } + Ok(None) => {} + Err(e) => tracing::warn!(error = %e, "stream: decode_stream step failed"), } } }