7 Commits

Author SHA1 Message Date
60f5598542 build(neuron): bump cudarc fork to 63327a2 (idempotent abort + Comm Send+Sync)
Some checks failed
build-prerelease / Resolve version stamps (push) Successful in 29s
CI / CUDA type-check (push) Successful in 31s
CI / Format (push) Successful in 35s
CI / Test (push) Failing after 1m9s
CI / Clippy (push) Successful in 2m36s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Build neuron-blackwell (push) Successful in 6m10s
build-prerelease / Build neuron-ampere (push) Successful in 7m35s
build-prerelease / Build neuron-ada (push) Successful in 5m7s
build-prerelease / Package helexa-neuron-ampere RPM (push) Successful in 2m53s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 3m14s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Successful in 3m48s
build-prerelease / Build cortex binary (push) Successful in 4m33s
build-prerelease / Package cortex RPM (push) Successful in 1m21s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 1m1s
The fork's new commit makes `Comm: Send + Sync` (asserting NCCL's
thread-safety invariant upstream) and makes `Comm::abort` idempotent via
an `aborted` flag (so abort-then-Drop can't double-free) — strictly
better than the previous Drop-no-panic workaround, and the `abort()`
signature is unchanged so the watchdog call site is unaffected.

Because `Comm` is now `Send + Sync`, `Arc<Comm>` and the `SendComm` /
`NcclState` wrappers auto-derive `Send`/`Sync`, which conflicts (E0119)
with neuron's manual `unsafe impl`s. Remove the four now-redundant impls
— the safety assertion lives upstream in cudarc where it belongs. The
conflict is in cuda-gated code, so only the CUDA type-check catches it
(non-cuda build + clippy + tests stay green).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 16:33:14 +03:00
7945240646 chore: re-trigger deploy (#17 Stage 2, attempt 3)
All checks were successful
CI / CUDA type-check (push) Successful in 31s
build-prerelease / Resolve version stamps (push) Successful in 31s
CI / Format (push) Successful in 33s
CI / Clippy (push) Successful in 2m41s
build-prerelease / Build cortex binary (push) Successful in 4m45s
build-prerelease / Build neuron-blackwell (push) Successful in 5m50s
CI / Test (push) Successful in 6m44s
CI / Build cortex SRPM (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Package cortex RPM (push) Successful in 1m23s
build-prerelease / Build neuron-ampere (push) Successful in 8m38s
build-prerelease / Build neuron-ada (push) Successful in 5m36s
build-prerelease / Package helexa-neuron-ampere RPM (push) Successful in 2m55s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 2m59s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Successful in 3m43s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 59s
No code change. Each deploy run, the degraded CI runner kills a different
single arch build (blackwell, then ada) ~fast, and the all-arch-gated
packaging skips → no publish. Every arch HAS built green across runs
(blackwell  in 342, ampere , ada  in 339) and the gate + CUDA
type-check pass. Re-running to catch all three green in one run so the
Stage-2 RPMs publish. Runner FS/cache health is the real fix (separate
infra work).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 15:06:04 +03:00
0c74d89d15 chore: re-trigger deploy (#17 Stage 2)
Some checks failed
CI / CUDA type-check (push) Successful in 32s
build-prerelease / Resolve version stamps (push) Successful in 29s
CI / Format (push) Successful in 30s
build-prerelease / Build neuron-ada (push) Failing after 51s
CI / Clippy (push) Successful in 2m41s
build-prerelease / Build cortex binary (push) Successful in 4m28s
build-prerelease / Build neuron-blackwell (push) Successful in 6m32s
build-prerelease / Build neuron-ampere (push) Successful in 7m42s
build-prerelease / Package helexa-neuron-ada RPM (push) Has been skipped
build-prerelease / Package helexa-neuron-ampere RPM (push) Has been skipped
build-prerelease / Package helexa-neuron-blackwell RPM (push) Has been skipped
CI / Test (push) Successful in 6m6s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Package cortex RPM (push) Successful in 1m21s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Has been skipped
No code change. The c94a2ae deploy's neuron-blackwell build died ~12min
into the Blackwell kernel compile on the degraded runner, while
neuron-ampere + neuron-ada built the identical Rust + patched cudarc
cleanly and the CUDA type-check passed. Transient infra; re-running to
get a healthy blackwell build so the RPMs publish and beast (Blackwell)
picks it up.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 14:45:16 +03:00
c94a2ae755 fix(neuron): correct nccl_state path on WorkerPool.leader_comm (#17 S2)
Some checks failed
CI / CUDA type-check (push) Successful in 32s
build-prerelease / Resolve version stamps (push) Successful in 35s
CI / Format (push) Successful in 44s
build-prerelease / Build cortex binary (push) Successful in 4m57s
build-prerelease / Package cortex RPM (push) Successful in 1m36s
CI / Test (push) Successful in 7m10s
CI / Clippy (push) Failing after 1m21s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Build neuron-ampere (push) Successful in 8m40s
build-prerelease / Build neuron-ada (push) Successful in 9m5s
build-prerelease / Build neuron-blackwell (push) Failing after 12m2s
build-prerelease / Package helexa-neuron-ada RPM (push) Has been cancelled
build-prerelease / Package helexa-neuron-ampere RPM (push) Has been cancelled
build-prerelease / Package helexa-neuron-blackwell RPM (push) Has been cancelled
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Has been cancelled
`super::nccl_state` from tp/mod.rs resolves to `crate::harness::nccl_state`
(nonexistent); the module is the child `nccl_state` (cf. the existing
`nccl_state::generate_comm_id_hex` call). The field is cuda-gated so the
non-cuda build couldn't catch it; the branch CUDA type-check flaked on the
runner before compiling. Self-audited fix.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 14:21:43 +03:00
99920dd322 feat(neuron): TP step watchdog aborts wedged collectives (#17 Stage 2)
Some checks failed
CI / CUDA type-check (push) Failing after 47s
CI / Format (push) Successful in 31s
CI / Test (push) Failing after 1m3s
CI / Clippy (push) Successful in 2m44s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
Make a hung NCCL collective recoverable instead of a permanent brick.
Today a wedged collective hangs the in-process leader thread forever, and
even Stage 1's recovery can't help — its unload's DropTp queues behind the
stuck thread and hangs too.

- Cache the leader's NCCL Comm handle async-side at init (new cuda-gated
  Job::GetLeaderComm → DeviceWorkerHandle::get_leader_comm → stored on
  WorkerPool.leader_comm). Fetched while the thread is responsive — a
  wedged thread can't service the fetch, which is why it's cached up front.
- Wrap the leader forward in both generate_step and
  generate_step_with_images in tokio::time::timeout (default 120s,
  NEURON_TP_STEP_TIMEOUT_S). On expiry the watchdog calls
  Comm::abort() (ncclCommAbort) on the cached handle from the async
  thread — the one NCCL op sanctioned concurrently with an in-flight
  collective — which unblocks the leader thread, then fails the step
  WITHOUT draining (workers are wedged too; recovery's unload kills them).
  The error is a device fault → poison → Stage 1 auto-recovery, which now
  completes because the leader thread is responsive again.
- Bumps the cudarc patch to dbc425a (adds the Drop-must-not-panic fix so
  the post-abort comm teardown during recovery doesn't double-abort-panic).

Logs the whole sequence at ERROR with greppable `tp watchdog:` /
`ncclCommAbort` markers so a real-world hang leaves a forensic trail —
verification is by inspecting journals after real hangs, not a synthetic
harness. cuda-gated → validated by the blackwell build.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 14:15:29 +03:00
c4f239ceb9 build(neuron): patch cudarc to expose Comm::abort/get_async_error (#17 Stage 2)
All checks were successful
CI / CUDA type-check (push) Successful in 33s
CI / Format (push) Successful in 35s
CI / Clippy (push) Successful in 2m34s
CI / Test (push) Successful in 6m1s
CI / Build cortex SRPM (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
#17 Stage 2 (TP hang-recovery) needs to call ncclCommAbort on a LIVE
communicator from another thread — to unblock a collective wedged on a
dead/hung peer so the ranks can resync. No cudarc release (incl. main)
exposes this: the safe Comm only aborts in Drop, which can't fire while a
stuck thread holds an Arc<Comm> clone.

Pin neuron's cudarc 0.19.7 to a fork (grenade/cudarc @ nccl-comm-abort,
rev 4dff0be) adding three thin methods — Comm::abort, get_async_error,
and a raw comm() accessor — to be submitted upstream. The patch targets
0.19.x only; candle's transitive cudarc 0.17.8 stays on crates.io.

Foundation only; the watchdog + abort + comm-rebuild that consume these
land in follow-up commits (cuda-gated → validated by the blackwell build).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 13:49:59 +03:00
ac445c1569 chore: re-trigger deploy (#17 Stage 1)
Some checks failed
CI / CUDA type-check (push) Failing after 19s
CI / Format (push) Successful in 37s
build-prerelease / Resolve version stamps (push) Successful in 42s
CI / Clippy (push) Successful in 3m54s
build-prerelease / Build cortex binary (push) Successful in 4m43s
CI / Test (push) Successful in 6m35s
build-prerelease / Build neuron-blackwell (push) Successful in 5m58s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Package cortex RPM (push) Successful in 1m21s
build-prerelease / Build neuron-ampere (push) Successful in 8m10s
build-prerelease / Build neuron-ada (push) Successful in 5m21s
build-prerelease / Package helexa-neuron-ampere RPM (push) Successful in 2m56s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 3m1s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Successful in 3m46s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 1m4s
No code change. The abc6e60 deploy's neuron-ada build died on the
degraded CI runner (container dropped mid-checkout), skipping the
gated publish — even though neuron-blackwell + neuron-ampere compiled
the Stage-1 fault-recovery code cleanly. Re-running to get a healthy
ada build so the RPMs publish and beast picks up the build.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 09:34:20 +03:00
7 changed files with 186 additions and 41 deletions

3
Cargo.lock generated
View File

@@ -905,8 +905,7 @@ dependencies = [
[[package]] [[package]]
name = "cudarc" name = "cudarc"
version = "0.19.7" version = "0.19.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/grenade/cudarc?rev=63327a256059f8252641ae46c6bb9eefe707f382#63327a256059f8252641ae46c6bb9eefe707f382"
checksum = "1cea5f10a99e025c1b44ae2354c2d8326b25ddbd0baf76bde8e55cfd4018a2cc"
dependencies = [ dependencies = [
"float8", "float8",
"half", "half",

View File

@@ -61,3 +61,12 @@ eventsource-stream = "0.2"
# workspace crates # workspace crates
cortex-core = { path = "crates/cortex-core" } cortex-core = { path = "crates/cortex-core" }
cortex-gateway = { path = "crates/cortex-gateway" } cortex-gateway = { path = "crates/cortex-gateway" }
# Patched cudarc (affects neuron's 0.19.x only; candle's 0.17.x is
# untouched since the fork is 0.19.7 and doesn't satisfy a 0.17 req). Adds
# Comm::abort / get_async_error / raw comm() — needed for #17 Stage 2 TP
# hang-recovery (abort a wedged collective from another thread, then
# rebuild the comm). Pinned to a fork revision pending upstream review
# (grenade/cudarc @ nccl-comm-abort).
[patch.crates-io]
cudarc = { git = "https://github.com/grenade/cudarc", rev = "63327a256059f8252641ae46c6bb9eefe707f382" }

View File

@@ -201,6 +201,16 @@ pub(crate) fn run(device_index: u32, rx: Receiver<Job>, poisoned: Arc<AtomicBool
let _ = reply.send(resp); let _ = reply.send(resp);
} }
#[cfg(feature = "cuda")] #[cfg(feature = "cuda")]
Job::GetLeaderComm { reply } => {
// Clone the leader's Arc<Comm> out for the async-side
// watchdog. `None` before NcclInit. (#17 Stage 2)
let comm = state
.nccl
.comm()
.map(crate::harness::tp::nccl_state::SendComm);
let _ = reply.send(comm);
}
#[cfg(feature = "cuda")]
Job::TpLoadShard { Job::TpLoadShard {
model_id, model_id,
config_json, config_json,
@@ -1004,6 +1014,10 @@ fn drain_poisoned(job: Job, device_index: u32) {
message: format!("device worker {device_index} poisoned"), message: format!("device worker {device_index} poisoned"),
}); });
} }
#[cfg(feature = "cuda")]
Job::GetLeaderComm { reply } => {
let _ = reply.send(None);
}
Job::NcclSanity { reply } => { Job::NcclSanity { reply } => {
let _ = reply.send(crate::harness::tp::rpc::WorkerResponse::Error { let _ = reply.send(crate::harness::tp::rpc::WorkerResponse::Error {
kind: "device_worker_poisoned".into(), kind: "device_worker_poisoned".into(),

View File

@@ -192,6 +192,17 @@ pub enum Job {
NcclSanity { NcclSanity {
reply: oneshot::Sender<crate::harness::tp::rpc::WorkerResponse>, reply: oneshot::Sender<crate::harness::tp::rpc::WorkerResponse>,
}, },
/// Hand a clonable handle to the leader's NCCL `Comm` back to the
/// async side, so the TP step watchdog can call `ncclCommAbort` on
/// it from a *different* thread to unblock a wedged collective
/// (#17 Stage 2). Fetched once at init while the worker thread is
/// still responsive — a thread already wedged in a collective can't
/// service this job, which is exactly why the handle is cached
/// up front. Replies `None` before `NcclInit` has run.
#[cfg(feature = "cuda")]
GetLeaderComm {
reply: oneshot::Sender<Option<crate::harness::tp::nccl_state::SendComm>>,
},
/// Load the leader's TP shard on the worker thread. The dispatch /// Load the leader's TP shard on the worker thread. The dispatch
/// handler reads `state.nccl.comm()` directly (no cross-thread /// handler reads `state.nccl.comm()` directly (no cross-thread
/// `Arc<Comm>` transfer, no `SendComm` wrapper) and builds the /// `Arc<Comm>` transfer, no `SendComm` wrapper) and builds the

View File

@@ -161,6 +161,27 @@ impl DeviceWorkerHandle {
} }
} }
/// Fetch a clonable handle to the leader's NCCL `Comm` (#17 Stage 2).
/// The TP step watchdog caches this at init so it can call
/// `ncclCommAbort` from the async thread to unblock a wedged
/// collective. Returns `None` if uninitialised, poisoned, or gone —
/// the caller treats a missing handle as "can't abort" and logs it.
#[cfg(feature = "cuda")]
pub async fn get_leader_comm(&self) -> Option<crate::harness::tp::nccl_state::SendComm> {
if self.poisoned.load(Ordering::Acquire) {
return None;
}
let (reply_tx, reply_rx) = oneshot::channel();
if self
.tx
.send(Job::GetLeaderComm { reply: reply_tx })
.is_err()
{
return None;
}
reply_rx.await.ok().flatten()
}
/// Load a GGUF (pre-quantized) single-GPU model on the worker /// Load a GGUF (pre-quantized) single-GPU model on the worker
/// thread. The hf-hub resolution happens on the async caller; the /// thread. The hf-hub resolution happens on the async caller; the
/// resolved local `gguf_path` plus the spec's model_id are sent /// resolved local `gguf_path` plus the spec's model_id are sent

View File

@@ -245,9 +245,67 @@ pub struct WorkerPool {
/// Phase 4 the load itself moves onto the worker and that bridge /// Phase 4 the load itself moves onto the worker and that bridge
/// goes away. /// goes away.
pub(crate) leader_worker: std::sync::Arc<super::device_worker::DeviceWorkerHandle>, pub(crate) leader_worker: std::sync::Arc<super::device_worker::DeviceWorkerHandle>,
/// Cached handle to the leader's NCCL `Comm`, fetched at `init_nccl`
/// while the worker thread is responsive. The TP step watchdog uses
/// it to `ncclCommAbort` a wedged collective from the async thread —
/// the one NCCL op allowed concurrently with an in-flight collective,
/// and the only way to unblock the in-process leader thread so
/// recovery's `unload` doesn't itself hang (#17 Stage 2). `None` if
/// init couldn't cache it; the watchdog then logs that it can't abort.
#[cfg(feature = "cuda")]
leader_comm: Option<nccl_state::SendComm>,
}
/// Per-step deadline for a TP forward (#17 Stage 2). A healthy decode
/// step or chunked prefill completes in well under a second; a wedged
/// NCCL collective never returns. Generous default so no legitimate step
/// trips it; overridable via `NEURON_TP_STEP_TIMEOUT_S` (seconds).
#[cfg(feature = "cuda")]
fn tp_step_timeout() -> std::time::Duration {
let secs = std::env::var("NEURON_TP_STEP_TIMEOUT_S")
.ok()
.and_then(|v| v.trim().parse::<u64>().ok())
.filter(|&s| s > 0)
.unwrap_or(120);
std::time::Duration::from_secs(secs)
} }
impl WorkerPool { impl WorkerPool {
/// Abort the leader's NCCL comm to unblock a collective the watchdog
/// found wedged (#17 Stage 2). Logs the whole sequence loudly so a
/// real-world hang leaves a greppable forensic trail
/// (`tp watchdog:` / `ncclCommAbort`). Calling abort from this async
/// thread while the worker thread is blocked inside the collective is
/// the one concurrent NCCL op the library sanctions — it is how a
/// stuck/failed collective is unblocked.
#[cfg(feature = "cuda")]
fn watchdog_abort_leader_comm(&self, model_id: &str, secs: u64) {
tracing::error!(
model = %model_id,
timeout_s = secs,
"tp watchdog: leader forward exceeded deadline — NCCL collective wedged; \
aborting comm to unblock the leader thread for auto-recovery"
);
match &self.leader_comm {
Some(c) => match c.0.abort() {
Ok(()) => tracing::error!(
model = %model_id,
"tp watchdog: ncclCommAbort succeeded — wedged collective unblocked; \
failing the step so the model auto-recovers (unload+reload)"
),
Err(e) => tracing::error!(
model = %model_id, error = ?e,
"tp watchdog: ncclCommAbort failed — recovery may stall until a process restart"
),
},
None => tracing::error!(
model = %model_id,
"tp watchdog: no cached leader comm handle — cannot abort; recovery will rely \
on a process restart"
),
}
}
/// Spawn `world_size - 1` worker subprocesses. Rank 0 is the /// Spawn `world_size - 1` worker subprocesses. Rank 0 is the
/// leader (in-process) and is *not* spawned here — the leader /// leader (in-process) and is *not* spawned here — the leader
/// holds rank 0's NCCL Comm and shard in its own address space. /// holds rank 0's NCCL Comm and shard in its own address space.
@@ -324,6 +382,8 @@ impl WorkerPool {
workers, workers,
exe, exe,
leader_worker, leader_worker,
#[cfg(feature = "cuda")]
leader_comm: None,
}) })
} }
@@ -404,6 +464,23 @@ impl WorkerPool {
world_size = self.world_size, world_size = self.world_size,
"NCCL communicator established across all ranks" "NCCL communicator established across all ranks"
); );
// Cache the leader's Comm handle now, while the worker thread is
// responsive, so the TP step watchdog can abort a wedged
// collective later (it can't fetch it then — the thread is stuck).
// (#17 Stage 2.)
#[cfg(feature = "cuda")]
{
self.leader_comm = self.leader_worker.get_leader_comm().await;
if self.leader_comm.is_some() {
tracing::debug!("cached leader NCCL comm handle for the TP step watchdog");
} else {
tracing::warn!(
"could not cache leader NCCL comm handle; the TP step watchdog will be \
unable to abort a wedged collective (a hang would need a process restart)"
);
}
}
Ok(()) Ok(())
} }
@@ -628,10 +705,27 @@ impl WorkerPool {
// that's the invariant the whole refactor exists to // that's the invariant the whole refactor exists to
// preserve. // preserve.
let leader_start = std::time::Instant::now(); let leader_start = std::time::Instant::now();
let leader_result = self let timeout = tp_step_timeout();
let leader_fut = self
.leader_worker .leader_worker
.tp_forward_logits(leader_handle, tokens, offset) .tp_forward_logits(leader_handle, tokens, offset);
.await; let leader_result = match tokio::time::timeout(timeout, leader_fut).await {
Ok(r) => r,
Err(_elapsed) => {
// Watchdog (#17 Stage 2): the NCCL collective is wedged.
// Abort the leader comm to unblock its thread, then fail
// the step WITHOUT draining (the subprocess workers are
// wedged too; recovery's unload kills them). The error
// poisons the model → auto-recovery, which no longer hangs
// because the leader thread is now responsive.
self.watchdog_abort_leader_comm(model_id, timeout.as_secs());
anyhow::bail!(
"tp watchdog: leader forward exceeded {}s deadline; aborted wedged NCCL \
comm — model will auto-recover",
timeout.as_secs()
);
}
};
let leader_ok = leader_result.is_ok(); let leader_ok = leader_result.is_ok();
let leader_ms = leader_start.elapsed().as_millis(); let leader_ms = leader_start.elapsed().as_millis();
// Surface the leader's own error at WARN before draining // Surface the leader's own error at WARN before draining
@@ -767,17 +861,29 @@ impl WorkerPool {
// matching collective; CPU-side logits keep the device tensor // matching collective; CPU-side logits keep the device tensor
// from escaping the worker thread. // from escaping the worker thread.
let leader_start = std::time::Instant::now(); let leader_start = std::time::Instant::now();
let leader_result = self let timeout = tp_step_timeout();
.leader_worker let leader_fut = self.leader_worker.tp_forward_logits_with_images(
.tp_forward_logits_with_images( leader_handle,
leader_handle, tokens,
tokens, offset,
offset, image_token_id,
image_token_id, image_data_uris,
image_data_uris, chunk_size,
chunk_size, );
) let leader_result = match tokio::time::timeout(timeout, leader_fut).await {
.await; Ok(r) => r,
Err(_elapsed) => {
// Watchdog (#17 Stage 2) — see generate_step. Vision
// prefill is still well under the deadline on healthy
// hardware; a timeout means a wedged collective.
self.watchdog_abort_leader_comm(model_id, timeout.as_secs());
anyhow::bail!(
"tp watchdog: leader image forward exceeded {}s deadline; aborted wedged \
NCCL comm — model will auto-recover",
timeout.as_secs()
);
}
};
let leader_ok = leader_result.is_ok(); let leader_ok = leader_result.is_ok();
let leader_ms = leader_start.elapsed().as_millis(); let leader_ms = leader_start.elapsed().as_millis();
if !leader_ok { if !leader_ok {

View File

@@ -119,40 +119,25 @@ mod cuda_impl {
} }
} }
/// `Arc<Comm>` doesn't impl `Send` because `Comm` wraps a raw /// Thin newtype over `Arc<Comm>`, kept for call-site clarity — it marks
/// `ncclComm_t` pointer. The NCCL contract is "operations against a /// the points where a comm handle is intentionally moved across threads
/// given comm must be serialised", not "the handle must stay on the /// (e.g. cached async-side for the TP step watchdog's `ncclCommAbort`).
/// thread that created it" — so it's safe to move an `Arc<Comm>`
/// across threads as long as no concurrent ops are issued. The
/// pool's outer Mutex serialises us into `spawn_blocking`, so this
/// wrapper at the move boundary is the only thing missing.
/// ///
/// `Sync` is also marked safe because the `Arc<Comm>` clones held /// `Send`/`Sync` are provided upstream by `cudarc`'s `Comm` (which
/// by the row-parallel layers are only used from the /// asserts the NCCL thread-safety invariant, including aborting from a
/// `spawn_blocking` thread driving the forward pass; concurrent /// different thread than one inside a collective), so this type derives
/// access from another thread would still be a bug. /// them automatically — no manual `unsafe impl` here.
pub struct SendComm(pub Arc<Comm>); pub struct SendComm(pub Arc<Comm>);
// SAFETY: see the doc-comment above; the invariant is enforced at
// the call site (pool Mutex + single spawn_blocking thread), not at
// the type level.
unsafe impl Send for SendComm {}
unsafe impl Sync for SendComm {}
impl SendComm { impl SendComm {
pub fn into_inner(self) -> Arc<Comm> { pub fn into_inner(self) -> Arc<Comm> {
self.0 self.0
} }
} }
// SAFETY: `cudarc::nccl::Comm` contains a raw `ncclComm_t` pointer // `NcclState`'s `Send`/`Sync` are auto-derived: its `Arc<Comm>` and
// (libnccl-allocated state). NCCL requires that operations against // `Arc<CudaContext>` fields are now `Send`/`Sync` (cudarc asserts the
// one Comm be issued one at a time; we serialise access by storing // comm thread-safety invariant), so no manual `unsafe impl` is needed.
// NcclState behind a Mutex in `WorkerPool`. The Comm itself is
// move-safe — NCCL doesn't track the calling OS thread, only the
// stream the operations are dispatched against.
unsafe impl Send for NcclState {}
unsafe impl Sync for NcclState {}
/// Generate a fresh NCCL `Id` and return it hex-encoded. Used by /// Generate a fresh NCCL `Id` and return it hex-encoded. Used by
/// the leader to mint the shared communicator id which is then /// the leader to mint the shared communicator id which is then