fix(neuron): stream tokens via DecodeStream to avoid UTF-8 panic
When BPE byte-fallback splits a multi-byte UTF-8 char (e.g. an emoji) across multiple tokens, the previous "decode the cumulative token list, byte-slice the delta against a stored prefix" pattern would panic with 'start byte index N is not a char boundary; it is inside <emoji>'. The race: at step N the tokenizer renders the partial bytes as U+FFFD (3 bytes); at step N+1 it can decode the complete codepoint (e.g. 4 bytes for 🌫). `decoded_prefix.len()` from step N then lands inside the codepoint in step N+1's `full` string, and `&str[start..]` panics. Replace with tokenizers' `DecodeStream::step(id)` which maintains an internal byte buffer across token boundaries and only emits when a clean codepoint completes. Applied at all three SSE emission sites: - stream_inference_via_worker (single-GPU CUDA stream) - chat_completion_tp_stream's spawned task (TP stream) - run_inference_streaming (CPU stream) The shared emit helper splits into emit_delta (async, mpsc::send) and emit_delta_blocking (sync, mpsc::blocking_send) so each path keeps its existing send semantics. The old emit_chunk helper that did the unsafe full-decode-and-slice is removed entirely. Observed on beast 2026-05-27 17:49:55 — model emitted 🌫 in a tool-call response after a long agent-zero session; the spawned TP stream task panicked at candle.rs:2648. The model itself stayed healthy (no CUDA fault), only the one streaming request died. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -2183,7 +2183,12 @@ impl CandleHarness {
|
|||||||
let leader_handle = tp_for_task.leader_handle;
|
let leader_handle = tp_for_task.leader_handle;
|
||||||
|
|
||||||
let mut all_tokens: Vec<u32> = Vec::new();
|
let mut all_tokens: Vec<u32> = Vec::new();
|
||||||
let mut decoded_prefix = String::new();
|
// Incremental detokenizer. See the equivalent in
|
||||||
|
// `stream_inference_via_worker` for the why: the old
|
||||||
|
// "full decode + byte-slice delta" pattern panicked on
|
||||||
|
// UTF-8 mid-codepoint boundaries when BPE byte-fallback
|
||||||
|
// split a multi-byte char across tokens.
|
||||||
|
let mut decode_stream = tokenizer.decode_stream(true);
|
||||||
let mut finish_reason = "length".to_string();
|
let mut finish_reason = "length".to_string();
|
||||||
|
|
||||||
'work: {
|
'work: {
|
||||||
@@ -2255,21 +2260,21 @@ impl CandleHarness {
|
|||||||
finish_reason = "stop".into();
|
finish_reason = "stop".into();
|
||||||
} else {
|
} else {
|
||||||
all_tokens.push(next_token);
|
all_tokens.push(next_token);
|
||||||
if !emit_chunk(
|
match decode_stream.step(next_token) {
|
||||||
&all_tokens,
|
Ok(Some(delta)) => {
|
||||||
&mut decoded_prefix,
|
if !emit_delta(&delta, &tx, &id, created, &model_id).await {
|
||||||
&tokenizer,
|
|
||||||
&tx,
|
|
||||||
&id,
|
|
||||||
created,
|
|
||||||
&model_id,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
// Client gone — treat as normal stream end,
|
// Client gone — treat as normal stream end,
|
||||||
// not a failure. No log spam.
|
// not a failure. No log spam.
|
||||||
break 'work;
|
break 'work;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
Ok(None) => {}
|
||||||
|
Err(e) => tracing::warn!(
|
||||||
|
model = %model_id,
|
||||||
|
error = %e,
|
||||||
|
"TP stream: decode_stream step failed"
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
for index in 0..max_new.saturating_sub(1) {
|
for index in 0..max_new.saturating_sub(1) {
|
||||||
let logits_vec = match pool
|
let logits_vec = match pool
|
||||||
@@ -2333,20 +2338,20 @@ impl CandleHarness {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
all_tokens.push(next_token);
|
all_tokens.push(next_token);
|
||||||
if !emit_chunk(
|
match decode_stream.step(next_token) {
|
||||||
&all_tokens,
|
Ok(Some(delta)) => {
|
||||||
&mut decoded_prefix,
|
if !emit_delta(&delta, &tx, &id, created, &model_id).await {
|
||||||
&tokenizer,
|
|
||||||
&tx,
|
|
||||||
&id,
|
|
||||||
created,
|
|
||||||
&model_id,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
break 'work;
|
break 'work;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(None) => {}
|
||||||
|
Err(e) => tracing::warn!(
|
||||||
|
model = %model_id,
|
||||||
|
error = %e,
|
||||||
|
"TP stream: decode_stream step failed"
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2624,29 +2629,22 @@ async fn chat_completion_tp_inner(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Decode the cumulative token list, emit the delta (substring appended
|
/// Send `delta` as a `chat.completion.chunk`. Returns `false` if the
|
||||||
/// since the last chunk) as a `chat.completion.chunk`. Returns `false`
|
/// receiver has hung up — the caller should bail. Empty deltas (the
|
||||||
/// if the receiver has hung up — the caller should bail.
|
/// DecodeStream is buffering an incomplete UTF-8 sequence) are a
|
||||||
|
/// no-op return-true so the caller can treat "no delta yet" and "tx
|
||||||
|
/// still live" uniformly.
|
||||||
#[cfg(feature = "cuda")]
|
#[cfg(feature = "cuda")]
|
||||||
async fn emit_chunk(
|
async fn emit_delta(
|
||||||
all_tokens: &[u32],
|
delta: &str,
|
||||||
decoded_prefix: &mut String,
|
|
||||||
tokenizer: &Tokenizer,
|
|
||||||
tx: &mpsc::Sender<ChatCompletionChunk>,
|
tx: &mpsc::Sender<ChatCompletionChunk>,
|
||||||
id: &str,
|
id: &str,
|
||||||
created: u64,
|
created: u64,
|
||||||
model_id: &str,
|
model_id: &str,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
let full = match tokenizer.decode(all_tokens, true) {
|
if delta.is_empty() {
|
||||||
Ok(s) => s,
|
return true;
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!(error = %e, "TP stream: decode failed");
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
};
|
|
||||||
if full.len() > decoded_prefix.len() {
|
|
||||||
let delta = full[decoded_prefix.len()..].to_string();
|
|
||||||
*decoded_prefix = full;
|
|
||||||
let chunk = ChatCompletionChunk {
|
let chunk = ChatCompletionChunk {
|
||||||
id: id.into(),
|
id: id.into(),
|
||||||
object: "chat.completion.chunk".into(),
|
object: "chat.completion.chunk".into(),
|
||||||
@@ -2661,11 +2659,38 @@ async fn emit_chunk(
|
|||||||
usage: None,
|
usage: None,
|
||||||
extra: serde_json::Value::Object(Default::default()),
|
extra: serde_json::Value::Object(Default::default()),
|
||||||
};
|
};
|
||||||
if tx.send(chunk).await.is_err() {
|
tx.send(chunk).await.is_ok()
|
||||||
return false;
|
}
|
||||||
|
|
||||||
|
/// Sync counterpart of [`emit_delta`] for the CPU path's
|
||||||
|
/// `spawn_blocking` closure. Same shape, `blocking_send` instead of
|
||||||
|
/// `send`. Kept as a separate fn so the async / blocking-send choice
|
||||||
|
/// is local to one place per path.
|
||||||
|
fn emit_delta_blocking(
|
||||||
|
delta: &str,
|
||||||
|
tx: &mpsc::Sender<ChatCompletionChunk>,
|
||||||
|
id: &str,
|
||||||
|
created: u64,
|
||||||
|
model_id: &str,
|
||||||
|
) -> bool {
|
||||||
|
if delta.is_empty() {
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
}
|
let chunk = ChatCompletionChunk {
|
||||||
true
|
id: id.into(),
|
||||||
|
object: "chat.completion.chunk".into(),
|
||||||
|
created,
|
||||||
|
model: model_id.into(),
|
||||||
|
choices: vec![ChunkChoice {
|
||||||
|
index: 0,
|
||||||
|
delta: json!({ "content": delta }),
|
||||||
|
finish_reason: None,
|
||||||
|
extra: serde_json::Value::Object(Default::default()),
|
||||||
|
}],
|
||||||
|
usage: None,
|
||||||
|
extra: serde_json::Value::Object(Default::default()),
|
||||||
|
};
|
||||||
|
tx.blocking_send(chunk).is_ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Errors returned by `CandleHarness::chat_completion`. The
|
/// Errors returned by `CandleHarness::chat_completion`. The
|
||||||
@@ -2848,7 +2873,13 @@ async fn stream_inference_via_worker(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let mut all_tokens: Vec<u32> = Vec::new();
|
let mut all_tokens: Vec<u32> = Vec::new();
|
||||||
let mut decoded_prefix = String::new();
|
// Incremental detokenizer. Replaces the old "decode cumulative
|
||||||
|
// tokens, byte-slice the delta against a stored prefix" pattern
|
||||||
|
// that panicked when BPE byte-fallback split a multi-byte UTF-8
|
||||||
|
// sequence (e.g. an emoji) across tokens. `step` returns
|
||||||
|
// `Ok(Some(delta))` only when the trailing bytes form a complete
|
||||||
|
// codepoint; `Ok(None)` while it's buffering an incomplete one.
|
||||||
|
let mut decode_stream = tokenizer.decode_stream(true);
|
||||||
let prompt_len = prompt_tokens.len();
|
let prompt_len = prompt_tokens.len();
|
||||||
let mut finish_reason = "length".to_string();
|
let mut finish_reason = "length".to_string();
|
||||||
|
|
||||||
@@ -2879,18 +2910,14 @@ async fn stream_inference_via_worker(
|
|||||||
finish_reason = "stop".into();
|
finish_reason = "stop".into();
|
||||||
} else {
|
} else {
|
||||||
all_tokens.push(next_token);
|
all_tokens.push(next_token);
|
||||||
if !emit_chunk(
|
match decode_stream.step(next_token) {
|
||||||
&all_tokens,
|
Ok(Some(delta)) => {
|
||||||
&mut decoded_prefix,
|
if !emit_delta(&delta, &tx, &id, created, &model_id).await {
|
||||||
&tokenizer,
|
return Ok(finish_reason);
|
||||||
&tx,
|
}
|
||||||
&id,
|
}
|
||||||
created,
|
Ok(None) => {}
|
||||||
&model_id,
|
Err(e) => tracing::warn!(error = %e, "decode_stream step failed"),
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
return Ok(finish_reason); // Client gone — clean stream end.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for index in 0..max_new.saturating_sub(1) {
|
for index in 0..max_new.saturating_sub(1) {
|
||||||
@@ -2916,20 +2943,16 @@ async fn stream_inference_via_worker(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
all_tokens.push(next_token);
|
all_tokens.push(next_token);
|
||||||
if !emit_chunk(
|
match decode_stream.step(next_token) {
|
||||||
&all_tokens,
|
Ok(Some(delta)) => {
|
||||||
&mut decoded_prefix,
|
if !emit_delta(&delta, &tx, &id, created, &model_id).await {
|
||||||
&tokenizer,
|
|
||||||
&tx,
|
|
||||||
&id,
|
|
||||||
created,
|
|
||||||
&model_id,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
return Ok(finish_reason);
|
return Ok(finish_reason);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(None) => {}
|
||||||
|
Err(e) => tracing::warn!(error = %e, "decode_stream step failed"),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Final chunk carrying finish_reason. Matches the run_inference_streaming
|
// Final chunk carrying finish_reason. Matches the run_inference_streaming
|
||||||
@@ -3035,50 +3058,30 @@ fn run_inference_streaming(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let mut all_tokens: Vec<u32> = Vec::new();
|
let mut all_tokens: Vec<u32> = Vec::new();
|
||||||
let mut decoded_prefix = String::new();
|
// Incremental detokenizer. See `stream_inference_via_worker` for
|
||||||
|
// the same reasoning — `tokenizer.decode_stream(true).step(id)`
|
||||||
|
// buffers incomplete multi-byte UTF-8 sequences across token
|
||||||
|
// boundaries and only emits when a clean codepoint completes.
|
||||||
|
let mut decode_stream = tokenizer.decode_stream(true);
|
||||||
let mut finish_reason = "length".to_string();
|
let mut finish_reason = "length".to_string();
|
||||||
|
|
||||||
arch.clear_kv_cache()?;
|
arch.clear_kv_cache()?;
|
||||||
let logits = chunked_prefill_local(arch, device, prompt_tokens)?;
|
let logits = chunked_prefill_local(arch, device, prompt_tokens)?;
|
||||||
let mut next_token = sample_with_penalty(&logits, &all_tokens, &mut logits_processor)?;
|
let mut next_token = sample_with_penalty(&logits, &all_tokens, &mut logits_processor)?;
|
||||||
|
|
||||||
let emit_token = |all_tokens: &[u32], decoded_prefix: &mut String| -> Result<bool> {
|
|
||||||
let full = tokenizer
|
|
||||||
.decode(all_tokens, true)
|
|
||||||
.map_err(|e| anyhow::anyhow!("decode: {e}"))?;
|
|
||||||
if full.len() > decoded_prefix.len() {
|
|
||||||
let delta = full[decoded_prefix.len()..].to_string();
|
|
||||||
*decoded_prefix = full;
|
|
||||||
let chunk = ChatCompletionChunk {
|
|
||||||
id: id.into(),
|
|
||||||
object: "chat.completion.chunk".into(),
|
|
||||||
created,
|
|
||||||
model: model_id.into(),
|
|
||||||
choices: vec![ChunkChoice {
|
|
||||||
index: 0,
|
|
||||||
delta: json!({ "content": delta }),
|
|
||||||
finish_reason: None,
|
|
||||||
extra: serde_json::Value::Object(Default::default()),
|
|
||||||
}],
|
|
||||||
usage: None,
|
|
||||||
extra: serde_json::Value::Object(Default::default()),
|
|
||||||
};
|
|
||||||
// blocking_send returns Err if the consumer hung up — signal
|
|
||||||
// the caller to stop generating.
|
|
||||||
if tx.blocking_send(chunk).is_err() {
|
|
||||||
return Ok(false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(true)
|
|
||||||
};
|
|
||||||
|
|
||||||
if Some(next_token) == eos_id {
|
if Some(next_token) == eos_id {
|
||||||
finish_reason = "stop".into();
|
finish_reason = "stop".into();
|
||||||
} else {
|
} else {
|
||||||
all_tokens.push(next_token);
|
all_tokens.push(next_token);
|
||||||
if !emit_token(&all_tokens, &mut decoded_prefix)? {
|
match decode_stream.step(next_token) {
|
||||||
|
Ok(Some(delta)) => {
|
||||||
|
if !emit_delta_blocking(&delta, tx, id, created, model_id) {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
Ok(None) => {}
|
||||||
|
Err(e) => tracing::warn!(error = %e, "stream: decode_stream step failed"),
|
||||||
|
}
|
||||||
|
|
||||||
for index in 0..max_new.saturating_sub(1) {
|
for index in 0..max_new.saturating_sub(1) {
|
||||||
let input = Tensor::new(&[next_token], device)?.unsqueeze(0)?;
|
let input = Tensor::new(&[next_token], device)?.unsqueeze(0)?;
|
||||||
@@ -3089,10 +3092,16 @@ fn run_inference_streaming(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
all_tokens.push(next_token);
|
all_tokens.push(next_token);
|
||||||
if !emit_token(&all_tokens, &mut decoded_prefix)? {
|
match decode_stream.step(next_token) {
|
||||||
|
Ok(Some(delta)) => {
|
||||||
|
if !emit_delta_blocking(&delta, tx, id, created, model_id) {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(None) => {}
|
||||||
|
Err(e) => tracing::warn!(error = %e, "stream: decode_stream step failed"),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let final_chunk = ChatCompletionChunk {
|
let final_chunk = ChatCompletionChunk {
|
||||||
|
|||||||
Reference in New Issue
Block a user