feat(helexa-acp): wire ACP agent loop for text-only conversations
Some checks failed
build-prerelease / Package helexa-neuron-ada RPM (push) Blocked by required conditions
build-prerelease / Package helexa-neuron-ampere RPM (push) Blocked by required conditions
build-prerelease / Package helexa-neuron-blackwell RPM (push) Blocked by required conditions
build-prerelease / Resolve version stamps (push) Successful in 41s
CI / Format (push) Successful in 38s
CI / Clippy (push) Successful in 2m35s
build-prerelease / Build cortex binary (push) Successful in 5m26s
CI / Test (push) Successful in 5m43s
build-prerelease / Build neuron-blackwell (push) Successful in 5m47s
CI / Build cortex SRPM (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Package cortex RPM (push) Successful in 1m23s
build-prerelease / Build neuron-ampere (push) Successful in 8m13s
build-prerelease / Build neuron-ada (push) Successful in 5m28s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Has been cancelled

Stage 2 lands the agent loop on top of the Stage 1 scaffold: session
state with per-session cancellation, a system-prompt builder honouring
HELEXA_ACP_SYSTEM_PROMPT_PATH / system_prompt_path TOML, and handlers
for initialize / session/new / session/prompt / session/cancel that
stream provider output back as session/update notifications. Verified
end-to-end against cortex from Zed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-28 09:46:22 +03:00
parent e267f583e1
commit 96fc379893
7 changed files with 883 additions and 55 deletions

View File

@@ -0,0 +1,549 @@
//! ACP agent loop — text-only (Stage 2).
//!
//! Handlers:
//!
//! | ACP method | Behaviour |
//! |-------------------|------------------------------------------------------------|
//! | `initialize` | echo client's protocol version, advertise capabilities |
//! | `session/new` | mint a session id, register state, return it |
//! | `session/prompt` | flatten user blocks → history, stream provider → updates |
//! | `session/cancel` | fire the session's cancellation token |
//! | (anything else) | "not implemented yet" error |
//!
//! Stage 3 adds tool calls; Stage 4 wires `session/set_model`; Stage 5
//! flips on image content. Stage 2 deliberately answers the model-picker
//! and session-modes fields with `None` so editors render a single model
//! / single mode UI.
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use agent_client_protocol::schema::{
AgentCapabilities, CancelNotification, ContentBlock, InitializeRequest, InitializeResponse,
NewSessionRequest, NewSessionResponse, PromptCapabilities, PromptRequest, PromptResponse,
SessionId, SessionNotification, SessionUpdate, StopReason, TextContent,
};
use agent_client_protocol::{Agent as AgentRole, Client, ConnectionTo, Dispatch, Stdio};
use futures::StreamExt;
use tokio_util::sync::CancellationToken;
use crate::config::{Config, parse_model_selector};
use crate::prompt::build_system_prompt;
use crate::provider::{
CompletionEvent, CompletionRequest, Message, MessageContent, Provider, Role,
};
use crate::session::{self, SessionState, SessionStore};
/// Public entry point for the ACP agent. All state lives behind an
/// `Arc<AgentInner>`, so handing it to a request handler is a
/// reference-count bump rather than a deep copy.
pub struct Agent {
// Shared agent state; `serve` clones this `Arc` into the handler closures.
inner: Arc<AgentInner>,
}
/// State shared by every request handler for the lifetime of the agent.
struct AgentInner {
/// Every successfully-built provider. The order carries no meaning:
/// lookups go by provider name (the `endpoint:` prefix of a model
/// selector), never by position.
providers: Vec<Arc<dyn Provider>>,
/// Name of the endpoint used when a request omits the
/// `endpoint:model` prefix.
default_endpoint_name: String,
/// Default model for the default endpoint, if configured. Required
/// for Stage 2 because session/set_model lands in Stage 4 — a
/// session with no model can't prompt anything, so `session/new`
/// fails while this is `None`.
default_model: Option<String>,
/// Live sessions, keyed by session id.
sessions: SessionStore,
/// Optional path to a user-supplied system prompt template; `None`
/// selects the built-in prompt.
system_prompt_path: Option<PathBuf>,
/// Monotonic counter for minting session ids. The wire format is
/// `hxa-{n}` — short, debuggable, and the protocol doesn't require
/// UUIDs for session ids (it only requires them for message ids
/// behind an unstable flag).
next_session_id: AtomicU64,
}
impl Agent {
/// Construct an agent from a validated [`Config`] and the providers
/// that were successfully built for each endpoint.
///
/// # Errors
///
/// Fails when `providers` is empty, or when none of the providers
/// reports the name of the config's default endpoint.
pub fn new(cfg: &Config, providers: Vec<Arc<dyn Provider>>) -> anyhow::Result<Self> {
// Checked separately so the "nothing usable at all" case gets its
// own, more direct message.
if providers.is_empty() {
anyhow::bail!("no usable providers");
}
let default = cfg.default_endpoint();
// The default endpoint's provider must have built successfully —
// otherwise we can't honour `model = "bare-model-id"` requests.
// (If only a non-default endpoint is usable, the operator should
// promote it to `default_endpoint` in the TOML.)
if !providers.iter().any(|p| p.name() == default.name) {
anyhow::bail!(
"default endpoint '{}' has no usable provider — check config",
default.name
);
}
Ok(Self {
inner: Arc::new(AgentInner {
providers,
default_endpoint_name: default.name.clone(),
default_model: default.default_model.clone(),
sessions: session::new_store(),
system_prompt_path: cfg.system_prompt_path.clone(),
// Session ids are minted as `hxa-{n}`, starting from 1.
next_session_id: AtomicU64::new(1),
}),
})
}
/// Run the agent against an ACP transport (typically [`Stdio`]).
/// Returns when the transport closes or a handler errors.
///
/// Registers one handler each for `initialize`, `session/new`,
/// `session/prompt` and `session/cancel`, plus a catch-all that
/// answers every other message with a "not implemented yet" error.
pub async fn serve(self, transport: Stdio) -> agent_client_protocol::Result<()> {
let inner = self.inner;
AgentRole
.builder()
.name("helexa-acp")
// initialize: stateless, answered inline.
.on_receive_request(
async move |req: InitializeRequest, responder, _cx| {
responder.respond(initialize_response(&req))
},
agent_client_protocol::on_receive_request!(),
)
// session/new: register a session; failures are reported to the
// client as an internal error carrying the full error chain.
.on_receive_request(
{
let inner = inner.clone();
async move |req: NewSessionRequest, responder, _cx| {
let result = handle_new_session(&inner, req).await;
match result {
Ok(resp) => responder.respond(resp),
Err(e) => responder.respond_with_internal_error(format!("{e:#}")),
}
}
},
agent_client_protocol::on_receive_request!(),
)
// session/prompt: handed to a spawned task (see `spawn_prompt`)
// so this handler returns without waiting for the provider stream.
.on_receive_request(
{
let inner = inner.clone();
async move |req: PromptRequest, responder, cx: ConnectionTo<Client>| {
spawn_prompt(inner.clone(), cx, req, responder)
}
},
agent_client_protocol::on_receive_request!(),
)
// session/cancel: fire the session's current cancellation token.
.on_receive_notification(
{
let inner = inner.clone();
async move |notif: CancelNotification, _cx: ConnectionTo<Client>| {
handle_cancel(&inner, notif).await;
Ok(())
}
},
agent_client_protocol::on_receive_notification!(),
)
// Catch-all, registered last. NOTE(review): presumably it only
// sees messages that no typed handler above claimed — confirm
// against the builder's dispatch order.
.on_receive_dispatch(
async move |message: Dispatch, cx: ConnectionTo<Client>| {
tracing::warn!(method = ?message.method(), "unhandled ACP message");
message.respond_with_error(
agent_client_protocol::util::internal_error("not implemented yet"),
cx,
)
},
agent_client_protocol::on_receive_dispatch!(),
)
.connect_to(transport)
.await
}
}
/// Build the reply to `initialize`: echo the client's protocol version
/// and advertise default prompt capabilities (Stage 2 is text-only;
/// image / audio / embedded resources are switched on in later stages).
fn initialize_response(req: &InitializeRequest) -> InitializeResponse {
    let capabilities =
        AgentCapabilities::new().prompt_capabilities(PromptCapabilities::default());
    InitializeResponse::new(req.protocol_version).agent_capabilities(capabilities)
}
async fn handle_new_session(
inner: &AgentInner,
req: NewSessionRequest,
) -> anyhow::Result<NewSessionResponse> {
if !req.cwd.is_absolute() {
anyhow::bail!("session cwd must be absolute, got {}", req.cwd.display());
}
let model_id = inner
.default_model
.clone()
.ok_or_else(|| anyhow::anyhow!(
"default endpoint '{}' has no default_model — set one in config or wait for Stage 4 set_model",
inner.default_endpoint_name
))?;
let n = inner.next_session_id.fetch_add(1, Ordering::Relaxed);
let session_id = SessionId::new(format!("hxa-{n}"));
let cwd_display = req.cwd.display().to_string();
let log_model = model_id.clone();
let state = SessionState::new(req.cwd, model_id);
session::insert(&inner.sessions, session_id.clone(), state).await;
tracing::info!(
session_id = %session_id.0,
model_id = %log_model,
cwd = %cwd_display,
"session created"
);
Ok(NewSessionResponse::new(session_id))
}
/// Handle `session/cancel`: fire the cancellation token currently
/// installed on the session. A cancel for an unknown session id is
/// logged at debug level and otherwise ignored.
async fn handle_cancel(inner: &AgentInner, notif: CancelNotification) {
    match session::get(&inner.sessions, &notif.session_id).await {
        None => {
            tracing::debug!(session_id = %notif.session_id.0, "cancel for unknown session, ignoring");
        }
        Some(state) => {
            // Copy the token out so the session lock is released before
            // the token is fired.
            let token = state.lock().await.cancel.clone();
            tracing::info!(session_id = %notif.session_id.0, "cancellation requested");
            token.cancel();
        }
    }
}
/// Start the prompt on a spawned task and return straight away, leaving
/// the event loop free to dispatch a matching `session/cancel` while
/// the prompt is still streaming. The handler's own `Ok(())`
/// (= `Handled::Yes`) says nothing about the prompt's outcome; the
/// spawned task owns `responder` and is what eventually answers the
/// request.
fn spawn_prompt(
    inner: Arc<AgentInner>,
    cx: ConnectionTo<Client>,
    req: PromptRequest,
    responder: agent_client_protocol::Responder<PromptResponse>,
) -> agent_client_protocol::Result<()> {
    let task_cx = cx.clone();
    cx.spawn(async move {
        let outcome = drive_prompt(inner, task_cx, req, responder).await;
        match outcome {
            Ok(()) => {}
            Err(e) => {
                // An error surfacing here is logged and deliberately not
                // propagated: returning it from the task would tear down
                // the whole connection, which is too violent for one
                // failed prompt.
                tracing::error!(error = %format!("{e:#}"), "prompt task failed");
            }
        }
        Ok(())
    })?;
    Ok(())
}
/// Run one `session/prompt` turn from start to finish.
///
/// 1. Look the session up, install a fresh cancellation token and append
///    the flattened user prompt to the session history.
/// 2. Build the system prompt and resolve the provider for the
///    session's model.
/// 3. Stream the completion, forwarding text and reasoning deltas to the
///    client as `session/update` notifications.
/// 4. Persist any assistant text and answer the request with the final
///    stop reason.
///
/// Every failure path answers the request through `responder` with an
/// internal error and then returns `Ok(())`, so the client always gets
/// an explicit reply for the prompt it sent.
async fn drive_prompt(
    inner: Arc<AgentInner>,
    cx: ConnectionTo<Client>,
    req: PromptRequest,
    responder: agent_client_protocol::Responder<PromptResponse>,
) -> anyhow::Result<()> {
    let session_id = req.session_id.clone();
    let Some(session_arc) = session::get(&inner.sessions, &session_id).await else {
        let _ =
            responder.respond_with_internal_error(format!("unknown session id {}", session_id.0));
        return Ok(());
    };

    // Snapshot the inputs to the upstream call under the session
    // lock, then drop the lock before any `await` that touches the
    // network. We *also* install a fresh cancellation token so
    // `session/cancel` can fire only this prompt.
    let (mut history, model_id, cwd, cancel) = {
        let mut state = session_arc.lock().await;
        let cancel = CancellationToken::new();
        state.cancel = cancel.clone();
        let user_text = flatten_prompt(&req.prompt);
        state.history.push(Message {
            role: Role::User,
            content: MessageContent::Text(user_text),
        });
        (
            state.history.clone(),
            state.model_id.clone(),
            state.cwd.clone(),
            cancel,
        )
    };

    // A broken system-prompt override (missing or unreadable file) is
    // reported to the client like every other failure in this function,
    // rather than bubbling up with the responder still unanswered.
    let system_prompt = match build_system_prompt(&cwd, inner.system_prompt_path.as_deref()) {
        Ok(prompt) => prompt,
        Err(e) => {
            let _ = responder.respond_with_internal_error(format!("build system prompt: {e:#}"));
            return Ok(());
        }
    };

    let (provider, local_model) =
        match resolve_provider(&inner.providers, &inner.default_endpoint_name, &model_id) {
            Ok(pair) => pair,
            Err(e) => {
                let _ = responder.respond_with_internal_error(format!("{e:#}"));
                return Ok(());
            }
        };

    tracing::info!(
        session_id = %session_id.0,
        endpoint = %provider.name(),
        model = %local_model,
        history_turns = history.len(),
        "sending prompt upstream"
    );

    // The system prompt is never stored in history; it is prepended
    // fresh on every request.
    let mut messages = Vec::with_capacity(history.len() + 1);
    messages.push(Message {
        role: Role::System,
        content: MessageContent::Text(system_prompt),
    });
    messages.append(&mut history);

    let completion_req = CompletionRequest {
        model: local_model,
        messages,
        tools: vec![],
        temperature: None,
        top_p: None,
        max_tokens: None,
    };

    let stream_result = provider.complete(completion_req, cancel.clone()).await;
    let mut stream = match stream_result {
        Ok(s) => s,
        Err(e) => {
            let _ = responder
                .respond_with_internal_error(format!("{} complete: {e:#}", provider.name()));
            return Ok(());
        }
    };

    let mut assistant_text = String::new();
    let mut stop_reason = StopReason::EndTurn;
    while let Some(event) = stream.next().await {
        let event = match event {
            Ok(e) => e,
            Err(e) => {
                // A mid-stream failure ends the turn with whatever text
                // has already been forwarded.
                tracing::warn!(error = %format!("{e:#}"), "stream error; ending turn");
                break;
            }
        };
        match event {
            CompletionEvent::TextDelta(t) => {
                assistant_text.push_str(&t);
                send_chunk(
                    &cx,
                    &session_id,
                    SessionUpdate::AgentMessageChunk(text_chunk(t)),
                );
            }
            CompletionEvent::ReasoningDelta(t) => {
                // Reasoning is shown to the user but not kept in history.
                send_chunk(
                    &cx,
                    &session_id,
                    SessionUpdate::AgentThoughtChunk(text_chunk(t)),
                );
            }
            CompletionEvent::Finish { reason } => {
                stop_reason = map_finish_reason(reason.as_deref());
            }
            // Stage 2 ignores tool calls and usage. Tool calls land in
            // Stage 3; usage telemetry isn't in the (non-unstable)
            // PromptResponse, so there's nothing to attach it to today.
            CompletionEvent::ToolCallStart { .. }
            | CompletionEvent::ToolCallArgsDelta { .. }
            | CompletionEvent::Usage(_) => {}
        }
    }

    // If cancellation fired, override whatever finish reason we got
    // (or didn't get). Per spec: a `session/cancel` MUST result in
    // `StopReason::Cancelled`, regardless of partial output.
    if cancel.is_cancelled() {
        stop_reason = StopReason::Cancelled;
    }

    // Re-acquire the lock just long enough to persist the assistant
    // turn (even partial output, so future turns have the context).
    {
        let mut state = session_arc.lock().await;
        if !assistant_text.is_empty() {
            state.history.push(Message {
                role: Role::Assistant,
                content: MessageContent::Text(assistant_text),
            });
        }
    }

    let _ = responder.respond(PromptResponse::new(stop_reason));
    Ok(())
}
/// Forward one streaming update to the client as a session
/// notification. Delivery is best-effort: a send failure is logged as a
/// warning and otherwise ignored.
fn send_chunk(cx: &ConnectionTo<Client>, session_id: &SessionId, update: SessionUpdate) {
    let outcome = cx.send_notification(SessionNotification::new(session_id.clone(), update));
    if let Err(e) = outcome {
        tracing::warn!(error = %format!("{e:#}"), "failed to forward session update");
    }
}
/// Wrap a piece of streamed text in the `ContentChunk` shape that the
/// message / thought session updates carry.
fn text_chunk(text: String) -> agent_client_protocol::schema::ContentChunk {
    let block = ContentBlock::Text(TextContent::new(text));
    agent_client_protocol::schema::ContentChunk::new(block)
}
/// Translate an upstream `finish_reason` into an ACP [`StopReason`].
///
/// Only `"length"` and `"refusal"` get dedicated stop reasons. Anything
/// else — `"stop"`, `"tool_calls"` (Stage 2 has no tools, so surfacing
/// it would be bogus), an unknown value, or no reason at all — is
/// reported as a normal end of turn.
fn map_finish_reason(reason: Option<&str>) -> StopReason {
    if reason == Some("length") {
        return StopReason::MaxTokens;
    }
    if reason == Some("refusal") {
        return StopReason::Refusal;
    }
    StopReason::EndTurn
}
/// Pure helper — turn a prompt's ContentBlocks into the user-message
/// text that goes into history. Lifted out so unit tests don't need a
/// running runtime.
///
/// Text blocks contribute their text and resource links a
/// `[resource link: <uri>]` reference line; the contributions are
/// joined with a blank line. Any other block kind is dropped with a
/// warning and contributes nothing — not even a separator, so an
/// ignored block never leaves stray blank lines in the message.
fn flatten_prompt(blocks: &[ContentBlock]) -> String {
    /// Start a new paragraph unless `out` is still empty.
    fn push_separator(out: &mut String) {
        if !out.is_empty() {
            out.push_str("\n\n");
        }
    }

    let mut out = String::new();
    for block in blocks {
        match block {
            ContentBlock::Text(t) => {
                push_separator(&mut out);
                out.push_str(&t.text);
            }
            ContentBlock::ResourceLink(link) => {
                // Stage 2 has no fs access; surface the link as a
                // textual reference so the model at least knows it
                // was asked about something.
                push_separator(&mut out);
                out.push_str(&format!("[resource link: {}]", link.uri));
            }
            // Image / Audio / Resource: not advertised in
            // PromptCapabilities for Stage 2; a well-behaved client
            // shouldn't send these. If one does, drop and warn.
            other => {
                tracing::warn!(?other, "ignoring unsupported content block in Stage 2");
            }
        }
    }
    out
}
/// Pure helper — choose the provider that serves a session's `model_id`.
///
/// A selector with an `endpoint:` prefix is routed to the provider of
/// that name; a bare model id goes to `default_endpoint`. Returns the
/// provider together with the endpoint-local model id (the selector
/// with any `endpoint:` prefix removed).
///
/// # Errors
///
/// Fails when no provider carries the targeted endpoint name.
fn resolve_provider(
    providers: &[Arc<dyn Provider>],
    default_endpoint: &str,
    model_id: &str,
) -> anyhow::Result<(Arc<dyn Provider>, String)> {
    let (endpoint_hint, local_model) = parse_model_selector(model_id);
    let target_endpoint = match endpoint_hint {
        Some(name) => name,
        None => default_endpoint,
    };
    for candidate in providers {
        if candidate.name() == target_endpoint {
            return Ok((Arc::clone(candidate), local_model.to_string()));
        }
    }
    Err(anyhow::anyhow!("no provider for endpoint '{target_endpoint}'"))
}
#[cfg(test)]
mod tests {
    use super::*;
    use agent_client_protocol::schema::ResourceLink;
    use async_trait::async_trait;
    use futures::stream::BoxStream;

    // ── flatten_prompt ──────────────────────────────────────────────

    /// No blocks in, empty string out.
    #[test]
    fn flatten_empty_prompt_is_empty() {
        let flattened = flatten_prompt(&[]);
        assert!(flattened.is_empty());
    }

    /// Consecutive text blocks are separated by exactly one blank line.
    #[test]
    fn flatten_joins_text_blocks_with_blank_line() {
        let blocks = [
            ContentBlock::Text(TextContent::new("first")),
            ContentBlock::Text(TextContent::new("second")),
        ];
        let flattened = flatten_prompt(&blocks);
        assert_eq!(flattened, "first\n\nsecond");
    }

    /// A resource link is rendered as a bracketed reference to its URI.
    #[test]
    fn flatten_resource_link_becomes_reference_line() {
        let link = ResourceLink::new("readme", "file:///tmp/x");
        let blocks = [ContentBlock::ResourceLink(link)];
        assert_eq!(flatten_prompt(&blocks), "[resource link: file:///tmp/x]");
    }

    // ── resolve_provider ────────────────────────────────────────────

    /// Provider double that only knows its own name. `resolve_provider`
    /// never calls the async trait methods, so they stay unimplemented.
    struct StubProvider(&'static str);

    #[async_trait]
    impl Provider for StubProvider {
        fn name(&self) -> &str {
            self.0
        }

        async fn list_models(&self) -> anyhow::Result<Vec<crate::provider::ModelInfo>> {
            unimplemented!()
        }

        async fn complete(
            &self,
            _request: CompletionRequest,
            _cancel: CancellationToken,
        ) -> anyhow::Result<BoxStream<'static, anyhow::Result<CompletionEvent>>> {
            unimplemented!()
        }
    }

    /// Two stub providers: `helexa` (used as the default) and `openrouter`.
    fn providers() -> Vec<Arc<dyn Provider>> {
        ["helexa", "openrouter"]
            .into_iter()
            .map(|name| Arc::new(StubProvider(name)) as Arc<dyn Provider>)
            .collect()
    }

    /// A selector without an `endpoint:` prefix goes to the default
    /// endpoint and is passed through unchanged.
    #[test]
    fn bare_model_routes_to_default() {
        let stubs = providers();
        let (provider, model) = resolve_provider(&stubs, "helexa", "helexa/large").unwrap();
        assert_eq!(provider.name(), "helexa");
        assert_eq!(model, "helexa/large");
    }

    /// An `endpoint:` prefix selects that endpoint and is stripped from
    /// the model id handed to the provider.
    #[test]
    fn prefixed_model_routes_by_endpoint() {
        let stubs = providers();
        let (provider, model) =
            resolve_provider(&stubs, "helexa", "openrouter:anthropic/claude-opus-4").unwrap();
        assert_eq!(provider.name(), "openrouter");
        assert_eq!(model, "anthropic/claude-opus-4");
    }

    /// An endpoint nobody provides is an error that names the endpoint.
    #[test]
    fn unknown_endpoint_errors() {
        // `Arc<dyn Provider>` doesn't impl Debug, which rules out
        // `.unwrap_err()` (it requires T: Debug). Destructure instead.
        let Err(e) = resolve_provider(&providers(), "helexa", "ghost:gpt-9") else {
            panic!("expected error for unknown endpoint");
        };
        assert!(e.to_string().contains("ghost"));
    }

    // ── map_finish_reason ───────────────────────────────────────────

    /// `length` and `refusal` map to their own stop reasons; everything
    /// else, including a missing reason, ends the turn normally.
    #[test]
    fn maps_known_finish_reasons() {
        let length = map_finish_reason(Some("length"));
        assert!(matches!(length, StopReason::MaxTokens));

        let refusal = map_finish_reason(Some("refusal"));
        assert!(matches!(refusal, StopReason::Refusal));

        for reason in [Some("stop"), Some("tool_calls"), None] {
            assert!(matches!(map_finish_reason(reason), StopReason::EndTurn));
        }
    }
}

View File

@@ -134,7 +134,10 @@ impl EndpointConfig {
join_segments(&self.base_url, &["chat", "completions"])
}
/// `{base_url}/models`.
/// `{base_url}/models`. Called from `Provider::list_models`, which
/// Stage 4 wires into the model-picker dropdown; until then it's
/// reachable code with no in-tree callers.
#[allow(dead_code)]
pub fn models_url(&self) -> Url {
join_segments(&self.base_url, &["models"])
}

View File

@@ -2,23 +2,26 @@
//! setups (helexa, LM Studio, Ollama, OpenRouter, OpenAI, Anthropic,
//! …) with a clean per-endpoint wire-format selector.
//!
//! Speaks ACP over stdio to an editor client (Zed today). The
//! conversation is forwarded to one of the configured endpoints via
//! a wire-format-specific [`provider::Provider`] implementation.
//! The agent loop itself is provider-agnostic — adding e.g. an
//! Anthropic /v1/messages provider doesn't touch `agent.rs`.
//! Speaks ACP over stdio to an editor client (Zed today). Every
//! configured endpoint produces a wire-format-specific
//! [`provider::Provider`] implementation; the agent loop in
//! [`agent::Agent`] is provider-agnostic, so adding e.g. an Anthropic
//! /v1/messages provider doesn't touch `agent.rs`.
//!
//! Config: `$XDG_CONFIG_HOME/helexa-acp/config.toml` for the multi-
//! endpoint case; env vars (`HELEXA_ACP_BASE_URL`, etc.) for the
//! single-endpoint case when no config file exists.
use agent_client_protocol::schema::{AgentCapabilities, InitializeRequest, InitializeResponse};
use agent_client_protocol::{Agent, Client, ConnectionTo, Dispatch, Result, Stdio};
use agent_client_protocol::{Result, Stdio};
use std::sync::Arc;
mod agent;
mod config;
mod prompt;
mod provider;
mod session;
use agent::Agent;
use config::{Config, EndpointConfig, WireApi};
use provider::{Provider, openai_chat::OpenAIChatProvider};
@@ -86,36 +89,8 @@ async fn main() -> Result<()> {
}
}
}
if providers.is_empty() {
return Err(agent_client_protocol::util::internal_error(
"no usable endpoints — check config",
));
}
Agent
.builder()
.name("helexa-acp")
.on_receive_request(
async move |initialize: InitializeRequest, responder, _connection| {
// Phase 1 wiring — capabilities only. Real session
// handling lands in the next iteration (agent.rs).
responder.respond(
InitializeResponse::new(initialize.protocol_version)
.agent_capabilities(AgentCapabilities::new()),
)
},
agent_client_protocol::on_receive_request!(),
)
.on_receive_dispatch(
async move |message: Dispatch, cx: ConnectionTo<Client>| {
tracing::warn!(method = ?message.method(), "unhandled ACP message");
message.respond_with_error(
agent_client_protocol::util::internal_error("not implemented yet"),
cx,
)
},
agent_client_protocol::on_receive_dispatch!(),
)
.connect_to(Stdio::new())
.await
let agent = Agent::new(&cfg, providers)
.map_err(|e| agent_client_protocol::util::internal_error(format!("agent: {e:#}")))?;
agent.serve(Stdio::new()).await
}

