From 3ecbb21ece89a50d51c17f3d85be3724fccb36e7 Mon Sep 17 00:00:00 2001 From: rob thijssen Date: Thu, 28 May 2026 16:29:22 +0300 Subject: [PATCH] fix(helexa-acp): persist per round, cancel previous prompt, log loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three changes addressing "session stops mid-turn and disk store doesn't update": 1. Per-round persistence. drive_prompt previously called store::save() once at the very end of the turn. If the loop stalled in a later round (long-running bash, upstream SSE that never finished, wedged ACP roundtrip), earlier successful rounds lived only in the spawned task's `new_turns` and never reached disk. Move the extend-history + save into a helper (extend_and_persist) and call it at the end of every loop iteration. The post-loop save catches whatever the break paths leave behind. Persistence failures are logged, not propagated. 2. Cancel previous in-flight prompt on new session/prompt. The handler used to overwrite SessionState.cancel with a fresh token *without firing the old one*. A wedged prior prompt would then live forever, holding session-state references and never persisting. Now we fire the existing cancel under the lock before installing the new token — the old task observes is_cancelled() on its next .await and unwinds. 3. Per-round and per-tool log lines. drive_prompt now emits: - INFO prompt round: streaming { round, of, history_turns } - INFO dispatch tool { tool, tool_call_id } - INFO dispatch tool complete { tool_call_id, is_error } - INFO prompt round complete; persisting { round, turns } - INFO prompt complete { stop_reason } so the next hang shows up by line number in /tmp/helexa-acp.log instead of as silence. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/helexa-acp/src/agent.rs | 101 ++++++++++++++++++++++++++------- 1 file changed, 80 insertions(+), 21 deletions(-) diff --git a/crates/helexa-acp/src/agent.rs b/crates/helexa-acp/src/agent.rs index bc77ce4..f35c7ae 100644 --- a/crates/helexa-acp/src/agent.rs +++ b/crates/helexa-acp/src/agent.rs @@ -643,6 +643,14 @@ async fn drive_prompt( // mid-turn). let (existing_history, model_id, cwd, cancel, mut mode_id) = { let mut state = session_arc.lock().await; + // Fire the session's current cancel before installing a new + // one. If a previous prompt task is still in-flight (model + // stalled mid-stream, a long-running bash, a wedged ACP + // roundtrip), this lets it observe is_cancelled() at the + // next .await and unwind cleanly — instead of two tasks + // racing each other to mutate session.history and to + // persist the same file. + state.cancel.cancel(); let cancel = CancellationToken::new(); state.cancel = cancel.clone(); let user_text = flatten_prompt(&req.prompt); @@ -696,9 +704,10 @@ async fn drive_prompt( }); messages.extend(existing_history); - // Whatever new turns this prompt generates beyond the user's - // input — we persist these to session.history at the end so - // future prompts see them. + // Buffer for turns produced this round. Flushed into + // session.history *and* persisted at the end of every iteration + // (and once more after the loop). Per-round persistence means + // a stall later in the conversation doesn't lose earlier rounds. let mut new_turns: Vec = Vec::new(); // Monotonic counter for synthetic ids assigned to unparseable // blocks across all rounds of this prompt. 
@@ -707,6 +716,13 @@ async fn drive_prompt( let mut stop_reason = StopReason::EndTurn; for round in 0..MAX_TOOL_ROUNDS { + tracing::info!( + session_id = %session_id.0, + round = round + 1, + of = MAX_TOOL_ROUNDS, + history_turns = messages.len(), + "prompt round: streaming" + ); if cancel.is_cancelled() { stop_reason = StopReason::Cancelled; break; @@ -906,8 +922,20 @@ async fn drive_prompt( name: bucket.name, arguments: bucket.arguments, }; + tracing::info!( + session_id = %session_id.0, + tool = %event.name, + tool_call_id = %event.id, + "dispatch tool" + ); let result = dispatch_tool_call(&ops, &session_id, &mode_id, &cwd, event, &cancel).await; + tracing::info!( + session_id = %session_id.0, + tool_call_id = %result.tool_call_id, + is_error = result.is_error, + "dispatch tool complete" + ); let result_turn = Message { role: Role::Tool, content: MessageContent::ToolResult { @@ -951,13 +979,52 @@ async fn drive_prompt( ); stop_reason = StopReason::MaxTurnRequests; } + + // Per-round flush: push this round's turns into the in-memory + // history and persist to disk. If the model stalls in a later + // round (long bash, upstream SSE that never finishes, etc.) + // earlier rounds still survive a binary restart. + if !new_turns.is_empty() { + let drained = std::mem::take(&mut new_turns); + tracing::info!( + session_id = %session_id.0, + round = round + 1, + turns = drained.len(), + "prompt round complete; persisting" + ); + extend_and_persist(&session_arc, &session_id, drained).await; + } } - // Append the new turns to the session's in-memory history, then - // snapshot the state for persistence. We snapshot *under the - // lock* so the on-disk store reflects exactly what's in memory, - // but the actual blocking I/O (file write) happens outside the - // lock so a slow disk doesn't stall concurrent session work. + // Final flush for whatever the break paths above left behind. + // No-op when the per-round flush already drained new_turns. 
+ if !new_turns.is_empty() { + extend_and_persist(&session_arc, &session_id, new_turns).await; + } + + tracing::info!( + session_id = %session_id.0, + ?stop_reason, + "prompt complete" + ); + let _ = responder.respond(PromptResponse::new(stop_reason)); + Ok(()) +} + +/// Push `new_turns` into the session's in-memory history under the +/// session lock, then snapshot the full state and write it to disk +/// *outside* the lock. Used by `drive_prompt` at the end of every +/// tool-call round (so partial progress survives a stall) and once +/// more after the loop (catching any turns the break paths left +/// behind). +/// +/// Persistence failures log a warning and don't propagate — losing +/// a save shouldn't tear down a live conversation. +async fn extend_and_persist( + session_arc: &Arc>, + session_id: &SessionId, + new_turns: Vec, +) { let snapshot = { let mut state = session_arc.lock().await; state.history.extend(new_turns); @@ -967,29 +1034,21 @@ async fn drive_prompt( model_id: state.model_id.clone(), mode_id: state.mode_id.0.as_ref().to_string(), history: state.history.clone(), - // `created_at` would be ideal to preserve across saves; - // we read it back via store::load on resume but the - // in-memory SessionState doesn't carry it (yet). For - // now persistence treats every save as a refresh, - // updating both timestamps. Future work: thread - // `created_at` through SessionState. + // `created_at` ought to be preserved across saves — + // currently SessionState doesn't carry it, so every + // save refreshes both timestamps. Acceptable for + // resume; future work: thread `created_at` through. created_at: store::now_secs(), updated_at: store::now_secs(), } }; if let Err(e) = store::save(&snapshot) { - // Persistence failure is a warning, not a fatal — the - // prompt response still goes through. Operator can grep - // for this to diagnose disk issues. 
tracing::warn!( session_id = %session_id.0, error = %format!("{e:#}"), - "session/persist failed; resume from disk will miss this turn" + "session/persist failed; resume from disk will miss this round" ); } - - let _ = responder.respond(PromptResponse::new(stop_reason)); - Ok(()) } /// Accumulator for one streamed tool call: the OpenAI wire format