feat(helexa-acp): add tools, session modes, and permission gating
All checks were successful
build-prerelease / Resolve version stamps (push) Successful in 36s
CI / Format (push) Successful in 39s
CI / Clippy (push) Successful in 2m38s
CI / Test (push) Successful in 5m9s
CI / Build cortex SRPM (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Build neuron-blackwell (push) Successful in 5m54s
build-prerelease / Build neuron-ampere (push) Successful in 7m54s
build-prerelease / Build neuron-ada (push) Successful in 4m59s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 2m56s
build-prerelease / Package helexa-neuron-ampere RPM (push) Successful in 3m14s
build-prerelease / Build cortex binary (push) Successful in 4m9s
build-prerelease / Package cortex RPM (push) Successful in 1m22s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Successful in 6m47s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 3m54s

Stage 3 introduces five tools (read_file, write_file, edit_file,
list_dir, bash) backed by ACP fs/* and terminal/* calls, a
ClientOps trait so the runner is mock-testable, two session modes
(default + bypassPermissions) with session/set_mode honouring them,
and a tool-call loop in the agent that streams the model, dispatches
each call, feeds results back into history, and re-enters until the
model finishes or MAX_TOOL_ROUNDS is hit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-28 10:01:32 +03:00
parent 96fc379893
commit 0609f1ac5d
5 changed files with 1514 additions and 86 deletions

View File

@@ -1,19 +1,17 @@
//! ACP agent loop — text-only (Stage 2). //! ACP agent loop with tools and session modes (Stage 3).
//! //!
//! Handlers: //! Handlers:
//! //!
//! | ACP method | Behaviour | //! | ACP method | Behaviour |
//! |-------------------|------------------------------------------------------------| //! |-----------------------|-------------------------------------------------------------|
//! | `initialize` | echo client's protocol version, advertise capabilities | //! | `initialize` | echo protocol version, advertise capabilities |
//! | `session/new` | mint a session id, register state, return it | //! | `session/new` | mint id, register state, advertise [Default, Bypass] modes |
//! | `session/prompt` | flatten user blocks → history, stream provider → updates | //! | `session/prompt` | tool-call loop: stream → dispatch tools → re-enter, repeat |
//! | `session/cancel` | fire the session's cancellation token | //! | `session/cancel` | fire the session's cancellation token |
//! | (anything else) | "not implemented yet" error | //! | `session/set_mode` | mutate the session's mode (gated vs. bypass-permissions) |
//! | (anything else) | "not implemented yet" error |
//! //!
//! Stage 3 adds tool calls; Stage 4 wires `session/set_model`; Stage 5 //! Stage 4 wires `session/set_model`; Stage 5 flips on image content.
//! flips on image content. Stage 2 deliberately answers the model-picker
//! and session-modes fields with `None` so editors render a single model
//! / single mode UI.
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
@@ -22,18 +20,28 @@ use std::sync::atomic::{AtomicU64, Ordering};
use agent_client_protocol::schema::{ use agent_client_protocol::schema::{
AgentCapabilities, CancelNotification, ContentBlock, InitializeRequest, InitializeResponse, AgentCapabilities, CancelNotification, ContentBlock, InitializeRequest, InitializeResponse,
NewSessionRequest, NewSessionResponse, PromptCapabilities, PromptRequest, PromptResponse, NewSessionRequest, NewSessionResponse, PromptCapabilities, PromptRequest, PromptResponse,
SessionId, SessionNotification, SessionUpdate, StopReason, TextContent, SessionId, SessionMode, SessionModeId, SessionModeState, SessionNotification, SessionUpdate,
SetSessionModeRequest, SetSessionModeResponse, StopReason, TextContent,
}; };
use agent_client_protocol::{Agent as AgentRole, Client, ConnectionTo, Dispatch, Stdio}; use agent_client_protocol::{Agent as AgentRole, Client, ConnectionTo, Dispatch, Stdio};
use futures::StreamExt; use futures::StreamExt;
use std::collections::BTreeMap;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::config::{Config, parse_model_selector}; use crate::config::{Config, parse_model_selector};
use crate::prompt::build_system_prompt; use crate::prompt::build_system_prompt;
use crate::provider::{ use crate::provider::{
CompletionEvent, CompletionRequest, Message, MessageContent, Provider, Role, CompletionEvent, CompletionRequest, Message, MessageContent, Provider, Role, ToolCall,
}; };
use crate::session::{self, SessionState, SessionStore}; use crate::session::{self, MODE_BYPASS, MODE_DEFAULT, SessionState, SessionStore};
use crate::tool_runner::{AcpClientOps, ToolCallEvent, dispatch_tool_call};
use crate::tools;
/// Maximum number of provider→tool→provider round-trips per
/// `session/prompt` request. Bound exists to keep a runaway model
/// from looping forever; the spec maps this to
/// [`StopReason::MaxTurnRequests`].
const MAX_TOOL_ROUNDS: usize = 25;
/// Public entry point. Wraps an `Arc<AgentInner>` so handlers can clone /// Public entry point. Wraps an `Arc<AgentInner>` so handlers can clone
/// it cheaply into every closure. /// it cheaply into every closure.
@@ -126,6 +134,18 @@ impl Agent {
}, },
agent_client_protocol::on_receive_request!(), agent_client_protocol::on_receive_request!(),
) )
.on_receive_request(
{
let inner = inner.clone();
async move |req: SetSessionModeRequest, responder, _cx| {
match handle_set_session_mode(&inner, req).await {
Ok(()) => responder.respond(SetSessionModeResponse::new()),
Err(e) => responder.respond_with_internal_error(format!("{e:#}")),
}
}
},
agent_client_protocol::on_receive_request!(),
)
.on_receive_notification( .on_receive_notification(
{ {
let inner = inner.clone(); let inner = inner.clone();
@@ -187,7 +207,48 @@ async fn handle_new_session(
cwd = %cwd_display, cwd = %cwd_display,
"session created" "session created"
); );
Ok(NewSessionResponse::new(session_id)) Ok(NewSessionResponse::new(session_id).modes(default_mode_state()))
}
/// The two modes every Stage 3 session advertises. Stage 7 may grow
/// this list (e.g. "plan" for plan-only output, "ask" for read-only),
/// but Default + Bypass cover the two operationally distinct
/// permission policies.
fn default_mode_state() -> SessionModeState {
SessionModeState::new(
SessionModeId::new(MODE_DEFAULT),
vec![
SessionMode::new(SessionModeId::new(MODE_DEFAULT), "Default")
.description("Prompt for permission before writes or shell commands."),
SessionMode::new(SessionModeId::new(MODE_BYPASS), "Bypass Permissions")
.description("Auto-allow all tool calls. Use with care."),
],
)
}
async fn handle_set_session_mode(
inner: &AgentInner,
req: SetSessionModeRequest,
) -> anyhow::Result<()> {
let Some(state) = session::get(&inner.sessions, &req.session_id).await else {
anyhow::bail!("unknown session id {}", req.session_id.0);
};
let accepted = req.mode_id.0.as_ref() == MODE_DEFAULT || req.mode_id.0.as_ref() == MODE_BYPASS;
if !accepted {
anyhow::bail!(
"unknown mode '{}' — must be one of: {}, {}",
req.mode_id.0,
MODE_DEFAULT,
MODE_BYPASS
);
}
state.lock().await.mode_id = req.mode_id.clone();
tracing::info!(
session_id = %req.session_id.0,
mode = %req.mode_id.0,
"session mode changed"
);
Ok(())
} }
async fn handle_cancel(inner: &AgentInner, notif: CancelNotification) { async fn handle_cancel(inner: &AgentInner, notif: CancelNotification) {
@@ -239,11 +300,11 @@ async fn drive_prompt(
return Ok(()); return Ok(());
}; };
// Snapshot the inputs to the upstream call under the session // Snapshot the inputs under the session lock, then drop the lock
// lock, then drop the lock before any `await` that touches the // before any `await` that touches the network. `mode_id` is
// network. We *also* install a fresh cancellation token so // refreshed between tool rounds (the user can toggle modes
// `session/cancel` can fire only this prompt. // mid-turn).
let (mut history, model_id, cwd, cancel) = { let (existing_history, model_id, cwd, cancel, mut mode_id) = {
let mut state = session_arc.lock().await; let mut state = session_arc.lock().await;
let cancel = CancellationToken::new(); let cancel = CancellationToken::new();
state.cancel = cancel.clone(); state.cancel = cancel.clone();
@@ -257,6 +318,7 @@ async fn drive_prompt(
state.model_id.clone(), state.model_id.clone(),
state.cwd.clone(), state.cwd.clone(),
cancel, cancel,
state.mode_id.clone(),
) )
}; };
@@ -276,98 +338,220 @@ async fn drive_prompt(
session_id = %session_id.0, session_id = %session_id.0,
endpoint = %provider.name(), endpoint = %provider.name(),
model = %local_model, model = %local_model,
history_turns = history.len(), mode = %mode_id.0,
history_turns = existing_history.len(),
"sending prompt upstream" "sending prompt upstream"
); );
let mut messages = Vec::with_capacity(history.len() + 1); let ops = AcpClientOps::new(cx.clone());
// `messages` is the rolling conversation we send to the provider
// each round. We seed it with the system prompt + the snapshot
// (which includes the new user turn) and grow it with each
// round's assistant turn + tool-result turns.
let mut messages: Vec<Message> = Vec::with_capacity(existing_history.len() + 1);
messages.push(Message { messages.push(Message {
role: Role::System, role: Role::System,
content: MessageContent::Text(system_prompt), content: MessageContent::Text(system_prompt),
}); });
messages.append(&mut history); messages.extend(existing_history);
let completion_req = CompletionRequest { // Whatever new turns this prompt generates beyond the user's
model: local_model, // input — we persist these to session.history at the end so
messages, // future prompts see them.
tools: vec![], let mut new_turns: Vec<Message> = Vec::new();
temperature: None,
top_p: None,
max_tokens: None,
};
let stream_result = provider.complete(completion_req, cancel.clone()).await; let tool_specs = tools::all_tools();
let mut stream = match stream_result {
Ok(s) => s,
Err(e) => {
let _ = responder
.respond_with_internal_error(format!("{} complete: {e:#}", provider.name()));
return Ok(());
}
};
let mut assistant_text = String::new();
let mut stop_reason = StopReason::EndTurn; let mut stop_reason = StopReason::EndTurn;
while let Some(event) = stream.next().await { for round in 0..MAX_TOOL_ROUNDS {
let event = match event { if cancel.is_cancelled() {
Ok(e) => e, stop_reason = StopReason::Cancelled;
break;
}
let completion_req = CompletionRequest {
model: local_model.clone(),
messages: messages.clone(),
tools: tool_specs.clone(),
temperature: None,
top_p: None,
max_tokens: None,
};
let mut stream = match provider.complete(completion_req, cancel.clone()).await {
Ok(s) => s,
Err(e) => { Err(e) => {
tracing::warn!(error = %format!("{e:#}"), "stream error; ending turn"); let _ = responder
break; .respond_with_internal_error(format!("{} complete: {e:#}", provider.name()));
return Ok(());
} }
}; };
match event {
CompletionEvent::TextDelta(t) => { let mut assistant_text = String::new();
assistant_text.push_str(&t); let mut finish_reason: Option<String> = None;
send_chunk( // `BTreeMap` keyed by the provider's tool-call index keeps
&cx, // insertion order while allowing arg deltas to mutate any
&session_id, // bucket — `ToolCallStart` may arrive interleaved with
SessionUpdate::AgentMessageChunk(text_chunk(t)), // `ToolCallArgsDelta` for different indices.
); let mut tool_buckets: BTreeMap<usize, ToolCallBucket> = BTreeMap::new();
while let Some(event) = stream.next().await {
let event = match event {
Ok(e) => e,
Err(e) => {
tracing::warn!(error = %format!("{e:#}"), "stream error; ending round");
break;
}
};
match event {
CompletionEvent::TextDelta(t) => {
assistant_text.push_str(&t);
send_chunk(
&cx,
&session_id,
SessionUpdate::AgentMessageChunk(text_chunk(t)),
);
}
CompletionEvent::ReasoningDelta(t) => {
send_chunk(
&cx,
&session_id,
SessionUpdate::AgentThoughtChunk(text_chunk(t)),
);
}
CompletionEvent::ToolCallStart { index, id, name } => {
tool_buckets.insert(
index,
ToolCallBucket {
id,
name,
arguments: String::new(),
},
);
}
CompletionEvent::ToolCallArgsDelta { index, args_delta } => {
tool_buckets
.entry(index)
.or_default()
.arguments
.push_str(&args_delta);
}
CompletionEvent::Finish { reason } => finish_reason = reason,
CompletionEvent::Usage(_) => {}
} }
CompletionEvent::ReasoningDelta(t) => { }
send_chunk(
&cx, if cancel.is_cancelled() {
&session_id, stop_reason = StopReason::Cancelled;
SessionUpdate::AgentThoughtChunk(text_chunk(t)), // Persist any partial text so the next turn has context.
); if !assistant_text.is_empty() {
new_turns.push(Message {
role: Role::Assistant,
content: MessageContent::Text(assistant_text),
});
} }
CompletionEvent::Finish { reason } => { break;
stop_reason = map_finish_reason(reason.as_deref()); }
let has_tool_calls = !tool_buckets.is_empty();
if !has_tool_calls {
// Terminal turn: just text. Save and finish.
if !assistant_text.is_empty() {
new_turns.push(Message {
role: Role::Assistant,
content: MessageContent::Text(assistant_text),
});
} }
// Stage 2 ignores tool calls and usage. Tool calls land in stop_reason = map_finish_reason(finish_reason.as_deref());
// Stage 3; usage telemetry isn't in the (non-unstable) break;
// PromptResponse, so there's nothing to attach it to today. }
CompletionEvent::ToolCallStart { .. }
| CompletionEvent::ToolCallArgsDelta { .. } // Assistant turn carrying the tool calls.
| CompletionEvent::Usage(_) => {} let calls: Vec<ToolCall> = tool_buckets
.values()
.map(|b| ToolCall {
id: b.id.clone(),
name: b.name.clone(),
arguments: b.arguments.clone(),
})
.collect();
let assistant_turn = Message {
role: Role::Assistant,
content: MessageContent::ToolCalls {
text: (!assistant_text.is_empty()).then_some(assistant_text),
calls,
},
};
new_turns.push(assistant_turn.clone());
messages.push(assistant_turn);
// Refresh the mode in case the user toggled it during the
// streaming above (cheap — one mutex acquisition).
mode_id = session_arc.lock().await.mode_id.clone();
// Dispatch every tool call sequentially. Parallelism is
// tempting but would require Zed to handle interleaved
// permission prompts; serial is friendlier.
for bucket in tool_buckets.into_values() {
if cancel.is_cancelled() {
stop_reason = StopReason::Cancelled;
break;
}
let event = ToolCallEvent {
id: bucket.id,
name: bucket.name,
arguments: bucket.arguments,
};
let result =
dispatch_tool_call(&ops, &session_id, &mode_id, &cwd, event, &cancel).await;
let result_turn = Message {
role: Role::Tool,
content: MessageContent::ToolResult {
tool_call_id: result.tool_call_id,
content: result.content,
},
};
new_turns.push(result_turn.clone());
messages.push(result_turn);
}
if cancel.is_cancelled() {
stop_reason = StopReason::Cancelled;
break;
}
if round + 1 == MAX_TOOL_ROUNDS {
tracing::warn!(
session_id = %session_id.0,
rounds = MAX_TOOL_ROUNDS,
"hit MAX_TOOL_ROUNDS, returning MaxTurnRequests"
);
stop_reason = StopReason::MaxTurnRequests;
} }
} }
// If cancellation fired, override whatever finish reason we got
// (or didn't get). Per spec: a `session/cancel` MUST result in
// `StopReason::Cancelled`, regardless of partial output.
if cancel.is_cancelled() {
stop_reason = StopReason::Cancelled;
}
// Re-acquire the lock just long enough to persist the assistant
// turn (even partial output, so future turns have the context).
{ {
let mut state = session_arc.lock().await; let mut state = session_arc.lock().await;
if !assistant_text.is_empty() { state.history.extend(new_turns);
state.history.push(Message {
role: Role::Assistant,
content: MessageContent::Text(assistant_text),
});
}
} }
let _ = responder.respond(PromptResponse::new(stop_reason)); let _ = responder.respond(PromptResponse::new(stop_reason));
Ok(()) Ok(())
} }
/// Accumulator for one streamed tool call: the OpenAI wire format
/// sends `id` + `name` once (in the first chunk for that index) and
/// then argument bytes piecemeal. We gather them all before
/// dispatching.
#[derive(Debug, Default)]
struct ToolCallBucket {
id: String,
name: String,
arguments: String,
}
fn send_chunk(cx: &ConnectionTo<Client>, session_id: &SessionId, update: SessionUpdate) { fn send_chunk(cx: &ConnectionTo<Client>, session_id: &SessionId, update: SessionUpdate) {
let notif = SessionNotification::new(session_id.clone(), update); let notif = SessionNotification::new(session_id.clone(), update);
if let Err(e) = cx.send_notification(notif) { if let Err(e) = cx.send_notification(notif) {

View File

@@ -20,6 +20,8 @@ mod config;
mod prompt; mod prompt;
mod provider; mod provider;
mod session; mod session;
mod tool_runner;
mod tools;
use agent::Agent; use agent::Agent;
use config::{Config, EndpointConfig, WireApi}; use config::{Config, EndpointConfig, WireApi};

View File

@@ -18,12 +18,20 @@ use std::collections::HashMap;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use agent_client_protocol::schema::SessionId; use agent_client_protocol::schema::{SessionId, SessionModeId};
use tokio::sync::{Mutex, RwLock}; use tokio::sync::{Mutex, RwLock};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::provider::Message; use crate::provider::Message;
/// Mode id advertised as the gated default. Writes / bash prompt for
/// permission via `session/request_permission`.
pub const MODE_DEFAULT: &str = "default";
/// Mode id advertised as "auto-allow everything". Matches the
/// favorite name (`bypassPermissions`) Zed clients tend to reference.
pub const MODE_BYPASS: &str = "bypassPermissions";
/// State carried for a single ACP session. /// State carried for a single ACP session.
/// ///
/// Mutated under `Mutex<SessionState>`; never share a clone across /// Mutated under `Mutex<SessionState>`; never share a clone across
@@ -50,6 +58,11 @@ pub struct SessionState {
/// token is "spent" — firing it does nothing — which is fine, /// token is "spent" — firing it does nothing — which is fine,
/// `session/cancel` is a no-op when there's nothing to cancel. /// `session/cancel` is a no-op when there's nothing to cancel.
pub cancel: CancellationToken, pub cancel: CancellationToken,
/// Permission gating mode. Stage 3 advertises two ids in
/// `NewSessionResponse.modes`: [`MODE_DEFAULT`] (writes / bash
/// prompt the user) and [`MODE_BYPASS`] (auto-allow). Mutated by
/// `session/set_mode`.
pub mode_id: SessionModeId,
} }
impl SessionState { impl SessionState {
@@ -59,6 +72,7 @@ impl SessionState {
cwd, cwd,
model_id, model_id,
cancel: CancellationToken::new(), cancel: CancellationToken::new(),
mode_id: SessionModeId::new(MODE_DEFAULT),
} }
} }
} }

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,179 @@
//! Tool schemas sent to the upstream model on every completion.
//!
//! These are the OpenAI-function-style declarations the LLM sees in
//! `CompletionRequest.tools`; the runtime dispatch happens in
//! [`crate::tool_runner`]. Keeping declarations and execution in
//! separate modules makes it easy to add a tool without touching the
//! runner, and vice versa.
//!
//! Stage 3 ships five: filesystem read / write / edit, directory
//! listing, and `bash`. Image generation, web fetch, MCP-derived
//! tools, etc. are out of scope here.
use serde_json::json;
use crate::provider::ToolSpec;
pub const READ_FILE: &str = "read_file";
pub const WRITE_FILE: &str = "write_file";
pub const EDIT_FILE: &str = "edit_file";
pub const LIST_DIR: &str = "list_dir";
pub const BASH: &str = "bash";
/// Build the static tool list passed to the model on every prompt.
/// Cheap — the JSON Schema fragments are constructed each call but
/// the bodies are small constants. If this ever shows up in a
/// profile we can `OnceLock` the Vec.
pub fn all_tools() -> Vec<ToolSpec> {
vec![
ToolSpec {
name: READ_FILE.to_string(),
description: "Read the contents of a text file. Returns the file's text.".to_string(),
parameters: json!({
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute path to the file."
},
"line": {
"type": "integer",
"description": "Optional 1-based line number to start reading from.",
"minimum": 1
},
"limit": {
"type": "integer",
"description": "Optional maximum number of lines to read.",
"minimum": 1
}
},
"required": ["path"],
"additionalProperties": false
}),
},
ToolSpec {
name: WRITE_FILE.to_string(),
description: "Write text content to a file, replacing any existing contents. \
Creates the file (and parent directories) if needed."
.to_string(),
parameters: json!({
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute path to the file."
},
"content": {
"type": "string",
"description": "Full new contents of the file."
}
},
"required": ["path", "content"],
"additionalProperties": false
}),
},
ToolSpec {
name: EDIT_FILE.to_string(),
description: "Replace one exact substring in a file with another. \
Fails if `old_text` does not appear in the file, or appears more than once. \
Use multiple edit_file calls for multiple edits."
.to_string(),
parameters: json!({
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute path to the file."
},
"old_text": {
"type": "string",
"description": "Exact text fragment to replace. Must be unique within the file."
},
"new_text": {
"type": "string",
"description": "Replacement text."
}
},
"required": ["path", "old_text", "new_text"],
"additionalProperties": false
}),
},
ToolSpec {
name: LIST_DIR.to_string(),
description:
"List the entries of a directory. Returns names and a (f|d|l) kind per entry."
.to_string(),
parameters: json!({
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute path to the directory."
}
},
"required": ["path"],
"additionalProperties": false
}),
},
ToolSpec {
name: BASH.to_string(),
description: "Run a shell command via `sh -c`. \
Returns combined stdout+stderr and the exit status. \
The command runs in the session's working directory unless `cwd` is given."
.to_string(),
parameters: json!({
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "Shell command line, evaluated by `sh -c`."
},
"cwd": {
"type": "string",
"description": "Optional absolute path to run the command from."
}
},
"required": ["command"],
"additionalProperties": false
}),
},
]
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn all_tools_has_five_named_entries() {
let tools = all_tools();
let names: Vec<&str> = tools.iter().map(|t| t.name.as_str()).collect();
assert_eq!(
names,
vec![READ_FILE, WRITE_FILE, EDIT_FILE, LIST_DIR, BASH]
);
}
#[test]
fn every_tool_has_an_object_parameter_schema() {
for tool in all_tools() {
let ty = tool.parameters.get("type").and_then(|v| v.as_str());
assert_eq!(
ty,
Some("object"),
"tool {} parameters.type must be \"object\"",
tool.name
);
assert!(
tool.parameters.get("properties").is_some(),
"tool {} missing properties",
tool.name
);
assert!(
tool.parameters.get("required").is_some(),
"tool {} missing required list",
tool.name
);
}
}
}