fix(helexa-acp): persist per round, cancel previous prompt, log loop
All checks were successful
build-prerelease / Resolve version stamps (push) Successful in 34s
CI / Format (push) Successful in 35s
CI / Clippy (push) Successful in 2m32s
CI / Test (push) Successful in 5m8s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Build neuron-blackwell (push) Successful in 6m4s
build-prerelease / Build neuron-ampere (push) Successful in 8m13s
build-prerelease / Build neuron-ada (push) Successful in 5m18s
build-prerelease / Build cortex binary (push) Successful in 16m12s
build-prerelease / Package cortex RPM (push) Successful in 1m15s
build-prerelease / Package helexa-neuron-ampere RPM (push) Successful in 2m57s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 3m2s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Successful in 3m39s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 1m3s

Three changes addressing "session stops mid-turn and disk store
doesn't update":

1. Per-round persistence. drive_prompt previously called
   store::save() once at the very end of the turn. If the loop
   stalled in a later round (long-running bash, upstream SSE that
   never finished, wedged ACP roundtrip), earlier successful
   rounds lived only in the spawned task's `new_turns` and never
   reached disk. Move the extend-history + save into a helper
   (extend_and_persist) and call it at the end of every loop
   iteration. The post-loop save catches whatever the break paths
   leave behind. Failure is logged not propagated.

2. Cancel previous in-flight prompt on new session/prompt. The
   handler used to overwrite SessionState.cancel with a fresh
   token *without firing the old one*. A wedged prior prompt would
   then live forever, holding session-state references and never
   persisting. Now we fire the existing cancel under the lock
   before installing the new token — the old task observes
   is_cancelled() on its next .await and unwinds.

3. Per-round and per-tool log lines. drive_prompt now emits:
   - INFO  prompt round: streaming { round, of, history_turns }
   - INFO  dispatch tool { tool, tool_call_id }
   - INFO  dispatch tool complete { tool_call_id, is_error }
   - INFO  prompt round complete; persisting { round, turns }
   - INFO  prompt complete { stop_reason }
   so the next hang shows up by line number in /tmp/helexa-acp.log
   instead of as silence.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-28 16:29:22 +03:00
parent 0d841a4981
commit 3ecbb21ece

View File

