diff --git a/crates/neuron/src/harness/candle.rs b/crates/neuron/src/harness/candle.rs index 7daad46..000e590 100644 --- a/crates/neuron/src/harness/candle.rs +++ b/crates/neuron/src/harness/candle.rs @@ -106,18 +106,18 @@ impl LoadedHandle { } } - /// Modalities the loaded model supports. Stage B7. TP models are - /// always text-only today — TP-vision is tracked under issue #12. + /// Modalities the loaded model supports. Stage B7 (single-GPU) + + /// TP-vision (#12) — both single-GPU and TP loads advertise + /// `"vision"` when a replicated vision tower materialised. pub fn capabilities(&self) -> Vec { let mut caps = vec!["text".to_string()]; - match self { - LoadedHandle::Single(m) => { - if m.has_vision { - caps.push("vision".to_string()); - } - } + let has_vision = match self { + LoadedHandle::Single(m) => m.has_vision, #[cfg(feature = "cuda")] - LoadedHandle::Tp(_) => {} + LoadedHandle::Tp(m) => m.has_vision, + }; + if has_vision { + caps.push("vision".to_string()); } caps } @@ -281,6 +281,16 @@ pub struct TpLoadedModel { pub tool_call_tokens: Option, /// Same shape as [`LoadedModel::chat_template`]. pub chat_template: Option, + /// Vision capability flag (TP-vision). `true` iff every rank + /// materialised a replicated vision tower. Mirrors + /// [`LoadedModel::has_vision`]; drives capability advertising and + /// the TP vision dispatch. + pub has_vision: bool, + /// `<|image_pad|>` token id — same as [`LoadedModel::image_token_id`]. + pub image_token_id: Option, + /// LM-side tokens per image at the fixed 448×448 resolution — same + /// as [`LoadedModel::lm_tokens_per_image`]. + pub lm_tokens_per_image: Option, } #[cfg(feature = "cuda")] @@ -2675,6 +2685,20 @@ impl CandleHarness { ); } + // Vision metadata from the same config.json the shards loaded + // from. The TP model builder (Stage 1) materialises a replicated + // vision tower on every rank when `vision_config` is present, so + // `has_vision` here is consistent with what each rank loaded. + let vision_meta = VisionMeta::from_config_path(&config_path); + if vision_meta.has_vision { + tracing::info!( + model = %spec.model_id, + image_token_id = ?vision_meta.image_token_id, + lm_tokens_per_image = ?vision_meta.lm_tokens_per_image, + "TP load: vision tower present, advertising vision capability" + ); + } + let tp_loaded = StdArc::new(TpLoadedModel { model_id: spec.model_id.clone(), tokenizer, @@ -2690,6 +2714,9 @@ impl CandleHarness { reasoning_tokens, tool_call_tokens, chat_template, + has_vision: vision_meta.has_vision, + image_token_id: vision_meta.image_token_id, + lm_tokens_per_image: vision_meta.lm_tokens_per_image, }); let mut models = self.models.write().await; @@ -2739,15 +2766,15 @@ impl CandleHarness { return Err(poisoned_error(&model_id)); } - // Stage 0 (TP-vision): the TP path has no vision tower yet, so - // an image-bearing request can't be honoured. Reject it cleanly - // with `vision_unsupported` instead of silently dropping the - // image and answering from text alone (the issue-#3 confident- - // hallucination pattern). Made conditional on the TP model's - // `has_vision` once Stage 3 wires real TP-vision. - if request_has_images(&request) { + // Reject image-bearing requests against a TP model with no + // vision tower, cleanly (`vision_unsupported`) rather than + // silently dropping the image. Vision-capable TP loads fall + // through to the image-aware prefill in chat_completion_tp_inner. + if request_has_images(&request) && !tp.has_vision { let _g = span.enter(); - tracing::warn!("TP chat_completion: rejecting image request, TP vision unsupported"); + tracing::warn!( + "TP chat_completion: rejecting image request, model has no vision tower" + ); return Err(InferenceError::VisionUnsupported { model_id }); } @@ -2828,14 +2855,12 @@ impl CandleHarness { return Err(poisoned_error(&request.model)); } - // Stage 0 (TP-vision): reject image requests on the TP streaming - // path before opening the SSE stream — the TP path has no vision - // tower yet, so honouring the image is impossible and silently - // dropping it would hallucinate. Returns a clean 400; made - // conditional on `has_vision` in Stage 3. - if request_has_images(&request) { + // Reject image requests against a non-vision TP model before + // opening the SSE stream. Vision-capable TP loads fall through + // to the image-aware prefill in the orchestration task below. + if request_has_images(&request) && !tp.has_vision { tracing::warn!( - "TP chat_completion (stream): rejecting image request, TP vision unsupported" + "TP chat_completion (stream): rejecting image request, model has no vision tower" ); return Err(InferenceError::VisionUnsupported { model_id: request.model.clone(), @@ -2847,7 +2872,44 @@ impl CandleHarness { .tokenizer .encode(prompt.as_str(), true) .map_err(|e| InferenceError::Other(anyhow::anyhow!("tokenize: {e}")))?; - let prompt_tokens: Vec = encoding.get_ids().to_vec(); + let mut prompt_tokens: Vec = encoding.get_ids().to_vec(); + + // TP-vision (streaming): same detection + pad expansion as the + // non-streaming path. The resulting `vision_route` moves into + // the orchestration task, which runs a single-shot image prefill + // when present. Returning early here keeps a rejected request + // from opening the SSE stream. + let vision_route: Option<(Vec, u32)> = if request_has_images(&request) { + if !tp.has_vision { + return Err(InferenceError::VisionUnsupported { + model_id: request.model.clone(), + }); + } + let image_token_id = + tp.image_token_id + .ok_or_else(|| InferenceError::VisionUnsupported { + model_id: request.model.clone(), + })?; + let patches_per_image = + tp.lm_tokens_per_image + .ok_or_else(|| InferenceError::VisionUnsupported { + model_id: request.model.clone(), + })?; + let data_uris = extract_image_data_uris(&request); + if data_uris.is_empty() { + return Err(InferenceError::Other(anyhow::anyhow!( + "request has image content but extractor produced zero data URIs" + ))); + } + let per_image_counts: Vec = vec![patches_per_image; data_uris.len()]; + prompt_tokens = + expand_image_pad_tokens(&prompt_tokens, image_token_id, &per_image_counts) + .map_err(InferenceError::Other)?; + Some((data_uris, image_token_id)) + } else { + None + }; + let prompt_len = prompt_tokens.len(); let temperature = request.temperature.unwrap_or(0.7); @@ -2961,14 +3023,27 @@ impl CandleHarness { // chunk fans out to every rank with a growing // offset; only the final chunk's logits are kept // for the first sample. - let logits_vec = match chunked_prefill_tp( - &mut pool, - &model_id, - leader_handle, - &prompt_tokens, - ) - .await - { + // Vision requests do a single-shot image prefill; + // text requests chunk it. `vision_route` was moved + // into this task from the synchronous setup above. + let prefill_result = match &vision_route { + Some((data_uris, image_token_id)) => { + pool.generate_step_with_images( + &model_id, + leader_handle, + prompt_tokens.clone(), + 0, + *image_token_id, + data_uris.clone(), + ) + .await + } + None => { + chunked_prefill_tp(&mut pool, &model_id, leader_handle, &prompt_tokens) + .await + } + }; + let logits_vec = match prefill_result { Ok(l) => l, Err(e) => { failure = Some(format!("prefill: {e:#}")); @@ -3311,7 +3386,43 @@ async fn chat_completion_tp_inner( .tokenizer .encode(prompt.as_str(), true) .map_err(|e| InferenceError::Other(anyhow::anyhow!("tokenize: {e}")))?; - let prompt_tokens: Vec = encoding.get_ids().to_vec(); + let mut prompt_tokens: Vec = encoding.get_ids().to_vec(); + + // TP-vision: when the request carries images (and the model has a + // replicated tower — enforced by the caller's guard), expand each + // `<|image_pad|>` sentinel to the per-image patch count and carry + // the source data URIs through to the single-shot image prefill. + // Mirrors the single-GPU `chat_completion` vision_route block. + let vision_route: Option<(Vec, u32)> = if request_has_images(&request) { + if !tp.has_vision { + return Err(InferenceError::VisionUnsupported { + model_id: request.model.clone(), + }); + } + let image_token_id = + tp.image_token_id + .ok_or_else(|| InferenceError::VisionUnsupported { + model_id: request.model.clone(), + })?; + let patches_per_image = + tp.lm_tokens_per_image + .ok_or_else(|| InferenceError::VisionUnsupported { + model_id: request.model.clone(), + })?; + let data_uris = extract_image_data_uris(&request); + if data_uris.is_empty() { + return Err(InferenceError::Other(anyhow::anyhow!( + "request has image content but extractor produced zero data URIs" + ))); + } + let per_image_counts: Vec = vec![patches_per_image; data_uris.len()]; + prompt_tokens = expand_image_pad_tokens(&prompt_tokens, image_token_id, &per_image_counts) + .map_err(InferenceError::Other)?; + Some((data_uris, image_token_id)) + } else { + None + }; + let prompt_len = prompt_tokens.len(); let temperature = request.temperature.unwrap_or(0.7); @@ -3381,9 +3492,24 @@ async fn chat_completion_tp_inner( // spread across multiple `generate_step` calls with monotonically // growing offsets. let prefill_start = std::time::Instant::now(); - let logits_vec = chunked_prefill_tp(&mut pool, &model_id, leader_handle, &prompt_tokens) - .await - .map_err(InferenceError::Other)?; + // Vision requests do a single-shot image prefill (every rank encodes + // + splices its replicated tower); text requests chunk the prefill. + let logits_vec = match &vision_route { + Some((data_uris, image_token_id)) => pool + .generate_step_with_images( + &model_id, + leader_handle, + prompt_tokens.clone(), + 0, + *image_token_id, + data_uris.clone(), + ) + .await + .map_err(InferenceError::Other)?, + None => chunked_prefill_tp(&mut pool, &model_id, leader_handle, &prompt_tokens) + .await + .map_err(InferenceError::Other)?, + }; let (post_prefill_vram_free_mb, _) = tp.query_vram().await; tracing::info!( model = %model_id, @@ -3841,6 +3967,37 @@ fn extract_images_from_request( Ok(out) } +/// Collect the raw `image_url.url` strings (data URIs) from a chat +/// request, in prompt order. The TP vision path (Stage C / TP-vision) +/// ships these verbatim to every rank, which each preprocess + encode +/// identically — so unlike `extract_images_from_request` (which +/// preprocesses on the leader for the single-GPU worker job) this +/// keeps the source form for replicated per-rank encoding. +/// +/// Cuda-gated: the only callers are the TP entry points, which compile +/// only under the `cuda` feature. +#[cfg(feature = "cuda")] +fn extract_image_data_uris(request: &ChatCompletionRequest) -> Vec { + let mut out = Vec::new(); + for msg in &request.messages { + if let MessageContent::Parts(parts) = &msg.content { + for part in parts { + if part.get("type").and_then(|v| v.as_str()) != Some("image_url") { + continue; + } + if let Some(url) = part + .get("image_url") + .and_then(|v| v.get("url")) + .and_then(|v| v.as_str()) + { + out.push(url.to_string()); + } + } + } + } + out +} + /// Expand each occurrence of `image_token_id` in `input_ids` into /// `patches_per_image[i]` copies (one expansion per image, in order). /// Stage B4 helper.