View File

@@ -0,0 +1,118 @@
//! System prompt assembly.
//!
//! Stage 2 ships a small built-in prompt aimed at coding assistance:
//! it tells the model the working directory and reminds it that no
//! tools are available yet. Users who want something different point
//! `HELEXA_ACP_SYSTEM_PROMPT_PATH` (env) or `system_prompt_path` (TOML)
//! at a file and we read that verbatim. The literal token `{cwd}` in
//! a user-supplied file is substituted with the session's working
//! directory so editor templates can include it without templating.
use anyhow::Context;
use std::path::Path;
/// Built-in system prompt template, used when no override file is
/// configured. The `{cwd}` placeholder is replaced with the session's
/// working directory by [`build_system_prompt`].
const DEFAULT_PROMPT: &str = "\
You are helexa-acp, a coding assistant.
Working directory: {cwd}
Stage 2 build: you have no tools available — answer with text only.
When you need to refer to files or directories, describe paths
relative to the working directory above. Be concise; the user is
reading your output in an editor pane.";
/// Assemble the system prompt for one session.
///
/// The template is the contents of `override_path` when one is given
/// (the user's `system_prompt_path` TOML key or
/// `HELEXA_ACP_SYSTEM_PROMPT_PATH` env value, already resolved by
/// [`crate::config::Config`]), and the built-in default otherwise. In
/// either case every literal `{cwd}` in the template is replaced with
/// the session's working directory, `cwd`.
///
/// # Errors
///
/// Fails when the override file cannot be read as UTF-8 text.
pub fn build_system_prompt(cwd: &Path, override_path: Option<&Path>) -> anyhow::Result<String> {
    let template = if let Some(path) = override_path {
        std::fs::read_to_string(path)
            .with_context(|| format!("read system prompt from {}", path.display()))?
    } else {
        DEFAULT_PROMPT.to_string()
    };
    let cwd_text = cwd.display().to_string();
    Ok(template.replace("{cwd}", &cwd_text))
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// The built-in prompt names the agent and has its `{cwd}`
    /// placeholder filled in.
    #[test]
    fn default_prompt_substitutes_cwd() {
        let cwd = Path::new("/home/me/proj");
        let prompt = build_system_prompt(cwd, None).unwrap();
        assert!(
            prompt.contains("/home/me/proj"),
            "cwd not interpolated: {prompt}"
        );
        assert!(prompt.contains("helexa-acp"));
        assert!(
            !prompt.contains("{cwd}"),
            "left-over placeholder in default prompt"
        );
    }

    /// A user-supplied template is read from disk and templated the
    /// same way as the default.
    #[test]
    fn override_path_is_read_and_templated() {
        // Write the template, then close the handle before reading back.
        let path = {
            let mut handle = tempfile_in_target("prompt.txt");
            handle.write_all(b"custom prompt for {cwd} only").unwrap();
            handle.flush().unwrap();
            handle.path().to_path_buf()
        };
        let prompt =
            build_system_prompt(Path::new("/etc"), Some(path.as_path())).expect("read override");
        assert_eq!(prompt, "custom prompt for /etc only");
        let _ = std::fs::remove_file(&path);
    }

    /// An override path that does not exist is an error carrying the
    /// "read system prompt" context.
    #[test]
    fn missing_override_path_errors() {
        let missing = Path::new("/definitely/not/a/real/path");
        let err = build_system_prompt(Path::new("/tmp"), Some(missing)).unwrap_err();
        let rendered = format!("{err:#}");
        assert!(rendered.contains("read system prompt"));
    }

    /// Tiny temp-file helper that avoids pulling in the `tempfile`
    /// crate. The file is created in the directory named by the
    /// `CARGO_TARGET_TMPDIR` environment variable when that is set, and
    /// in the system temp directory otherwise; the process id in the
    /// file name keeps separate test processes apart. Nothing removes
    /// the file automatically — callers delete it themselves.
    fn tempfile_in_target(name: &str) -> TempHandle {
        let base = match std::env::var("CARGO_TARGET_TMPDIR") {
            Ok(dir) => std::path::PathBuf::from(dir),
            Err(_) => std::env::temp_dir(),
        };
        let _ = std::fs::create_dir_all(&base);
        let pid = std::process::id();
        let path = base.join(format!("helexa-acp-{pid}-{name}"));
        let file = std::fs::File::create(&path).expect("create temp file");
        TempHandle { file, path }
    }

    /// An open file paired with the path it was created at.
    struct TempHandle {
        file: std::fs::File,
        path: std::path::PathBuf,
    }

    impl TempHandle {
        /// Location of the file on disk.
        fn path(&self) -> &Path {
            self.path.as_path()
        }
    }

    /// Writes go straight through to the underlying file.
    impl Write for TempHandle {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            Write::write(&mut self.file, buf)
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Write::flush(&mut self.file)
        }
    }
}