@@ -643,6 +643,14 @@ async fn drive_prompt(
// mid-turn). // mid-turn).
let (existing_history, model_id, cwd, cancel, mut mode_id) = { let (existing_history, model_id, cwd, cancel, mut mode_id) = {
let mut state = session_arc.lock().await; let mut state = session_arc.lock().await;
// Fire the session's current cancel before installing a new
// one. If a previous prompt task is still in-flight (model
// stalled mid-stream, a long-running bash, a wedged ACP
// roundtrip), this lets it observe is_cancelled() at the
// next .await and unwind cleanly — instead of two tasks
// racing each other to mutate session.history and to
// persist the same file.
state.cancel.cancel();
let cancel = CancellationToken::new(); let cancel = CancellationToken::new();
state.cancel = cancel.clone(); state.cancel = cancel.clone();
let user_text = flatten_prompt(&req.prompt); let user_text = flatten_prompt(&req.prompt);
@@ -696,9 +704,10 @@ async fn drive_prompt(
}); });
messages.extend(existing_history); messages.extend(existing_history);
// Whatever new turns this prompt generates beyond the user's // Buffer for turns produced this round. Flushed into
// input — we persist these to session.history at the end so // session.history *and* persisted at the end of every iteration
// future prompts see them. // (and once more after the loop). Per-round persistence means
// a stall later in the conversation doesn't lose earlier rounds.
let mut new_turns: Vec<Message> = Vec::new(); let mut new_turns: Vec<Message> = Vec::new();
// Monotonic counter for synthetic ids assigned to unparseable // Monotonic counter for synthetic ids assigned to unparseable
// <tool_call> blocks across all rounds of this prompt. // <tool_call> blocks across all rounds of this prompt.
@@ -707,6 +716,13 @@ async fn drive_prompt(
let mut stop_reason = StopReason::EndTurn; let mut stop_reason = StopReason::EndTurn;
for round in 0..MAX_TOOL_ROUNDS { for round in 0..MAX_TOOL_ROUNDS {
tracing::info!(
session_id = %session_id.0,
round = round + 1,
of = MAX_TOOL_ROUNDS,
history_turns = messages.len(),
"prompt round: streaming"
);
if cancel.is_cancelled() { if cancel.is_cancelled() {
stop_reason = StopReason::Cancelled; stop_reason = StopReason::Cancelled;
break; break;
@@ -906,8 +922,20 @@ async fn drive_prompt(
name: bucket.name, name: bucket.name,
arguments: bucket.arguments, arguments: bucket.arguments,
}; };
tracing::info!(
session_id = %session_id.0,
tool = %event.name,
tool_call_id = %event.id,
"dispatch tool"
);
let result = let result =
dispatch_tool_call(&ops, &session_id, &mode_id, &cwd, event, &cancel).await; dispatch_tool_call(&ops, &session_id, &mode_id, &cwd, event, &cancel).await;
tracing::info!(
session_id = %session_id.0,
tool_call_id = %result.tool_call_id,
is_error = result.is_error,
"dispatch tool complete"
);
let result_turn = Message { let result_turn = Message {
role: Role::Tool, role: Role::Tool,
content: MessageContent::ToolResult { content: MessageContent::ToolResult {
@@ -951,13 +979,52 @@ async fn drive_prompt(
); );
stop_reason = StopReason::MaxTurnRequests; stop_reason = StopReason::MaxTurnRequests;
} }
// Per-round flush: push this round's turns into the in-memory
// history and persist to disk. If the model stalls in a later
// round (long bash, upstream SSE that never finishes, etc.)
// earlier rounds still survive a binary restart.
if !new_turns.is_empty() {
let drained = std::mem::take(&mut new_turns);
tracing::info!(
session_id = %session_id.0,
round = round + 1,
turns = drained.len(),
"prompt round complete; persisting"
);
extend_and_persist(&session_arc, &session_id, drained).await;
}
} }
// Append the new turns to the session's in-memory history, then // Final flush for whatever the break paths above left behind.
// snapshot the state for persistence. We snapshot *under the // No-op when the per-round flush already drained new_turns.
// lock* so the on-disk store reflects exactly what's in memory, if !new_turns.is_empty() {
// but the actual blocking I/O (file write) happens outside the extend_and_persist(&session_arc, &session_id, new_turns).await;
// lock so a slow disk doesn't stall concurrent session work. }
tracing::info!(
session_id = %session_id.0,
?stop_reason,
"prompt complete"
);
let _ = responder.respond(PromptResponse::new(stop_reason));
Ok(())
}
/// Push `new_turns` into the session's in-memory history under the
/// session lock, then snapshot the full state and write it to disk
/// *outside* the lock. Used by `drive_prompt` at the end of every
/// tool-call round (so partial progress survives a stall) and once
/// more after the loop (catching any turns the break paths left
/// behind).
///
/// Persistence failures log a warning and don't propagate — losing
/// a save shouldn't tear down a live conversation.
async fn extend_and_persist(
session_arc: &Arc<tokio::sync::Mutex<SessionState>>,
session_id: &SessionId,
new_turns: Vec<Message>,
) {
let snapshot = { let snapshot = {
let mut state = session_arc.lock().await; let mut state = session_arc.lock().await;
state.history.extend(new_turns); state.history.extend(new_turns);
@@ -967,29 +1034,21 @@ async fn drive_prompt(
model_id: state.model_id.clone(), model_id: state.model_id.clone(),
mode_id: state.mode_id.0.as_ref().to_string(), mode_id: state.mode_id.0.as_ref().to_string(),
history: state.history.clone(), history: state.history.clone(),
// `created_at` would be ideal to preserve across saves; // `created_at` ought to be preserved across saves
// we read it back via store::load on resume but the // currently SessionState doesn't carry it, so every
// in-memory SessionState doesn't carry it (yet). For // save refreshes both timestamps. Acceptable for
// now persistence treats every save as a refresh, // resume; future work: thread `created_at` through.
// updating both timestamps. Future work: thread
// `created_at` through SessionState.
created_at: store::now_secs(), created_at: store::now_secs(),
updated_at: store::now_secs(), updated_at: store::now_secs(),
} }
}; };
if let Err(e) = store::save(&snapshot) { if let Err(e) = store::save(&snapshot) {
// Persistence failure is a warning, not a fatal — the
// prompt response still goes through. Operator can grep
// for this to diagnose disk issues.
tracing::warn!( tracing::warn!(
session_id = %session_id.0, session_id = %session_id.0,
error = %format!("{e:#}"), error = %format!("{e:#}"),
"session/persist failed; resume from disk will miss this turn" "session/persist failed; resume from disk will miss this round"
); );
} }
let _ = responder.respond(PromptResponse::new(stop_reason));
Ok(())
} }
/// Accumulator for one streamed tool call: the OpenAI wire format /// Accumulator for one streamed tool call: the OpenAI wire format