refactor(neuron): introduce InferenceEvent + wire projection layer
Some checks failed
build-prerelease / Resolve version stamps (push) Successful in 31s
CI / Format (push) Successful in 38s
CI / Clippy (push) Successful in 3m28s
build-prerelease / Build neuron-blackwell (push) Failing after 6m4s
build-prerelease / Build neuron-ampere (push) Failing after 7m20s
CI / Test (push) Successful in 7m29s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Build neuron-ada (push) Failing after 4m57s
build-prerelease / Package helexa-neuron-ada RPM (push) Has been skipped
build-prerelease / Package helexa-neuron-ampere RPM (push) Has been skipped
build-prerelease / Package helexa-neuron-blackwell RPM (push) Has been skipped
build-prerelease / Build cortex binary (push) Successful in 4m19s
build-prerelease / Package cortex RPM (push) Successful in 1m24s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Has been skipped
Step 1 of the OpenAI Responses API rollout. Pure refactor — no new
endpoints, no behaviour change on the wire. Lays the seam for
emitting Responses-shaped streaming events from the same harness
output as chat completions in Step 2.

- New `neuron::wire` module tree:
  - `wire::event::InferenceEvent`: format-agnostic enum
    (Start, TextDelta, ReasoningDelta, Finish) the candle harness
    now emits as its native streaming currency.
  - `wire::event::FinishReason`: typed reason that maps cleanly
    onto OpenAI `finish_reason`, OpenAI Responses `status`, and
    Anthropic `stop_reason` strings.
  - `wire::openai_chat::project_chat_stream`: async task that
    consumes an InferenceEvent receiver and produces a
    ChatCompletionChunk receiver, stamping per-request metadata
    (id, created, model_id) onto every chunk. Output matches the
    pre-refactor wire shape bit-for-bit.
- candle.rs refactored to emit InferenceEvent on its internal
  channel through all three streaming paths (CPU
  run_inference_streaming, CUDA single-GPU stream_inference_via_worker,
  CUDA TP chat_completion_tp_stream). The streaming functions lost
  their id/created/model_id parameters since wire-format metadata
  now lives in the projector.
- emit_delta + emit_delta_blocking simplified to single-purpose
  TextDelta emitters with no wire-format coupling.
- chat_completion_stream wraps the InferenceEvent receiver in
  wire_chat::project_chat_stream before returning so the
  /v1/chat/completions HTTP handler keeps consuming
  ChatCompletionChunks unchanged. External signature preserved.

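The seam described in the list above can be sketched in a few lines. This is an illustrative reduction, not the code in this commit: it uses `std::sync::mpsc` and a thread where the commit uses a tokio channel and an async task, `Chunk` is a hypothetical stand-in for `ChatCompletionChunk`, the `String` payload on `ReasoningDelta` is assumed, and skipping reasoning deltas in the projector is a choice made only for this sketch.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Why generation stopped. Only the two reasons visible in this diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FinishReason {
    Stop,
    Length,
}

impl FinishReason {
    /// String form used in the OpenAI chat `finish_reason` field.
    pub fn as_openai_str(self) -> &'static str {
        match self {
            FinishReason::Stop => "stop",
            FinishReason::Length => "length",
        }
    }
}

/// Format-agnostic events emitted by the producer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InferenceEvent {
    Start,
    TextDelta(String),
    ReasoningDelta(String), // payload shape is an assumption
    Finish { reason: FinishReason },
}

/// Hypothetical, simplified stand-in for `ChatCompletionChunk`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Chunk {
    pub id: String,
    pub created: u64,
    pub model: String,
    pub role: Option<&'static str>,
    pub content: Option<String>,
    pub finish_reason: Option<&'static str>,
}

/// Consume events and stamp per-request metadata onto every chunk.
pub fn project_chat_stream(
    events: Receiver<InferenceEvent>,
    id: String,
    created: u64,
    model: String,
) -> Receiver<Chunk> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        for event in events {
            let mut chunk = Chunk {
                id: id.clone(),
                created,
                model: model.clone(),
                role: None,
                content: None,
                finish_reason: None,
            };
            match event {
                InferenceEvent::Start => chunk.role = Some("assistant"),
                InferenceEvent::TextDelta(text) => chunk.content = Some(text),
                // Sketch-only choice: reasoning is not projected.
                InferenceEvent::ReasoningDelta(_) => continue,
                InferenceEvent::Finish { reason } => {
                    chunk.finish_reason = Some(reason.as_openai_str())
                }
            }
            if tx.send(chunk).is_err() {
                break; // consumer hung up
            }
        }
    });
    rx
}
```

The producer never sees `id`, `created`, or the model name, so a second projector for another wire format could, in principle, read the same event stream without touching the harness.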
Also fixes a pre-existing helexa-acp test race (three modules each
declared their own static LOCK for HOME mutation, so cross-module
parallelism flaked tests that read HOME at runtime). Consolidated
onto a single crate-wide path_util::ENV_LOCK.
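A minimal sketch of the single-lock pattern, for illustration only. It assumes a helper shaped like the `with_home` in the diff; the generic return value and the poison recovery are additions of this sketch (the diff uses `lock().unwrap()` and a `FnOnce()` body).

```rust
use std::sync::Mutex;

/// One lock shared by every test that reads or mutates `HOME`.
/// Per-module locks do not help: each module would hold a
/// different mutex and still race on the same process-global env.
pub static ENV_LOCK: Mutex<()> = Mutex::new(());

/// Run `body` with `HOME` set to `home`, then restore the prior value.
/// If `body` panics, `HOME` is not restored; that is acceptable for a
/// sketch but worth a drop guard in real test code.
#[allow(unused_unsafe)] // `set_var` is only `unsafe` on newer editions
pub fn with_home<T>(home: &str, body: impl FnOnce() -> T) -> T {
    // Sketch-only choice: recover from poisoning so one failed test
    // does not cascade into every other HOME-touching test.
    let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
    let prior = std::env::var_os("HOME");
    // SAFETY: serialised by ENV_LOCK above.
    unsafe { std::env::set_var("HOME", home) };
    let out = body();
    // SAFETY: still holding ENV_LOCK.
    unsafe {
        match prior {
            Some(value) => std::env::set_var("HOME", value),
            None => std::env::remove_var("HOME"),
        }
    }
    out
}
```

Tests that only read `HOME` at runtime need the same guard, which is why the plan-mode test in this commit takes the lock even though it never calls `set_var`.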
122 helexa-acp tests + 44 neuron tests pass (5 new wire projection
tests). fmt + clippy --workspace -- -D warnings clean. Ran helexa-acp
suite 3x to confirm the env race is closed.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@@ -26,6 +26,16 @@
 
 use std::path::{Path, PathBuf};
 
+/// Process-global lock for tests that mutate `HOME`. Anyone in the
+/// crate touching `HOME` must hold this for the duration of the
+/// read-modify-restore window — otherwise concurrent `cargo test`
+/// workers race and flake.
+///
+/// Only built into the test binaries. Production code never mutates
+/// env vars.
+#[cfg(test)]
+pub(crate) static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
+
 /// Expand `~`, `~/`, `$HOME`, and `$HOME/` prefixes against the
 /// current user's home directory. All other inputs pass through
 /// unchanged.
@@ -56,13 +66,11 @@ mod tests {
     use super::*;
 
     /// Set HOME for the duration of the test. Tests using this run
-    /// serially under one mutex because env mutation isn't
-    /// thread-safe — `cargo test` parallel workers would race
-    /// without it.
+    /// serially under the crate-wide [`ENV_LOCK`] because env
+    /// mutation isn't thread-safe — `cargo test` parallel workers
+    /// would race without it.
     fn with_home<F: FnOnce()>(home: &str, body: F) {
-        use std::sync::Mutex;
-        static LOCK: Mutex<()> = Mutex::new(());
-        let _g = LOCK.lock().unwrap();
+        let _g = ENV_LOCK.lock().unwrap();
         let prior = std::env::var("HOME").ok();
         // SAFETY: tests touch process-global env. The mutex
         // serialises access; sub-threads in other test modules
@@ -148,10 +156,10 @@ mod tests {
 
     #[test]
     fn no_home_env_passes_through() {
-        // Lock + clear HOME for this one.
-        use std::sync::Mutex;
-        static LOCK: Mutex<()> = Mutex::new(());
-        let _g = LOCK.lock().unwrap();
+        // Share the same crate-wide lock as `with_home` — otherwise
+        // a parallel test setting HOME races this clear-and-assert
+        // window.
+        let _g = ENV_LOCK.lock().unwrap();
         let prior = std::env::var("HOME").ok();
         // SAFETY: serialised by LOCK above.
         unsafe {
@@ -1251,7 +1251,20 @@ mod tests {
     }
 
     #[tokio::test]
+    // Holds the env lock across an await — the await is the
+    // tool dispatch, which itself re-reads HOME via plan_dir_for.
+    // Releasing the lock would let another test mutate HOME
+    // between this test's setup and the gate's lookup.
+    #[allow(clippy::await_holding_lock)]
     async fn plan_mode_allows_write_inside_plan_dir_without_permission() {
+        // Plan-mode gate calls store::plan_dir_for at runtime
+        // (which reads HOME). If a parallel test mutates HOME
+        // mid-flight, the gate's plan_dir would differ from the
+        // one we computed up here and the path check would fail.
+        // Share the crate-wide env lock so we and any HOME-mutator
+        // serialise.
+        let _g = crate::path_util::ENV_LOCK.lock().unwrap();
+
         // Skip if we can't resolve a plan dir in this environment
         // (would happen with no HOME / XDG_DATA_HOME — neither
         // realistic in CI nor for an interactive run).
@@ -1321,11 +1334,9 @@ mod tests {
     // correct *default*; this is the documented exception.
     #[allow(clippy::await_holding_lock)]
     async fn read_file_expands_tilde_before_dispatch() {
-        // HOME mutation is process-global; serialise tests that
-        // touch it under a single std::sync::Mutex.
-        use std::sync::Mutex;
-        static LOCK: Mutex<()> = Mutex::new(());
-        let _g = LOCK.lock().unwrap();
+        // HOME mutation is process-global; share the crate-wide
+        // ENV_LOCK with path_util's tests so workers don't race.
+        let _g = crate::path_util::ENV_LOCK.lock().unwrap();
         let prior = std::env::var("HOME").ok();
         unsafe {
             std::env::set_var("HOME", "/home/me");
@@ -23,9 +23,10 @@ use candle_transformers::models::qwen3_moe as qwen3_moe_dense;
 use cortex_core::harness::{Harness, HarnessHealth, ModelInfo, ModelSpec};
 use cortex_core::openai::{
     ChatCompletionChoice, ChatCompletionChunk, ChatCompletionRequest, ChatCompletionResponse,
-    ChatMessage, ChunkChoice, MessageContent, Usage,
+    ChatMessage, MessageContent, Usage,
 };
-use serde_json::json;
+
+use crate::wire::{FinishReason, InferenceEvent, openai_chat as wire_chat};
 use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -1635,36 +1636,24 @@ impl CandleHarness {
         let created = unix_now_secs();
 
         // Bounded channel so the producer (blocking inference) is back-
-        // pressured by the consumer (SSE writer). 32 is generous —
-        // tokens arrive one at a time and the SSE writer is async.
-        let (tx, rx) = mpsc::channel::<ChatCompletionChunk>(32);
+        // pressured by the consumer (SSE writer, via the wire
+        // projector). 32 is generous — tokens arrive one at a time
+        // and downstream consumption is async.
+        let (tx, event_rx) = mpsc::channel::<InferenceEvent>(32);
 
-        // Lead chunk: announce the assistant role per OpenAI streaming
-        // conventions. Tools that auto-detect a streaming reply expect
-        // this before any content delta.
-        let role_chunk = ChatCompletionChunk {
-            id: id.clone(),
-            object: "chat.completion.chunk".into(),
-            created,
-            model: model_id.clone(),
-            choices: vec![ChunkChoice {
-                index: 0,
-                delta: json!({"role": "assistant"}),
-                finish_reason: None,
-                extra: serde_json::Value::Object(Default::default()),
-            }],
-            usage: None,
-            extra: serde_json::Value::Object(Default::default()),
-        };
         // Refuse if the model is already poisoned. No point opening
-        // an SSE stream just to send the role chunk and then bail.
+        // an SSE stream just to send the Start event and then bail.
         if loaded.poisoned.load(Ordering::Acquire) {
             return Err(poisoned_error(&model_id));
         }
 
-        // If sending the role chunk fails the receiver is already gone;
-        // bail before kicking off the heavy blocking work.
-        tx.send(role_chunk)
+        // Start event: tells the wire projector to emit its
+        // format-specific "the assistant is about to speak" frame
+        // (an OpenAI `delta: {role: "assistant"}` chunk here; a
+        // `response.created` + `response.output_item.added` pair on
+        // the Responses path). If sending fails the receiver is
+        // already gone; bail before kicking off the heavy work.
+        tx.send(InferenceEvent::Start)
             .await
             .map_err(|_| InferenceError::Other(anyhow::anyhow!("client disconnected")))?;
 
@@ -1728,9 +1717,6 @@ impl CandleHarness {
                     top_p,
                     seed,
                     eos_id,
-                    id,
-                    created,
-                    model_id,
                     tx,
                 )
                 .await
@@ -1787,9 +1773,6 @@ impl CandleHarness {
                     top_p,
                     seed,
                     eos_id,
-                    &id,
-                    created,
-                    &model_id,
                     &tx,
                 ) {
                     Ok(()) => tracing::info!(
@@ -1824,6 +1807,12 @@ impl CandleHarness {
             )));
         }
 
+        // Wrap the InferenceEvent receiver in the OpenAI chat
+        // projection so the HTTP handler keeps receiving
+        // ChatCompletionChunks bit-for-bit identical to before.
+        // The id/created/model_id snapshot taken at request setup
+        // gets stamped into every emitted chunk.
+        let rx = wire_chat::project_chat_stream(event_rx, id, created, model_id);
         Ok(rx)
     }
 }
@@ -2277,27 +2266,16 @@ impl CandleHarness {
         let created = unix_now_secs();
         let tokenizer = tp.tokenizer.clone();
 
-        // Bounded channel — back-pressures the producer when the SSE
-        // writer is slow.
-        let (tx, rx) = mpsc::channel::<ChatCompletionChunk>(32);
+        // Bounded channel — back-pressures the producer when
+        // downstream consumption (wire projector → SSE writer) is
+        // slow.
+        let (tx, event_rx) = mpsc::channel::<InferenceEvent>(32);
 
-        // Role chunk first, before kicking off the heavy work — if the
-        // receiver is gone by now there's no point starting inference.
-        let role_chunk = ChatCompletionChunk {
-            id: id.clone(),
-            object: "chat.completion.chunk".into(),
-            created,
-            model: model_id.clone(),
-            choices: vec![ChunkChoice {
-                index: 0,
-                delta: json!({"role": "assistant"}),
-                finish_reason: None,
-                extra: serde_json::Value::Object(Default::default()),
-            }],
-            usage: None,
-            extra: serde_json::Value::Object(Default::default()),
-        };
-        tx.send(role_chunk)
+        // Start event first, before kicking off the heavy work — if
+        // the receiver is gone by now there's no point starting
+        // inference. The wire projector materialises this as the
+        // OpenAI `delta: {role: "assistant"}` chunk.
+        tx.send(InferenceEvent::Start)
             .await
             .map_err(|_| InferenceError::Other(anyhow::anyhow!("client disconnected")))?;
 
@@ -2344,7 +2322,7 @@ impl CandleHarness {
                 // UTF-8 mid-codepoint boundaries when BPE byte-fallback
                 // split a multi-byte char across tokens.
                 let mut decode_stream = tokenizer.decode_stream(true);
-                let mut finish_reason = "length".to_string();
+                let mut finish_reason = FinishReason::Length;
 
                 'work: {
                     if let Err(e) = pool.clear_kv_cache(&model_id, leader_handle).await {
@@ -2412,12 +2390,12 @@ impl CandleHarness {
                     };
 
                     if Some(next_token) == eos_id {
-                        finish_reason = "stop".into();
+                        finish_reason = FinishReason::Stop;
                     } else {
                         all_tokens.push(next_token);
                         match decode_stream.step(next_token) {
                             Ok(Some(delta)) => {
-                                if !emit_delta(&delta, &tx, &id, created, &model_id).await {
+                                if !emit_delta(&delta, &tx).await {
                                     // Client gone — treat as normal stream end,
                                     // not a failure. No log spam.
                                     break 'work;
@@ -2489,13 +2467,13 @@ impl CandleHarness {
                                 "TP chat_completion (stream): decode step"
                             );
                             if Some(next_token) == eos_id {
-                                finish_reason = "stop".into();
+                                finish_reason = FinishReason::Stop;
                                 break;
                             }
                             all_tokens.push(next_token);
                             match decode_stream.step(next_token) {
                                 Ok(Some(delta)) => {
-                                    if !emit_delta(&delta, &tx, &id, created, &model_id).await {
+                                    if !emit_delta(&delta, &tx).await {
                                         break 'work;
                                     }
                                 }
@@ -2535,37 +2513,32 @@ impl CandleHarness {
                     tracing::info!(
                         prompt_tokens = prompt_len,
                         completion_tokens = all_tokens.len(),
-                        finish_reason = %finish_reason,
+                        finish_reason = finish_reason.as_openai_str(),
                         total_ms = req_start.elapsed().as_millis(),
                         "TP chat_completion (stream): done"
                     );
                 }
 
-                // Final chunk carrying finish_reason — only on the success
-                // path. On failure we drop the channel so the client sees
-                // the SSE stream end abruptly (matches pre-change behaviour
-                // when the failed-path early-returned without final chunk).
+                // Finish event — only on the success path. On
+                // failure we drop the channel so the client sees the
+                // SSE stream end abruptly (matches the pre-refactor
+                // behaviour when the failed-path early-returned
+                // without a final chunk).
                 if failure.is_none() {
-                    let final_chunk = ChatCompletionChunk {
-                        id: id.clone(),
-                        object: "chat.completion.chunk".into(),
-                        created,
-                        model: model_id.clone(),
-                        choices: vec![ChunkChoice {
-                            index: 0,
-                            delta: serde_json::Value::Object(Default::default()),
-                            finish_reason: Some(finish_reason),
-                            extra: serde_json::Value::Object(Default::default()),
-                        }],
-                        usage: None,
-                        extra: serde_json::Value::Object(Default::default()),
-                    };
-                    let _ = tx.send(final_chunk).await;
+                    let _ = tx
+                        .send(InferenceEvent::Finish {
+                            reason: finish_reason,
+                        })
+                        .await;
                 }
             }
             .instrument(span),
         );
 
+        // Wrap the InferenceEvent receiver in the OpenAI chat
+        // projection so the HTTP handler keeps consuming
+        // ChatCompletionChunks unchanged.
+        let rx = wire_chat::project_chat_stream(event_rx, id, created, model_id);
         Ok(rx)
     }
 }
@@ -2793,68 +2766,36 @@ async fn chat_completion_tp_inner(
     })
 }
 
-/// Send `delta` as a `chat.completion.chunk`. Returns `false` if the
-/// receiver has hung up — the caller should bail. Empty deltas (the
-/// DecodeStream is buffering an incomplete UTF-8 sequence) are a
-/// no-op return-true so the caller can treat "no delta yet" and "tx
-/// still live" uniformly.
+/// Send `delta` as an [`InferenceEvent::TextDelta`]. Returns `false`
+/// if the receiver has hung up — the caller should bail. Empty
+/// deltas (the DecodeStream is buffering an incomplete UTF-8
+/// sequence) are a no-op return-true so the caller can treat "no
+/// delta yet" and "tx still live" uniformly.
+///
+/// Wire-format-specific metadata (chunk id, created, model_id)
+/// stays out of this function — the wire projector in
+/// [`crate::wire::openai_chat`] stamps it onto every chunk
+/// downstream.
 #[cfg(feature = "cuda")]
-async fn emit_delta(
-    delta: &str,
-    tx: &mpsc::Sender<ChatCompletionChunk>,
-    id: &str,
-    created: u64,
-    model_id: &str,
-) -> bool {
+async fn emit_delta(delta: &str, tx: &mpsc::Sender<InferenceEvent>) -> bool {
     if delta.is_empty() {
         return true;
     }
-    let chunk = ChatCompletionChunk {
-        id: id.into(),
-        object: "chat.completion.chunk".into(),
-        created,
-        model: model_id.into(),
-        choices: vec![ChunkChoice {
-            index: 0,
-            delta: json!({ "content": delta }),
-            finish_reason: None,
-            extra: serde_json::Value::Object(Default::default()),
-        }],
-        usage: None,
-        extra: serde_json::Value::Object(Default::default()),
-    };
-    tx.send(chunk).await.is_ok()
+    tx.send(InferenceEvent::TextDelta(delta.into()))
+        .await
+        .is_ok()
 }
 
 /// Sync counterpart of [`emit_delta`] for the CPU path's
 /// `spawn_blocking` closure. Same shape, `blocking_send` instead of
 /// `send`. Kept as a separate fn so the async / blocking-send choice
 /// is local to one place per path.
-fn emit_delta_blocking(
-    delta: &str,
-    tx: &mpsc::Sender<ChatCompletionChunk>,
-    id: &str,
-    created: u64,
-    model_id: &str,
-) -> bool {
+fn emit_delta_blocking(delta: &str, tx: &mpsc::Sender<InferenceEvent>) -> bool {
     if delta.is_empty() {
         return true;
     }
-    let chunk = ChatCompletionChunk {
-        id: id.into(),
-        object: "chat.completion.chunk".into(),
-        created,
-        model: model_id.into(),
-        choices: vec![ChunkChoice {
-            index: 0,
-            delta: json!({ "content": delta }),
-            finish_reason: None,
-            extra: serde_json::Value::Object(Default::default()),
-        }],
-        usage: None,
-        extra: serde_json::Value::Object(Default::default()),
-    };
-    tx.blocking_send(chunk).is_ok()
+    tx.blocking_send(InferenceEvent::TextDelta(delta.into()))
+        .is_ok()
 }
 
 /// Errors returned by `CandleHarness::chat_completion`. The
@@ -3019,10 +2960,7 @@ async fn stream_inference_via_worker(
     top_p: Option<f64>,
     seed: u64,
     eos_id: Option<u32>,
-    id: String,
-    created: u64,
-    model_id: String,
-    tx: mpsc::Sender<ChatCompletionChunk>,
+    tx: mpsc::Sender<InferenceEvent>,
 ) -> Result<String> {
     let mut logits_processor = {
         let sampling = if temperature <= 0.0 {
@@ -3045,7 +2983,7 @@ async fn stream_inference_via_worker(
     // codepoint; `Ok(None)` while it's buffering an incomplete one.
     let mut decode_stream = tokenizer.decode_stream(true);
     let prompt_len = prompt_tokens.len();
-    let mut finish_reason = "length".to_string();
+    let mut finish_reason = FinishReason::Length;
 
     worker
         .clear_kv_cache(handle)
@@ -3071,13 +3009,13 @@ async fn stream_inference_via_worker(
     };
 
     if Some(next_token) == eos_id {
-        finish_reason = "stop".into();
+        finish_reason = FinishReason::Stop;
     } else {
         all_tokens.push(next_token);
         match decode_stream.step(next_token) {
             Ok(Some(delta)) => {
-                if !emit_delta(&delta, &tx, &id, created, &model_id).await {
-                    return Ok(finish_reason);
+                if !emit_delta(&delta, &tx).await {
+                    return Ok(finish_reason.as_openai_str().to_string());
                 }
             }
             Ok(None) => {}
@@ -3103,14 +3041,14 @@ async fn stream_inference_via_worker(
             }
         };
         if Some(next_token) == eos_id {
-            finish_reason = "stop".into();
+            finish_reason = FinishReason::Stop;
             break;
         }
         all_tokens.push(next_token);
         match decode_stream.step(next_token) {
             Ok(Some(delta)) => {
-                if !emit_delta(&delta, &tx, &id, created, &model_id).await {
-                    return Ok(finish_reason);
+                if !emit_delta(&delta, &tx).await {
+                    return Ok(finish_reason.as_openai_str().to_string());
                 }
             }
             Ok(None) => {}
@@ -3119,25 +3057,16 @@ async fn stream_inference_via_worker(
         }
     }
 
-    // Final chunk carrying finish_reason. Matches the run_inference_streaming
-    // shape so the SSE consumer sees an identical termination sequence.
-    let final_chunk = ChatCompletionChunk {
-        id: id.clone(),
-        object: "chat.completion.chunk".into(),
-        created,
-        model: model_id.clone(),
-        choices: vec![ChunkChoice {
-            index: 0,
-            delta: serde_json::Value::Object(Default::default()),
-            finish_reason: Some(finish_reason.clone()),
-            extra: serde_json::Value::Object(Default::default()),
-        }],
-        usage: None,
-        extra: serde_json::Value::Object(Default::default()),
-    };
-    let _ = tx.send(final_chunk).await;
+    // Terminal Finish event. The wire projector turns this into a
+    // format-specific final chunk (`finish_reason: "stop"` on
+    // OpenAI chat, `response.completed` on Responses).
+    let _ = tx
+        .send(InferenceEvent::Finish {
+            reason: finish_reason,
+        })
+        .await;
 
-    Ok(finish_reason)
+    Ok(finish_reason.as_openai_str().to_string())
 }
 
 #[allow(clippy::too_many_arguments)]
@@ -3204,10 +3133,7 @@ fn run_inference_streaming(
     top_p: Option<f64>,
     seed: u64,
     eos_id: Option<u32>,
-    id: &str,
-    created: u64,
-    model_id: &str,
-    tx: &mpsc::Sender<ChatCompletionChunk>,
+    tx: &mpsc::Sender<InferenceEvent>,
 ) -> Result<()> {
     let mut logits_processor = {
         let sampling = if temperature <= 0.0 {
@@ -3227,19 +3153,19 @@ fn run_inference_streaming(
     // buffers incomplete multi-byte UTF-8 sequences across token
     // boundaries and only emits when a clean codepoint completes.
     let mut decode_stream = tokenizer.decode_stream(true);
-    let mut finish_reason = "length".to_string();
+    let mut finish_reason = FinishReason::Length;
 
     arch.clear_kv_cache()?;
     let logits = chunked_prefill_local(arch, device, prompt_tokens)?;
     let mut next_token = sample_with_penalty(&logits, &all_tokens, &mut logits_processor)?;
 
     if Some(next_token) == eos_id {
-        finish_reason = "stop".into();
+        finish_reason = FinishReason::Stop;
     } else {
         all_tokens.push(next_token);
         match decode_stream.step(next_token) {
             Ok(Some(delta)) => {
-                if !emit_delta_blocking(&delta, tx, id, created, model_id) {
+                if !emit_delta_blocking(&delta, tx) {
                     return Ok(());
                 }
             }
@@ -3252,13 +3178,13 @@ fn run_inference_streaming(
         let logits = arch.forward(&input, prompt_tokens.len() + index)?;
         next_token = sample_with_penalty(&logits, &all_tokens, &mut logits_processor)?;
         if Some(next_token) == eos_id {
-            finish_reason = "stop".into();
+            finish_reason = FinishReason::Stop;
             break;
         }
         all_tokens.push(next_token);
         match decode_stream.step(next_token) {
             Ok(Some(delta)) => {
-                if !emit_delta_blocking(&delta, tx, id, created, model_id) {
+                if !emit_delta_blocking(&delta, tx) {
                     return Ok(());
                 }
             }
@@ -3268,21 +3194,9 @@ fn run_inference_streaming(
         }
     }
 
-    let final_chunk = ChatCompletionChunk {
-        id: id.into(),
-        object: "chat.completion.chunk".into(),
-        created,
-        model: model_id.into(),
-        choices: vec![ChunkChoice {
-            index: 0,
-            delta: serde_json::Value::Object(Default::default()),
-            finish_reason: Some(finish_reason),
-            extra: serde_json::Value::Object(Default::default()),
-        }],
-        usage: None,
-        extra: serde_json::Value::Object(Default::default()),
-    };
-    let _ = tx.blocking_send(final_chunk);
+    let _ = tx.blocking_send(InferenceEvent::Finish {
+        reason: finish_reason,
+    });
     Ok(())
 }
 
@@ -6,3 +6,4 @@ pub mod discovery;
 pub mod harness;
 pub mod health;
 pub mod startup;
+pub mod wire;
99
crates/neuron/src/wire/event.rs
Normal file
@@ -0,0 +1,99 @@
+//! Format-agnostic inference event stream.
+//!
+//! The candle harness emits a sequence of these for every streaming
+//! request. Wire-format projections in sibling modules
+//! ([`super::openai_chat`], the eventual `openai_responses` /
+//! `anthropic_messages` projections) read this stream and produce
+//! the chunks / events their HTTP clients expect.
+//!
+//! Design notes:
+//!
+//! - [`Start`] carries no token of its own. It only signals "the
+//!   model has accepted the prompt and is about to begin emitting
+//!   text". OpenAI chat materialises this as a `role: assistant`
+//!   chunk; OpenAI Responses as the `response.created` +
+//!   `response.output_item.added` pair; Anthropic as
+//!   `message_start`. All three of those would otherwise have to
+//!   peek at the *first* token to know when to emit, which couples
+//!   the wire layer to the producer's pacing.
+//! - [`TextDelta`] is *visible* output. Reasoning / `<think>`
+//!   blocks go through a future [`InferenceEvent::ReasoningDelta`] variant once
+//!   the harness learns to split them (today they pass through as
+//!   plain text inside `TextDelta`; helexa-acp picks them apart on
+//!   the consumer side).
+//! - [`Finish`] is the only place a stream is allowed to end
+//!   cleanly. Projections rely on this to emit final usage
+//!   bookkeeping; absence means the producer crashed and the
+//!   consumer should treat the stream as truncated.
+//!
+//! [`Start`]: InferenceEvent::Start
+//! [`TextDelta`]: InferenceEvent::TextDelta
+//! [`Finish`]: InferenceEvent::Finish
+
+/// One unit of output from the inference loop.
+///
+/// Producers send these on an `mpsc::Sender<InferenceEvent>`;
+/// projection layers in sibling modules consume them and emit
+/// wire-format-specific frames downstream.
+#[derive(Debug, Clone)]
+pub enum InferenceEvent {
+    /// The producer has accepted the prompt and is about to emit
+    /// the first token. Sent at most once per stream.
+    Start,
+    /// A piece of visible assistant text. Multiple deltas
+    /// concatenate into the complete reply.
+    TextDelta(String),
+    /// Reasoning / scratchpad text the model emitted inside a
+    /// `<think>` block (or equivalent). Producers that don't
+    /// surface reasoning separately use [`InferenceEvent::TextDelta`] for
+    /// everything; future split lives here.
+    ///
+    /// Not yet emitted by the candle harness — present so future
+    /// stages (qwen3 `<think>` routing, OpenAI o-series reasoning)
+    /// have a typed home without breaking the existing
+    /// projections.
+    #[allow(dead_code)]
+    ReasoningDelta(String),
+    /// The stream is complete. Carries the reason so wire formats
+    /// that use it (OpenAI's `finish_reason`, Anthropic's
+    /// `stop_reason`) can render it without re-parsing.
+    Finish { reason: FinishReason },
+}
+
+/// Why a stream stopped. Stays small on purpose — anything that
+/// doesn't map cleanly to one of these collapses to [`Stop`].
+///
+/// Mappings to wire formats:
+///
+/// | variant     | OpenAI `finish_reason` | OpenAI Responses `status` | Anthropic `stop_reason` |
+/// |-------------|------------------------|---------------------------|-------------------------|
+/// | `Stop`      | `"stop"`               | `"completed"`             | `"end_turn"`            |
+/// | `Length`    | `"length"`             | `"incomplete"`            | `"max_tokens"`          |
+/// | `ToolCalls` | `"tool_calls"`         | `"completed"`             | `"tool_use"`            |
+///
+/// [`Stop`]: FinishReason::Stop
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum FinishReason {
+    /// Model emitted EOS naturally.
+    Stop,
+    /// Hit `max_tokens` before EOS.
+    Length,
+    /// Stopped because the model called a tool and is waiting for
+    /// the result. Not yet emitted by the candle harness —
+    /// reserved for the day tool-call extraction lands.
+    #[allow(dead_code)]
+    ToolCalls,
+}
+
+impl FinishReason {
+    /// String form used by OpenAI chat completions and OpenAI
+    /// completions. Wire modules can call this directly or do their
+    /// own mapping for non-string formats.
+    pub fn as_openai_str(self) -> &'static str {
+        match self {
+            FinishReason::Stop => "stop",
+            FinishReason::Length => "length",
+            FinishReason::ToolCalls => "tool_calls",
+        }
+    }
+}
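Reviewer sketch, not part of this commit: the mapping table in the `FinishReason` docs lists Anthropic and OpenAI Responses columns, but only `as_openai_str` exists so far. Later projections could carry the other two columns in the same style. The helper names `as_anthropic_stop_reason` and `as_responses_status` are hypothetical; the string values are copied from the doc table, and the enum is redeclared only so the snippet compiles on its own.

```rust
/// Local copy of the enum from `wire/event.rs`, repeated here so the
/// sketch is self-contained.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FinishReason {
    Stop,
    Length,
    ToolCalls,
}

impl FinishReason {
    /// Hypothetical helper: the Anthropic `stop_reason` column of the
    /// doc table.
    pub fn as_anthropic_stop_reason(self) -> &'static str {
        match self {
            FinishReason::Stop => "end_turn",
            FinishReason::Length => "max_tokens",
            FinishReason::ToolCalls => "tool_use",
        }
    }

    /// Hypothetical helper: the OpenAI Responses `status` column of the
    /// doc table. Two variants share one status, so this mapping is lossy.
    pub fn as_responses_status(self) -> &'static str {
        match self {
            FinishReason::Stop | FinishReason::ToolCalls => "completed",
            FinishReason::Length => "incomplete",
        }
    }
}
```

Because the `match` arms are exhaustive, adding a variant to `FinishReason` would force every such helper to be revisited at compile time.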
23
crates/neuron/src/wire/mod.rs
Normal file
@@ -0,0 +1,23 @@
+//! Wire-format projection layer.
+//!
+//! The candle harness produces a single, format-agnostic stream of
+//! [`InferenceEvent`]s. Each wire format (OpenAI chat completions,
+//! OpenAI Responses, Anthropic messages, …) lives in its own module
+//! under `wire::` and projects that event stream into the chunks /
+//! events its HTTP clients expect.
+//!
+//! The benefit over translating *between* wire shapes (OpenAI chat
+//! → Anthropic, etc.) is that we never have to reason about a
+//! wire-N → wire-M conversion: every translation is wire-N ↔ the
+//! internal event currency, and the projections are independent. A
+//! new wire format adds a new file under `wire::`; nothing else
+//! needs to know about it.
+//!
+//! Today: [`openai_chat`]. Stage 2 adds `openai_responses`. Stage 3
+//! could add a native Anthropic projection that replaces the
+//! gateway-side translation.
+
+pub mod event;
+pub mod openai_chat;
+
+pub use event::{FinishReason, InferenceEvent};
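Reviewer sketch, not part of this commit: every projection under `wire::` consumes the same event stream, so each one inherits the contract stated in `event.rs` that only `Finish` ends a stream cleanly. The snippet below shows one way a consumer might tell a complete stream from a truncated one. It assumes a blocking `std::sync::mpsc` channel instead of the tokio channel the crate uses, redeclares trimmed copies of the event types, and introduces a hypothetical `Outcome` type and `drain` function.

```rust
use std::sync::mpsc;

/// Trimmed local copies of the types in `wire/event.rs`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FinishReason {
    Stop,
    Length,
}

#[derive(Debug, Clone)]
enum InferenceEvent {
    Start,
    TextDelta(String),
    Finish { reason: FinishReason },
}

/// Hypothetical result type for this sketch.
#[derive(Debug, PartialEq, Eq)]
enum Outcome {
    /// A `Finish` event arrived: the stream ended cleanly.
    Complete { text: String, reason: FinishReason },
    /// The channel closed without `Finish`: treat the text as partial.
    Truncated { text: String },
}

/// Concatenate text deltas until `Finish` arrives or the producer goes away.
fn drain(rx: mpsc::Receiver<InferenceEvent>) -> Outcome {
    let mut text = String::new();
    // `recv` returns `Err` once every sender is dropped and the buffer is empty.
    while let Ok(event) = rx.recv() {
        match event {
            InferenceEvent::Start => {}
            InferenceEvent::TextDelta(piece) => text.push_str(&piece),
            InferenceEvent::Finish { reason } => {
                return Outcome::Complete { text, reason };
            }
        }
    }
    Outcome::Truncated { text }
}
```

A projection built this way could, for example, skip its final bookkeeping frame when it sees `Truncated`, leaving the decision about how to surface the failure to the HTTP layer.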
241
crates/neuron/src/wire/openai_chat.rs
Normal file
@@ -0,0 +1,241 @@
+//! OpenAI chat completions projection.
+//!
+//! Reads [`InferenceEvent`]s from a receiver and produces
+//! [`ChatCompletionChunk`]s in the shape `POST /v1/chat/completions`
+//! clients expect on the streaming SSE response. The HTTP handler in
+//! [`crate::api`] wraps the resulting receiver in axum's
+//! `Sse::new(...)` adapter; nothing in this module touches HTTP
+//! framing or `data:` lines.
+//!
+//! Per the OpenAI streaming spec, three chunk shapes appear:
+//!
+//! 1. **Role chunk** — `delta: { "role": "assistant" }`, no content,
+//!    sent once at stream start. We emit this on [`InferenceEvent::Start`].
+//! 2. **Content chunks** — `delta: { "content": "<text>" }`, one per
+//!    [`InferenceEvent::TextDelta`].
+//! 3. **Final chunk** — empty `delta`, `finish_reason` populated.
+//!    Emitted on [`InferenceEvent::Finish`].
+//!
+//! `usage` stays `None` on every chunk; the legacy candle paths
+//! never surfaced usage on the streaming endpoint and we keep that
+//! behaviour bit-for-bit so existing clients see no diff.
+//!
+//! Back-pressure: the projection task awaits both `rx.recv()` and
+//! `tx.send()`. A slow consumer fills the output channel → the
+//! task blocks on send → it stops reading from the input → the
+//! producer blocks on its own send. The bounded channels
+//! propagate back-pressure without us writing any logic.
+
+use cortex_core::openai::{ChatCompletionChunk, ChunkChoice};
+use serde_json::json;
+use tokio::sync::mpsc;
+
+use super::event::{FinishReason, InferenceEvent};
+
+/// Output channel buffer size. Mirrors the input side's bound; one
+/// event maps to at most one chunk, so equal capacity keeps the
+/// two ends in sync without surprising memory growth.
+const CHUNK_CHANNEL_CAPACITY: usize = 32;
+
+/// Project an [`InferenceEvent`] receiver into a
+/// [`ChatCompletionChunk`] receiver. Spawns one tokio task that
+/// owns the input receiver for the stream's lifetime and exits
+/// when either side closes.
+///
+/// `id`, `created`, and `model_id` are stamped into every emitted
+/// chunk so the receiver can stay generic (decoupled from
+/// per-request metadata).
+pub fn project_chat_stream(
+    mut rx: mpsc::Receiver<InferenceEvent>,
+    id: String,
+    created: u64,
+    model_id: String,
+) -> mpsc::Receiver<ChatCompletionChunk> {
+    let (tx, out_rx) = mpsc::channel::<ChatCompletionChunk>(CHUNK_CHANNEL_CAPACITY);
+
+    tokio::spawn(async move {
+        while let Some(event) = rx.recv().await {
+            let chunks = match event {
+                InferenceEvent::Start => vec![role_chunk(&id, created, &model_id)],
+                InferenceEvent::TextDelta(text) => {
+                    if text.is_empty() {
+                        // DecodeStream is buffering a multi-byte
+                        // codepoint; don't bother sending an empty
+                        // chunk downstream.
+                        continue;
+                    }
+                    vec![content_chunk(&id, created, &model_id, &text)]
+                }
+                InferenceEvent::ReasoningDelta(_) => {
+                    // Reasoning isn't representable in OpenAI chat
+                    // streaming today. The o-series uses a separate
+                    // `summary` event but it's gated by the
+                    // Responses API; chat-completions just drops it.
+                    continue;
+                }
+                InferenceEvent::Finish { reason } => {
+                    vec![final_chunk(&id, created, &model_id, reason)]
+                }
+            };
+            for chunk in chunks {
+                if tx.send(chunk).await.is_err() {
+                    // Consumer hung up; nothing more to do.
+                    return;
+                }
+            }
+        }
+    });
+
+    out_rx
+}
+
+fn role_chunk(id: &str, created: u64, model_id: &str) -> ChatCompletionChunk {
+    ChatCompletionChunk {
+        id: id.into(),
+        object: "chat.completion.chunk".into(),
+        created,
+        model: model_id.into(),
+        choices: vec![ChunkChoice {
+            index: 0,
+            delta: json!({ "role": "assistant" }),
+            finish_reason: None,
+            extra: serde_json::Value::Object(Default::default()),
+        }],
+        usage: None,
+        extra: serde_json::Value::Object(Default::default()),
+    }
+}
+
+fn content_chunk(id: &str, created: u64, model_id: &str, text: &str) -> ChatCompletionChunk {
+    ChatCompletionChunk {
+        id: id.into(),
+        object: "chat.completion.chunk".into(),
+        created,
+        model: model_id.into(),
+        choices: vec![ChunkChoice {
+            index: 0,
+            delta: json!({ "content": text }),
+            finish_reason: None,
+            extra: serde_json::Value::Object(Default::default()),
+        }],
+        usage: None,
+        extra: serde_json::Value::Object(Default::default()),
+    }
+}
+
+fn final_chunk(
+    id: &str,
+    created: u64,
+    model_id: &str,
+    reason: FinishReason,
+) -> ChatCompletionChunk {
+    ChatCompletionChunk {
+        id: id.into(),
+        object: "chat.completion.chunk".into(),
+        created,
+        model: model_id.into(),
+        choices: vec![ChunkChoice {
+            index: 0,
+            delta: serde_json::Value::Object(Default::default()),
+            finish_reason: Some(reason.as_openai_str().to_string()),
+            extra: serde_json::Value::Object(Default::default()),
+        }],
+        usage: None,
+        extra: serde_json::Value::Object(Default::default()),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Drain the projection's output into a Vec for assertion.
+    async fn collect(mut rx: mpsc::Receiver<ChatCompletionChunk>) -> Vec<ChatCompletionChunk> {
+        let mut out = Vec::new();
+        while let Some(chunk) = rx.recv().await {
+            out.push(chunk);
+        }
+        out
+    }
+
+    #[tokio::test]
+    async fn empty_event_stream_yields_no_chunks() {
+        let (tx, rx) = mpsc::channel::<InferenceEvent>(4);
+        drop(tx);
+        let out = collect(project_chat_stream(rx, "id-1".into(), 1700, "m".into())).await;
+        assert!(out.is_empty());
+    }
+
+    #[tokio::test]
+    async fn start_text_finish_produces_three_chunks() {
+        let (tx, rx) = mpsc::channel::<InferenceEvent>(4);
+        let out_rx = project_chat_stream(rx, "id-1".into(), 1700, "m".into());
+
+        tx.send(InferenceEvent::Start).await.unwrap();
+        tx.send(InferenceEvent::TextDelta("hello".into()))
+            .await
+            .unwrap();
+        tx.send(InferenceEvent::Finish {
+            reason: FinishReason::Stop,
+        })
+        .await
+        .unwrap();
+        drop(tx);
+
+        let out = collect(out_rx).await;
+        assert_eq!(out.len(), 3);
+        assert_eq!(out[0].choices[0].delta["role"], "assistant");
+        assert_eq!(out[1].choices[0].delta["content"], "hello");
+        assert_eq!(out[2].choices[0].finish_reason.as_deref(), Some("stop"));
+        // Every chunk carries the stamped metadata.
+        for chunk in &out {
+            assert_eq!(chunk.id, "id-1");
+            assert_eq!(chunk.created, 1700);
+            assert_eq!(chunk.model, "m");
+            assert_eq!(chunk.object, "chat.completion.chunk");
+        }
+    }
+
+    #[tokio::test]
+    async fn empty_text_delta_is_dropped() {
+        let (tx, rx) = mpsc::channel::<InferenceEvent>(4);
+        let out_rx = project_chat_stream(rx, "id".into(), 1, "m".into());
+        tx.send(InferenceEvent::TextDelta(String::new()))
+            .await
+            .unwrap();
+        drop(tx);
+        let out = collect(out_rx).await;
+        assert!(out.is_empty(), "empty deltas must not produce chunks");
+    }
+
+    #[tokio::test]
+    async fn finish_length_maps_to_openai_string() {
+        let (tx, rx) = mpsc::channel::<InferenceEvent>(4);
+        let out_rx = project_chat_stream(rx, "id".into(), 1, "m".into());
+        tx.send(InferenceEvent::Finish {
+            reason: FinishReason::Length,
+        })
+        .await
+        .unwrap();
+        drop(tx);
+        let out = collect(out_rx).await;
+        assert_eq!(out.len(), 1);
+        assert_eq!(out[0].choices[0].finish_reason.as_deref(), Some("length"));
+    }
+
+    #[tokio::test]
+    async fn reasoning_delta_is_dropped_in_chat_projection() {
+        let (tx, rx) = mpsc::channel::<InferenceEvent>(4);
+        let out_rx = project_chat_stream(rx, "id".into(), 1, "m".into());
+        tx.send(InferenceEvent::ReasoningDelta("<think>".into()))
+            .await
+            .unwrap();
+        tx.send(InferenceEvent::TextDelta("real".into()))
+            .await
+            .unwrap();
+        drop(tx);
+        let out = collect(out_rx).await;
+        assert_eq!(out.len(), 1);
+        assert_eq!(out[0].choices[0].delta["content"], "real");
+    }
+}
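Reviewer sketch, not part of this commit: the back-pressure paragraph in the `openai_chat.rs` module docs relies on bounded channels refusing new items once their buffer is full. The snippet below makes that property observable without an async runtime. It swaps in `std::sync::mpsc::sync_channel` and `try_send` for the tokio channel and `send().await` used in the crate, so a full buffer shows up as an error value instead of a suspended task. The function names are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Push items into a bounded channel until it refuses one, and return
/// how many were accepted. Nothing is received while filling.
fn fill_until_full(capacity: usize) -> usize {
    // `_rx` stays alive until the end of the function, so the channel
    // reports `Full` and never `Disconnected`.
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0usize;
    loop {
        match tx.try_send(accepted as u32) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => {
                unreachable!("receiver is still alive")
            }
        }
    }
}

/// Fill the buffer, confirm it is full, take one item out, and check
/// that a further send is accepted. Expects `capacity >= 1`.
fn slot_frees_after_recv(capacity: usize) -> bool {
    let (tx, rx) = sync_channel::<u32>(capacity);
    for i in 0..capacity {
        tx.try_send(i as u32).expect("buffer has room while filling");
    }
    let was_full = matches!(tx.try_send(0), Err(TrySendError::Full(_)));
    rx.recv().expect("buffer holds at least one item");
    was_full && tx.try_send(0).is_ok()
}
```

With the tokio channel the same condition makes `send().await` wait instead of failing, which is what lets a slow SSE client stall the projection task and, through it, the producer.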