View File

@@ -11,14 +11,6 @@
//! Day-1 provider: [`openai_chat::OpenAIChatProvider`]. Day-N
//! providers slot in without touching `agent.rs`.
// Many fields and variants in the public surface here aren't read yet:
// the agent loop that consumes `CompletionEvent`s and constructs
// `CompletionRequest`s lands in the next commit. They're not
// speculative — the unit tests in `provider::openai_chat::tests`
// already verify the encoder/decoder produces them. Once `agent.rs`
// arrives this allow comes off.
#![allow(dead_code)]
use async_trait::async_trait;
use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};
@@ -38,8 +30,9 @@ pub trait Provider: Send + Sync {
fn name(&self) -> &str;
/// List models available at this endpoint. Used to build the
/// model-picker dropdown in editor clients. Should return quickly
/// (cache if necessary).
/// model-picker dropdown in editor clients (Stage 4). Should
/// return quickly (cache if necessary).
#[allow(dead_code)]
async fn list_models(&self) -> anyhow::Result<Vec<ModelInfo>>;
/// Run a chat completion. Returns a stream of provider-agnostic
@@ -52,7 +45,10 @@ pub trait Provider: Send + Sync {
) -> anyhow::Result<BoxStream<'static, anyhow::Result<CompletionEvent>>>;
}
/// One model exposed by a provider.
/// One model exposed by a provider. Constructed by `list_models` —
/// Stage 4 is when the agent loop starts consuming it for the
/// model-picker dropdown.
#[allow(dead_code)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelInfo {
pub id: String,
@@ -91,19 +87,26 @@ pub enum Role {
/// Tool result message. Provider impls turn this into whatever
/// shape the upstream wire format wants (OpenAI uses
/// `role: "tool"` + `tool_call_id`; Anthropic uses content blocks).
/// Stage 3 (tools) constructs this; Stage 2 never does.
#[allow(dead_code)]
Tool,
}
#[derive(Debug, Clone)]
pub enum MessageContent {
Text(String),
/// Assistant turn that called one or more tools.
/// Assistant turn that called one or more tools. Stage 3 starts
/// constructing this when the provider stream yields a
/// `ToolCallStart` / `ToolCallArgsDelta` sequence.
#[allow(dead_code)]
ToolCalls {
/// Optional text the assistant said alongside the tool calls.
text: Option<String>,
calls: Vec<ToolCall>,
},
/// Tool result. `tool_call_id` matches the assistant's call id.
/// Stage 3 constructs this after the tool runner finishes.
#[allow(dead_code)]
ToolResult {
tool_call_id: String,
content: String,
@@ -138,22 +141,36 @@ pub enum CompletionEvent {
/// (e.g. Qwen3 with `<think>` tags surfaced as a separate stream,
/// or OpenAI reasoning models).
ReasoningDelta(String),
/// A new tool call has started.
/// A new tool call has started. Stage 2 ignores the payload; the
/// agent loop in Stage 3 reads `index` to correlate with
/// [`Self::ToolCallArgsDelta`], `id` for the eventual tool-result
/// turn, and `name` to dispatch the runner.
#[allow(dead_code)]
ToolCallStart {
index: usize,
id: String,
name: String,
},
/// More argument bytes for a tool call already announced via
/// [`Self::ToolCallStart`].
/// [`Self::ToolCallStart`]. Stage 2 ignores; Stage 3 accumulates
/// the bytes by `index` until the call's arguments are complete.
#[allow(dead_code)]
ToolCallArgsDelta { index: usize, args_delta: String },
/// Stream finished. Carries the upstream `finish_reason` if it
/// gave one (`"stop"`, `"length"`, `"tool_calls"`, …).
Finish { reason: Option<String> },
/// Final usage stats, if the provider supplied them.
/// Final usage stats, if the provider supplied them. Stage 2
/// matches the variant to drop it; Stage 6b (token metrics) is
/// when the payload starts being read.
#[allow(dead_code)]
Usage(UsageStats),
}
/// Token accounting reported by the provider at the end of a stream.
/// Stage 2 doesn't surface usage anywhere — the stable `PromptResponse`
/// has no usage field, and the unstable variant is gated. Stage 6b
/// turns these on with Prometheus metrics.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, Default)]
pub struct UsageStats {
pub prompt_tokens: u64,

View File

@@ -15,7 +15,7 @@ use tokio_util::sync::CancellationToken;
use super::{
CompletionEvent, CompletionRequest, Message, MessageContent, ModelInfo, Provider, Role,
ToolCall, ToolSpec, UsageStats,
ToolSpec, UsageStats,
};
use crate::config::EndpointConfig;
@@ -126,6 +126,7 @@ impl Provider for OpenAIChatProvider {
#[cfg(test)]
mod tests {
use super::*;
use crate::provider::ToolCall;
use futures::stream;
use url::Url;

View File

@@ -0,0 +1,165 @@
//! Per-session state for the ACP agent loop.
//!
//! Concurrency:
//!
//! - [`SessionStore`] is an `Arc<RwLock<HashMap<SessionId, …>>>`. The map
//! itself is read-mostly: it changes only on `session/new` and never
//! shrinks during Stage 2, so an `RwLock` keeps concurrent reads
//! contention-free.
//! - Each session is wrapped in its own `Arc<Mutex<SessionState>>`. Holding
//! one session's lock doesn't block requests against any other session,
//! which matters once a client opens multiple sessions in parallel.
//!
//! All operations hold a lock only long enough to copy out (or mutate) the
//! state they need — never across an `await` that drives the upstream
//! provider stream.
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use agent_client_protocol::schema::SessionId;
use tokio::sync::{Mutex, RwLock};
use tokio_util::sync::CancellationToken;
use crate::provider::Message;
/// State carried for a single ACP session.
///
/// Lives inside an `Arc<Mutex<SessionState>>` in the [`SessionStore`]
/// and is only read or mutated while that mutex is held. A task that
/// needs the `cancel` token beyond the lock clones the token out
/// explicitly while it still holds the lock.
#[derive(Debug)]
pub struct SessionState {
/// Conversation history in chronological order (user / assistant
/// turns). The system prompt is *not* stored here — it's built
/// fresh per request so any cwd / config changes take effect.
pub history: Vec<Message>,
/// Working directory the client opened the session against. Used
/// by [`crate::prompt::build_system_prompt`] and (Stage 3) by
/// filesystem tools.
pub cwd: PathBuf,
/// Currently-selected model id. Format is either a bare model id
/// (resolved against the default endpoint) or `endpoint:model`.
/// Mutated by `session/set_model` in Stage 4; Stage 2 sets it
/// once at session creation and never changes it.
pub model_id: String,
/// Cancellation handle for the in-flight prompt, if any. A fresh
/// token is installed at the start of every `session/prompt`
/// request; `session/cancel` fires this one. Between prompts the
/// token is "spent" — firing it does nothing — which is fine,
/// `session/cancel` is a no-op when there's nothing to cancel.
pub cancel: CancellationToken,
}
impl SessionState {
    /// Create the state for a brand-new session: empty history, the
    /// given working directory and model id, and a cancellation token
    /// that has not been fired.
    pub fn new(cwd: PathBuf, model_id: String) -> Self {
        let history = Vec::new();
        let cancel = CancellationToken::new();
        Self {
            history,
            cwd,
            model_id,
            cancel,
        }
    }
}
/// Concurrent map of live sessions, keyed by session id.
///
/// The outer `RwLock` guards the map itself; each session sits behind
/// its own `Mutex`, so holding one session's lock does not block access
/// to any other session.
///
/// Cloning is cheap (`Arc` bump). Pass clones into every handler that
/// needs session access; never hold a clone across an `.await` that
/// could outlive the request.
pub type SessionStore = Arc<RwLock<HashMap<SessionId, Arc<Mutex<SessionState>>>>>;
/// Create a session store with no sessions registered.
pub fn new_store() -> SessionStore {
    let sessions = HashMap::new();
    Arc::new(RwLock::new(sessions))
}
/// Fetch the shared handle for session `id`, or `None` when no session
/// is registered under that id. Only the `Arc` is cloned; the session
/// state itself is not copied.
pub async fn get(store: &SessionStore, id: &SessionId) -> Option<Arc<Mutex<SessionState>>> {
    let sessions = store.read().await;
    sessions.get(id).map(Arc::clone)
}
/// Register `state` under `id`. An existing entry with the same id is
/// replaced — which should never happen, since the agent generates
/// unique ids.
pub async fn insert(store: &SessionStore, id: SessionId, state: SessionState) {
    let entry = Arc::new(Mutex::new(state));
    let mut sessions = store.write().await;
    sessions.insert(id, entry);
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::provider::{MessageContent, Role};

    /// Shorthand for building a session id from a literal.
    fn id(s: &str) -> SessionId {
        SessionId::new(s)
    }

    /// Number of history entries currently stored for session `key`.
    async fn history_len(store: &SessionStore, key: &str) -> usize {
        let session = get(store, &id(key)).await.unwrap();
        let guard = session.lock().await;
        guard.history.len()
    }

    /// What goes in through `insert` comes back out through `get`.
    #[tokio::test]
    async fn insert_then_get_round_trip() {
        let store = new_store();
        insert(
            &store,
            id("s1"),
            SessionState::new(PathBuf::from("/tmp"), "m".into()),
        )
        .await;

        let session = get(&store, &id("s1")).await.expect("session present");
        let state = session.lock().await;
        assert_eq!(state.cwd, PathBuf::from("/tmp"));
        assert_eq!(state.model_id, "m");
        assert!(state.history.is_empty());
    }

    /// Looking up an id that was never inserted yields `None`.
    #[tokio::test]
    async fn missing_session_is_none() {
        let store = new_store();
        let found = get(&store, &id("nope")).await;
        assert!(found.is_none());
    }

    /// Appending to one session's history must not affect another's.
    #[tokio::test]
    async fn history_is_per_session() {
        let store = new_store();
        for (key, cwd) in [("a", "/a"), ("b", "/b")] {
            insert(
                &store,
                id(key),
                SessionState::new(PathBuf::from(cwd), "m".into()),
            )
            .await;
        }

        {
            let session_a = get(&store, &id("a")).await.unwrap();
            let mut state_a = session_a.lock().await;
            state_a.history.push(Message {
                role: Role::User,
                content: MessageContent::Text("hello".into()),
            });
        }

        assert_eq!(history_len(&store, "a").await, 1);
        assert_eq!(history_len(&store, "b").await, 0);
    }
}
}