From 96fc379893db94ea2edfcba60e8d9612bca0706b Mon Sep 17 00:00:00 2001 From: rob thijssen Date: Thu, 28 May 2026 09:46:22 +0300 Subject: [PATCH] feat(helexa-acp): wire ACP agent loop for text-only conversations Stage 2 lands the agent loop on top of the Stage 1 scaffold: session state with per-session cancellation, a system-prompt builder honouring HELEXA_ACP_SYSTEM_PROMPT_PATH / system_prompt_path TOML, and handlers for initialize / session/new / session/prompt / session/cancel that stream provider output back as session/update notifications. Verified end-to-end against cortex from Zed. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/helexa-acp/src/agent.rs | 549 ++++++++++++++++++ crates/helexa-acp/src/config.rs | 5 +- crates/helexa-acp/src/main.rs | 51 +- crates/helexa-acp/src/prompt.rs | 118 ++++ crates/helexa-acp/src/provider/mod.rs | 47 +- crates/helexa-acp/src/provider/openai_chat.rs | 3 +- crates/helexa-acp/src/session.rs | 165 ++++++ 7 files changed, 883 insertions(+), 55 deletions(-) create mode 100644 crates/helexa-acp/src/agent.rs create mode 100644 crates/helexa-acp/src/prompt.rs create mode 100644 crates/helexa-acp/src/session.rs diff --git a/crates/helexa-acp/src/agent.rs b/crates/helexa-acp/src/agent.rs new file mode 100644 index 0000000..c4e7af0 --- /dev/null +++ b/crates/helexa-acp/src/agent.rs @@ -0,0 +1,549 @@ +//! ACP agent loop — text-only (Stage 2). +//! +//! Handlers: +//! +//! | ACP method | Behaviour | +//! |-------------------|------------------------------------------------------------| +//! | `initialize` | echo client's protocol version, advertise capabilities | +//! | `session/new` | mint a session id, register state, return it | +//! | `session/prompt` | flatten user blocks → history, stream provider → updates | +//! | `session/cancel` | fire the session's cancellation token | +//! | (anything else) | "not implemented yet" error | +//! +//! Stage 3 adds tool calls; Stage 4 wires `session/set_model`; Stage 5 +//! 
flips on image content. Stage 2 deliberately answers the model-picker +//! and session-modes fields with `None` so editors render a single model +//! / single mode UI. + +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +use agent_client_protocol::schema::{ + AgentCapabilities, CancelNotification, ContentBlock, InitializeRequest, InitializeResponse, + NewSessionRequest, NewSessionResponse, PromptCapabilities, PromptRequest, PromptResponse, + SessionId, SessionNotification, SessionUpdate, StopReason, TextContent, +}; +use agent_client_protocol::{Agent as AgentRole, Client, ConnectionTo, Dispatch, Stdio}; +use futures::StreamExt; +use tokio_util::sync::CancellationToken; + +use crate::config::{Config, parse_model_selector}; +use crate::prompt::build_system_prompt; +use crate::provider::{ + CompletionEvent, CompletionRequest, Message, MessageContent, Provider, Role, +}; +use crate::session::{self, SessionState, SessionStore}; + +/// Public entry point. Wraps an `Arc` so handlers can clone +/// it cheaply into every closure. +pub struct Agent { + inner: Arc, +} + +struct AgentInner { + /// Every successfully-built provider, indexed positionally. We look + /// providers up by name (`endpoint:` prefix) rather than by index. + providers: Vec>, + /// Name of the endpoint used when a request omits the + /// `endpoint:model` prefix. + default_endpoint_name: String, + /// Default model for the default endpoint, if configured. Required + /// for Stage 2 because session/set_model lands in Stage 4 — a + /// session with no model can't prompt anything. + default_model: Option, + sessions: SessionStore, + system_prompt_path: Option, + /// Monotonic counter for minting session ids. The wire format is + /// `hxa-{n}` — short, debuggable, and the protocol doesn't require + /// UUIDs for session ids (it only requires them for message ids + /// behind an unstable flag). 
+ next_session_id: AtomicU64, +} + +impl Agent { + /// Construct an agent from a validated [`Config`] and the providers + /// that were successfully built for each endpoint. + pub fn new(cfg: &Config, providers: Vec>) -> anyhow::Result { + if providers.is_empty() { + anyhow::bail!("no usable providers"); + } + let default = cfg.default_endpoint(); + // The default endpoint's provider must have built successfully — + // otherwise we can't honour `model = "bare-model-id"` requests. + // (If only a non-default endpoint is usable, the operator should + // promote it to `default_endpoint` in the TOML.) + if !providers.iter().any(|p| p.name() == default.name) { + anyhow::bail!( + "default endpoint '{}' has no usable provider — check config", + default.name + ); + } + Ok(Self { + inner: Arc::new(AgentInner { + providers, + default_endpoint_name: default.name.clone(), + default_model: default.default_model.clone(), + sessions: session::new_store(), + system_prompt_path: cfg.system_prompt_path.clone(), + next_session_id: AtomicU64::new(1), + }), + }) + } + + /// Run the agent against an ACP transport (typically [`Stdio`]). + /// Returns when the transport closes or a handler errors. 
+ pub async fn serve(self, transport: Stdio) -> agent_client_protocol::Result<()> { + let inner = self.inner; + AgentRole + .builder() + .name("helexa-acp") + .on_receive_request( + async move |req: InitializeRequest, responder, _cx| { + responder.respond(initialize_response(&req)) + }, + agent_client_protocol::on_receive_request!(), + ) + .on_receive_request( + { + let inner = inner.clone(); + async move |req: NewSessionRequest, responder, _cx| { + let result = handle_new_session(&inner, req).await; + match result { + Ok(resp) => responder.respond(resp), + Err(e) => responder.respond_with_internal_error(format!("{e:#}")), + } + } + }, + agent_client_protocol::on_receive_request!(), + ) + .on_receive_request( + { + let inner = inner.clone(); + async move |req: PromptRequest, responder, cx: ConnectionTo| { + spawn_prompt(inner.clone(), cx, req, responder) + } + }, + agent_client_protocol::on_receive_request!(), + ) + .on_receive_notification( + { + let inner = inner.clone(); + async move |notif: CancelNotification, _cx: ConnectionTo| { + handle_cancel(&inner, notif).await; + Ok(()) + } + }, + agent_client_protocol::on_receive_notification!(), + ) + .on_receive_dispatch( + async move |message: Dispatch, cx: ConnectionTo| { + tracing::warn!(method = ?message.method(), "unhandled ACP message"); + message.respond_with_error( + agent_client_protocol::util::internal_error("not implemented yet"), + cx, + ) + }, + agent_client_protocol::on_receive_dispatch!(), + ) + .connect_to(transport) + .await + } +} + +fn initialize_response(req: &InitializeRequest) -> InitializeResponse { + // Stage 2: text-only prompts. Image / audio / embedded resources + // flip on in later stages. 
+    let prompt_caps = PromptCapabilities::default();
+    InitializeResponse::new(req.protocol_version)
+        .agent_capabilities(AgentCapabilities::new().prompt_capabilities(prompt_caps))
+}
+
+/// Handle `session/new`: validate the request, mint a session id and
+/// register fresh state for it.
+///
+/// Fails if the client's `cwd` is not absolute, or if the default
+/// endpoint has no `default_model` configured (Stage 2 has no other
+/// way to pick a model).
+async fn handle_new_session(
+    inner: &AgentInner,
+    req: NewSessionRequest,
+) -> anyhow::Result<NewSessionResponse> {
+    if !req.cwd.is_absolute() {
+        anyhow::bail!("session cwd must be absolute, got {}", req.cwd.display());
+    }
+    let model_id = inner
+        .default_model
+        .clone()
+        .ok_or_else(|| anyhow::anyhow!(
+            "default endpoint '{}' has no default_model — set one in config or wait for Stage 4 set_model",
+            inner.default_endpoint_name
+        ))?;
+
+    let n = inner.next_session_id.fetch_add(1, Ordering::Relaxed);
+    let session_id = SessionId::new(format!("hxa-{n}"));
+    // `req.cwd` and `model_id` are moved into the session state below,
+    // so capture what the log line needs first.
+    let cwd_display = req.cwd.display().to_string();
+    let log_model = model_id.clone();
+    let state = SessionState::new(req.cwd, model_id);
+    session::insert(&inner.sessions, session_id.clone(), state).await;
+
+    tracing::info!(
+        session_id = %session_id.0,
+        model_id = %log_model,
+        cwd = %cwd_display,
+        "session created"
+    );
+    Ok(NewSessionResponse::new(session_id))
+}
+
+/// Handle `session/cancel`: fire the cancellation token currently
+/// installed on the session. A cancel for an unknown session id is
+/// logged at debug level and otherwise ignored.
+async fn handle_cancel(inner: &AgentInner, notif: CancelNotification) {
+    let Some(state) = session::get(&inner.sessions, &notif.session_id).await else {
+        tracing::debug!(session_id = %notif.session_id.0, "cancel for unknown session, ignoring");
+        return;
+    };
+    // Clone the token out so the session lock is released before firing.
+    let cancel = state.lock().await.cancel.clone();
+    tracing::info!(session_id = %notif.session_id.0, "cancellation requested");
+    cancel.cancel();
+}
+
+/// Kick the prompt off on a spawned task so the event loop is free to
+/// dispatch the matching `session/cancel`. The handler itself returns
+/// `Ok(())` immediately (= `Handled::Yes`); the spawned task is what
+/// eventually consumes `responder`.
+fn spawn_prompt( + inner: Arc, + cx: ConnectionTo, + req: PromptRequest, + responder: agent_client_protocol::Responder, +) -> agent_client_protocol::Result<()> { + let task_cx = cx.clone(); + cx.spawn(async move { + if let Err(e) = drive_prompt(inner, task_cx, req, responder).await { + // `drive_prompt` already consumed the responder on the + // error paths it produces; this branch only fires if the + // task itself errored before reaching responder.respond. + // Log and swallow — propagating the error would tear down + // the whole connection, which is too violent for one + // failed prompt. + tracing::error!(error = %format!("{e:#}"), "prompt task failed"); + } + Ok(()) + })?; + Ok(()) +} + +async fn drive_prompt( + inner: Arc, + cx: ConnectionTo, + req: PromptRequest, + responder: agent_client_protocol::Responder, +) -> anyhow::Result<()> { + let session_id = req.session_id.clone(); + let Some(session_arc) = session::get(&inner.sessions, &session_id).await else { + let _ = + responder.respond_with_internal_error(format!("unknown session id {}", session_id.0)); + return Ok(()); + }; + + // Snapshot the inputs to the upstream call under the session + // lock, then drop the lock before any `await` that touches the + // network. We *also* install a fresh cancellation token so + // `session/cancel` can fire only this prompt. 
+    let (mut history, model_id, cwd, cancel) = {
+        let mut state = session_arc.lock().await;
+        let cancel = CancellationToken::new();
+        state.cancel = cancel.clone();
+        let user_text = flatten_prompt(&req.prompt);
+        state.history.push(Message {
+            role: Role::User,
+            content: MessageContent::Text(user_text),
+        });
+        (
+            state.history.clone(),
+            state.model_id.clone(),
+            state.cwd.clone(),
+            cancel,
+        )
+    };
+
+    // A failure here (e.g. an unreadable `system_prompt_path`) must
+    // still answer the request. Bubbling it up with `?` would return
+    // before `responder` is used, leaving the prompt without an
+    // explicit reply — unlike every other error path in this function.
+    let system_prompt = match build_system_prompt(&cwd, inner.system_prompt_path.as_deref()) {
+        Ok(text) => text,
+        Err(e) => {
+            let _ = responder
+                .respond_with_internal_error(format!("build system prompt: {e:#}"));
+            return Ok(());
+        }
+    };
+
+    let (provider, local_model) =
+        match resolve_provider(&inner.providers, &inner.default_endpoint_name, &model_id) {
+            Ok(pair) => pair,
+            Err(e) => {
+                let _ = responder.respond_with_internal_error(format!("{e:#}"));
+                return Ok(());
+            }
+        };
+
+    tracing::info!(
+        session_id = %session_id.0,
+        endpoint = %provider.name(),
+        model = %local_model,
+        history_turns = history.len(),
+        "sending prompt upstream"
+    );
+
+    // Upstream message list = system prompt followed by the snapshot
+    // of the session history (which already includes this user turn).
+    let mut messages = Vec::with_capacity(history.len() + 1);
+    messages.push(Message {
+        role: Role::System,
+        content: MessageContent::Text(system_prompt),
+    });
+    messages.append(&mut history);
+
+    let completion_req = CompletionRequest {
+        model: local_model,
+        messages,
+        tools: vec![],
+        temperature: None,
+        top_p: None,
+        max_tokens: None,
+    };
+
+    let stream_result = provider.complete(completion_req, cancel.clone()).await;
+    let mut stream = match stream_result {
+        Ok(s) => s,
+        Err(e) => {
+            let _ = responder
+                .respond_with_internal_error(format!("{} complete: {e:#}", provider.name()));
+            return Ok(());
+        }
+    };
+
+    let mut assistant_text = String::new();
+    let mut stop_reason = StopReason::EndTurn;
+
+    while let Some(event) = stream.next().await {
+        let event = match event {
+            Ok(e) => e,
+            Err(e) => {
+                tracing::warn!(error = %format!("{e:#}"), "stream error; ending turn");
+                break;
+            }
+        };
+        match event {
+            CompletionEvent::TextDelta(t) => {
+                assistant_text.push_str(&t);
+                send_chunk(
+                    &cx,
+ &session_id, + SessionUpdate::AgentMessageChunk(text_chunk(t)), + ); + } + CompletionEvent::ReasoningDelta(t) => { + send_chunk( + &cx, + &session_id, + SessionUpdate::AgentThoughtChunk(text_chunk(t)), + ); + } + CompletionEvent::Finish { reason } => { + stop_reason = map_finish_reason(reason.as_deref()); + } + // Stage 2 ignores tool calls and usage. Tool calls land in + // Stage 3; usage telemetry isn't in the (non-unstable) + // PromptResponse, so there's nothing to attach it to today. + CompletionEvent::ToolCallStart { .. } + | CompletionEvent::ToolCallArgsDelta { .. } + | CompletionEvent::Usage(_) => {} + } + } + + // If cancellation fired, override whatever finish reason we got + // (or didn't get). Per spec: a `session/cancel` MUST result in + // `StopReason::Cancelled`, regardless of partial output. + if cancel.is_cancelled() { + stop_reason = StopReason::Cancelled; + } + + // Re-acquire the lock just long enough to persist the assistant + // turn (even partial output, so future turns have the context). 
+    {
+        let mut state = session_arc.lock().await;
+        if !assistant_text.is_empty() {
+            state.history.push(Message {
+                role: Role::Assistant,
+                content: MessageContent::Text(assistant_text),
+            });
+        }
+    }
+
+    let _ = responder.respond(PromptResponse::new(stop_reason));
+    Ok(())
+}
+
+/// Send one `session/update` notification for `session_id`. A send
+/// failure is logged at warn level and otherwise ignored.
+fn send_chunk(cx: &ConnectionTo<Client>, session_id: &SessionId, update: SessionUpdate) {
+    let notif = SessionNotification::new(session_id.clone(), update);
+    if let Err(e) = cx.send_notification(notif) {
+        tracing::warn!(error = %format!("{e:#}"), "failed to forward session update");
+    }
+}
+
+/// Wrap a piece of streamed text in a `ContentChunk` holding a single
+/// text content block.
+fn text_chunk(text: String) -> agent_client_protocol::schema::ContentChunk {
+    use agent_client_protocol::schema::ContentChunk;
+    ContentChunk::new(ContentBlock::Text(TextContent::new(text)))
+}
+
+/// Translate the upstream `finish_reason` string into an ACP
+/// [`StopReason`].
+fn map_finish_reason(reason: Option<&str>) -> StopReason {
+    match reason {
+        Some("length") => StopReason::MaxTokens,
+        Some("refusal") => StopReason::Refusal,
+        // "stop", "tool_calls" (no tools in Stage 2 — degrade to
+        // EndTurn so we don't surface a bogus reason), missing, or
+        // anything else → EndTurn.
+        _ => StopReason::EndTurn,
+    }
+}
+
+/// Pure helper — turn a prompt's ContentBlocks into the user-message
+/// text that goes into history. Lifted out so unit tests don't need a
+/// running runtime.
+///
+/// Text blocks and resource links are joined with a blank line.
+/// Unsupported blocks are dropped with a warning and contribute
+/// nothing — not even a separator.
+fn flatten_prompt(blocks: &[ContentBlock]) -> String {
+    // Append `part`, preceded by the blank-line separator unless it is
+    // the first text written. The separator is emitted only here, so a
+    // dropped block can't leave a dangling or doubled "\n\n" behind.
+    fn push_part(out: &mut String, part: &str) {
+        if !out.is_empty() {
+            out.push_str("\n\n");
+        }
+        out.push_str(part);
+    }
+
+    let mut out = String::new();
+    for block in blocks {
+        match block {
+            ContentBlock::Text(t) => push_part(&mut out, &t.text),
+            ContentBlock::ResourceLink(link) => {
+                // Stage 2 has no fs access; surface the link as a
+                // textual reference so the model at least knows it
+                // was asked about something.
+                push_part(&mut out, &format!("[resource link: {}]", link.uri));
+            }
+            // Image / Audio / Resource: not advertised in
+            // PromptCapabilities for Stage 2; a well-behaved client
+            // shouldn't send these. If one does, drop and warn.
+            other => {
+                tracing::warn!(?other, "ignoring unsupported content block in Stage 2");
+            }
+        }
+    }
+    out
+}
+
+/// Pure helper — pick which provider handles a session's `model_id`.
+/// Returns the matching provider plus the endpoint-local model id
+/// (i.e. with any `endpoint:` prefix stripped).
+fn resolve_provider(
+    providers: &[Arc<dyn Provider>],
+    default_endpoint: &str,
+    model_id: &str,
+) -> anyhow::Result<(Arc<dyn Provider>, String)> {
+    let (endpoint_hint, local_model) = parse_model_selector(model_id);
+    let target_endpoint = endpoint_hint.unwrap_or(default_endpoint);
+    let provider = providers
+        .iter()
+        .find(|p| p.name() == target_endpoint)
+        .ok_or_else(|| anyhow::anyhow!("no provider for endpoint '{target_endpoint}'"))?;
+    Ok((provider.clone(), local_model.to_string()))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use agent_client_protocol::schema::ResourceLink;
+    use async_trait::async_trait;
+    use futures::stream::BoxStream;
+
+    // ── flatten_prompt ──────────────────────────────────────────────
+
+    #[test]
+    fn flatten_empty_prompt_is_empty() {
+        assert_eq!(flatten_prompt(&[]), "");
+    }
+
+    #[test]
+    fn flatten_joins_text_blocks_with_blank_line() {
+        let blocks = vec![
+            ContentBlock::Text(TextContent::new("first")),
+            ContentBlock::Text(TextContent::new("second")),
+        ];
+        assert_eq!(flatten_prompt(&blocks), "first\n\nsecond");
+    }
+
+    #[test]
+    fn flatten_resource_link_becomes_reference_line() {
+        let blocks = vec![ContentBlock::ResourceLink(ResourceLink::new(
+            "readme",
+            "file:///tmp/x",
+        ))];
+        assert_eq!(flatten_prompt(&blocks), "[resource link: file:///tmp/x]");
+    }
+
+    // ── resolve_provider ────────────────────────────────────────────
+
+    /// Minimal Provider stub; just records its name. The trait methods
+    /// aren't exercised by resolve_provider so we leave them
+    /// unimplemented.
+ struct StubProvider(&'static str); + + #[async_trait] + impl Provider for StubProvider { + fn name(&self) -> &str { + self.0 + } + async fn list_models(&self) -> anyhow::Result> { + unimplemented!() + } + async fn complete( + &self, + _request: CompletionRequest, + _cancel: CancellationToken, + ) -> anyhow::Result>> { + unimplemented!() + } + } + + fn providers() -> Vec> { + vec![ + Arc::new(StubProvider("helexa")), + Arc::new(StubProvider("openrouter")), + ] + } + + #[test] + fn bare_model_routes_to_default() { + let (p, m) = resolve_provider(&providers(), "helexa", "helexa/large").unwrap(); + assert_eq!(p.name(), "helexa"); + assert_eq!(m, "helexa/large"); + } + + #[test] + fn prefixed_model_routes_by_endpoint() { + let (p, m) = + resolve_provider(&providers(), "helexa", "openrouter:anthropic/claude-opus-4").unwrap(); + assert_eq!(p.name(), "openrouter"); + assert_eq!(m, "anthropic/claude-opus-4"); + } + + #[test] + fn unknown_endpoint_errors() { + // `Arc` doesn't impl Debug, which rules out + // `.unwrap_err()` (it requires T: Debug). Pattern-match instead. 
+ match resolve_provider(&providers(), "helexa", "ghost:gpt-9") { + Ok(_) => panic!("expected error for unknown endpoint"), + Err(e) => assert!(format!("{e}").contains("ghost")), + } + } + + // ── map_finish_reason ─────────────────────────────────────────── + + #[test] + fn maps_known_finish_reasons() { + assert!(matches!( + map_finish_reason(Some("length")), + StopReason::MaxTokens + )); + assert!(matches!( + map_finish_reason(Some("refusal")), + StopReason::Refusal + )); + assert!(matches!( + map_finish_reason(Some("stop")), + StopReason::EndTurn + )); + assert!(matches!( + map_finish_reason(Some("tool_calls")), + StopReason::EndTurn + )); + assert!(matches!(map_finish_reason(None), StopReason::EndTurn)); + } +} diff --git a/crates/helexa-acp/src/config.rs b/crates/helexa-acp/src/config.rs index 9a40f99..4af61c7 100644 --- a/crates/helexa-acp/src/config.rs +++ b/crates/helexa-acp/src/config.rs @@ -134,7 +134,10 @@ impl EndpointConfig { join_segments(&self.base_url, &["chat", "completions"]) } - /// `{base_url}/models`. + /// `{base_url}/models`. Called from `Provider::list_models`, which + /// Stage 4 wires into the model-picker dropdown; until then it's + /// reachable code with no in-tree callers. + #[allow(dead_code)] pub fn models_url(&self) -> Url { join_segments(&self.base_url, &["models"]) } diff --git a/crates/helexa-acp/src/main.rs b/crates/helexa-acp/src/main.rs index 452b81b..abe40ed 100644 --- a/crates/helexa-acp/src/main.rs +++ b/crates/helexa-acp/src/main.rs @@ -2,23 +2,26 @@ //! setups (helexa, LM Studio, Ollama, OpenRouter, OpenAI, Anthropic, //! …) with a clean per-endpoint wire-format selector. //! -//! Speaks ACP over stdio to an editor client (Zed today). The -//! conversation is forwarded to one of the configured endpoints via -//! a wire-format-specific [`provider::Provider`] implementation. -//! The agent loop itself is provider-agnostic — adding e.g. an -//! Anthropic /v1/messages provider doesn't touch `agent.rs`. +//! 
Speaks ACP over stdio to an editor client (Zed today). Every +//! configured endpoint produces a wire-format-specific +//! [`provider::Provider`] implementation; the agent loop in +//! [`agent::Agent`] is provider-agnostic, so adding e.g. an Anthropic +//! /v1/messages provider doesn't touch `agent.rs`. //! //! Config: `$XDG_CONFIG_HOME/helexa-acp/config.toml` for the multi- //! endpoint case; env vars (`HELEXA_ACP_BASE_URL`, etc.) for the //! single-endpoint case when no config file exists. -use agent_client_protocol::schema::{AgentCapabilities, InitializeRequest, InitializeResponse}; -use agent_client_protocol::{Agent, Client, ConnectionTo, Dispatch, Result, Stdio}; +use agent_client_protocol::{Result, Stdio}; use std::sync::Arc; +mod agent; mod config; +mod prompt; mod provider; +mod session; +use agent::Agent; use config::{Config, EndpointConfig, WireApi}; use provider::{Provider, openai_chat::OpenAIChatProvider}; @@ -86,36 +89,8 @@ async fn main() -> Result<()> { } } } - if providers.is_empty() { - return Err(agent_client_protocol::util::internal_error( - "no usable endpoints — check config", - )); - } - Agent - .builder() - .name("helexa-acp") - .on_receive_request( - async move |initialize: InitializeRequest, responder, _connection| { - // Phase 1 wiring — capabilities only. Real session - // handling lands in the next iteration (agent.rs). 
- responder.respond( - InitializeResponse::new(initialize.protocol_version) - .agent_capabilities(AgentCapabilities::new()), - ) - }, - agent_client_protocol::on_receive_request!(), - ) - .on_receive_dispatch( - async move |message: Dispatch, cx: ConnectionTo| { - tracing::warn!(method = ?message.method(), "unhandled ACP message"); - message.respond_with_error( - agent_client_protocol::util::internal_error("not implemented yet"), - cx, - ) - }, - agent_client_protocol::on_receive_dispatch!(), - ) - .connect_to(Stdio::new()) - .await + let agent = Agent::new(&cfg, providers) + .map_err(|e| agent_client_protocol::util::internal_error(format!("agent: {e:#}")))?; + agent.serve(Stdio::new()).await } diff --git a/crates/helexa-acp/src/prompt.rs b/crates/helexa-acp/src/prompt.rs new file mode 100644 index 0000000..90520ce --- /dev/null +++ b/crates/helexa-acp/src/prompt.rs @@ -0,0 +1,118 @@ +//! System prompt assembly. +//! +//! Stage 2 ships a small built-in prompt aimed at coding assistance: +//! it tells the model the working directory and reminds it that no +//! tools are available yet. Users who want something different point +//! `HELEXA_ACP_SYSTEM_PROMPT_PATH` (env) or `system_prompt_path` (TOML) +//! at a file and we read that verbatim. The literal token `{cwd}` in +//! a user-supplied file is substituted with the session's working +//! directory so editor templates can include it without templating. + +use anyhow::Context; +use std::path::Path; + +const DEFAULT_PROMPT: &str = "\ +You are helexa-acp, a coding assistant. + +Working directory: {cwd} + +Stage 2 build: you have no tools available — answer with text only. +When you need to refer to files or directories, describe paths +relative to the working directory above. Be concise; the user is +reading your output in an editor pane."; + +/// Build the system prompt for a session. 
+/// +/// `cwd` is the session's working directory (substituted for `{cwd}` +/// in both the default prompt and any user-supplied template). +/// `override_path` is the user's `system_prompt_path` (TOML) or +/// `HELEXA_ACP_SYSTEM_PROMPT_PATH` (env) value, already resolved by +/// [`crate::config::Config`]. +pub fn build_system_prompt(cwd: &Path, override_path: Option<&Path>) -> anyhow::Result { + let template = match override_path { + Some(path) => std::fs::read_to_string(path) + .with_context(|| format!("read system prompt from {}", path.display()))?, + None => DEFAULT_PROMPT.to_string(), + }; + Ok(template.replace("{cwd}", &cwd.display().to_string())) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Write; + + #[test] + fn default_prompt_substitutes_cwd() { + let prompt = build_system_prompt(Path::new("/home/me/proj"), None).unwrap(); + assert!( + prompt.contains("/home/me/proj"), + "cwd not interpolated: {prompt}" + ); + assert!(prompt.contains("helexa-acp")); + assert!( + !prompt.contains("{cwd}"), + "left-over placeholder in default prompt" + ); + } + + #[test] + fn override_path_is_read_and_templated() { + let mut tmp = tempfile_in_target("prompt.txt"); + tmp.write_all(b"custom prompt for {cwd} only").unwrap(); + tmp.flush().unwrap(); + + let path = tmp.path().to_path_buf(); + drop(tmp); + + let prompt = + build_system_prompt(Path::new("/etc"), Some(path.as_path())).expect("read override"); + assert_eq!(prompt, "custom prompt for /etc only"); + + let _ = std::fs::remove_file(&path); + } + + #[test] + fn missing_override_path_errors() { + let err = build_system_prompt( + Path::new("/tmp"), + Some(Path::new("/definitely/not/a/real/path")), + ) + .unwrap_err(); + assert!(format!("{err:#}").contains("read system prompt")); + } + + /// Tiny temp-file helper that doesn't pull in the `tempfile` crate. + /// Writes under `target/` so it's cleaned up by `cargo clean`. 
+ fn tempfile_in_target(name: &str) -> TempHandle { + let base = std::env::var("CARGO_TARGET_TMPDIR") + .ok() + .map(std::path::PathBuf::from) + .unwrap_or_else(std::env::temp_dir); + let _ = std::fs::create_dir_all(&base); + let pid = std::process::id(); + let path = base.join(format!("helexa-acp-{pid}-{name}")); + let file = std::fs::File::create(&path).expect("create temp file"); + TempHandle { file, path } + } + + struct TempHandle { + file: std::fs::File, + path: std::path::PathBuf, + } + + impl TempHandle { + fn path(&self) -> &Path { + &self.path + } + } + + impl Write for TempHandle { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.file.write(buf) + } + fn flush(&mut self) -> std::io::Result<()> { + self.file.flush() + } + } +} diff --git a/crates/helexa-acp/src/provider/mod.rs b/crates/helexa-acp/src/provider/mod.rs index 2fc69e7..7aba5e6 100644 --- a/crates/helexa-acp/src/provider/mod.rs +++ b/crates/helexa-acp/src/provider/mod.rs @@ -11,14 +11,6 @@ //! Day-1 provider: [`openai_chat::OpenAIChatProvider`]. Day-N //! providers slot in without touching `agent.rs`. -// Many fields and variants in the public surface here aren't read yet: -// the agent loop that consumes `CompletionEvent`s and constructs -// `CompletionRequest`s lands in the next commit. They're not -// speculative — the unit tests in `provider::openai_chat::tests` -// already verify the encoder/decoder produces them. Once `agent.rs` -// arrives this allow comes off. -#![allow(dead_code)] - use async_trait::async_trait; use futures::stream::BoxStream; use serde::{Deserialize, Serialize}; @@ -38,8 +30,9 @@ pub trait Provider: Send + Sync { fn name(&self) -> &str; /// List models available at this endpoint. Used to build the - /// model-picker dropdown in editor clients. Should return quickly - /// (cache if necessary). + /// model-picker dropdown in editor clients (Stage 4). Should + /// return quickly (cache if necessary). 
+ #[allow(dead_code)] async fn list_models(&self) -> anyhow::Result>; /// Run a chat completion. Returns a stream of provider-agnostic @@ -52,7 +45,10 @@ pub trait Provider: Send + Sync { ) -> anyhow::Result>>; } -/// One model exposed by a provider. +/// One model exposed by a provider. Constructed by `list_models` — +/// Stage 4 is when the agent loop starts consuming it for the +/// model-picker dropdown. +#[allow(dead_code)] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ModelInfo { pub id: String, @@ -91,19 +87,26 @@ pub enum Role { /// Tool result message. Provider impls turn this into whatever /// shape the upstream wire format wants (OpenAI uses /// `role: "tool"` + `tool_call_id`; Anthropic uses content blocks). + /// Stage 3 (tools) constructs this; Stage 2 never does. + #[allow(dead_code)] Tool, } #[derive(Debug, Clone)] pub enum MessageContent { Text(String), - /// Assistant turn that called one or more tools. + /// Assistant turn that called one or more tools. Stage 3 starts + /// constructing this when the provider stream yields a + /// `ToolCallStart` / `ToolCallArgsDelta` sequence. + #[allow(dead_code)] ToolCalls { /// Optional text the assistant said alongside the tool calls. text: Option, calls: Vec, }, /// Tool result. `tool_call_id` matches the assistant's call id. + /// Stage 3 constructs this after the tool runner finishes. + #[allow(dead_code)] ToolResult { tool_call_id: String, content: String, @@ -138,22 +141,36 @@ pub enum CompletionEvent { /// (e.g. Qwen3 with `` tags surfaced as a separate stream, /// or OpenAI reasoning models). ReasoningDelta(String), - /// A new tool call has started. + /// A new tool call has started. Stage 2 ignores the payload; the + /// agent loop in Stage 3 reads `index` to correlate with + /// [`Self::ToolCallArgsDelta`], `id` for the eventual tool-result + /// turn, and `name` to dispatch the runner. 
+ #[allow(dead_code)] ToolCallStart { index: usize, id: String, name: String, }, /// More argument bytes for a tool call already announced via - /// [`Self::ToolCallStart`]. + /// [`Self::ToolCallStart`]. Stage 2 ignores; Stage 3 accumulates + /// the bytes by `index` until the call's arguments are complete. + #[allow(dead_code)] ToolCallArgsDelta { index: usize, args_delta: String }, /// Stream finished. Carries the upstream `finish_reason` if it /// gave one (`"stop"`, `"length"`, `"tool_calls"`, …). Finish { reason: Option }, - /// Final usage stats, if the provider supplied them. + /// Final usage stats, if the provider supplied them. Stage 2 + /// matches the variant to drop it; Stage 6b (token metrics) is + /// when the payload starts being read. + #[allow(dead_code)] Usage(UsageStats), } +/// Token accounting reported by the provider at the end of a stream. +/// Stage 2 doesn't surface usage anywhere — the stable `PromptResponse` +/// has no usage field, and the unstable variant is gated. Stage 6b +/// turns these on with Prometheus metrics. 
+#[allow(dead_code)] #[derive(Debug, Clone, Copy, Default)] pub struct UsageStats { pub prompt_tokens: u64, diff --git a/crates/helexa-acp/src/provider/openai_chat.rs b/crates/helexa-acp/src/provider/openai_chat.rs index c04236a..53271bb 100644 --- a/crates/helexa-acp/src/provider/openai_chat.rs +++ b/crates/helexa-acp/src/provider/openai_chat.rs @@ -15,7 +15,7 @@ use tokio_util::sync::CancellationToken; use super::{ CompletionEvent, CompletionRequest, Message, MessageContent, ModelInfo, Provider, Role, - ToolCall, ToolSpec, UsageStats, + ToolSpec, UsageStats, }; use crate::config::EndpointConfig; @@ -126,6 +126,7 @@ impl Provider for OpenAIChatProvider { #[cfg(test)] mod tests { use super::*; + use crate::provider::ToolCall; use futures::stream; use url::Url; diff --git a/crates/helexa-acp/src/session.rs b/crates/helexa-acp/src/session.rs new file mode 100644 index 0000000..18c5954 --- /dev/null +++ b/crates/helexa-acp/src/session.rs @@ -0,0 +1,165 @@ +//! Per-session state for the ACP agent loop. +//! +//! Concurrency: +//! +//! - [`SessionStore`] is an `Arc>>`. The map +//! itself is read-mostly: it changes only on `session/new` and never +//! shrinks during Stage 2, so an `RwLock` keeps concurrent reads +//! contention-free. +//! - Each session is wrapped in its own `Arc>`. Holding +//! one session's lock doesn't block requests against any other session, +//! which matters once a client opens multiple sessions in parallel. +//! +//! All operations hold a lock only long enough to copy out (or mutate) the +//! state they need — never across an `await` that drives the upstream +//! provider stream. + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; + +use agent_client_protocol::schema::SessionId; +use tokio::sync::{Mutex, RwLock}; +use tokio_util::sync::CancellationToken; + +use crate::provider::Message; + +/// State carried for a single ACP session. 
+///
+/// Mutated under `Mutex<SessionState>`; never share a clone across
+/// tasks expecting to see the same `cancel` token — clone the token
+/// explicitly when handing it to the streaming task.
+#[derive(Debug)]
+pub struct SessionState {
+    /// Conversation history in chronological order (user / assistant
+    /// turns). The system prompt is *not* stored here — it's built
+    /// fresh per request so any cwd / config changes take effect.
+    pub history: Vec<Message>,
+    /// Working directory the client opened the session against. Used
+    /// by [`crate::prompt::build_system_prompt`] and (Stage 3) by
+    /// filesystem tools.
+    pub cwd: PathBuf,
+    /// Currently-selected model id. Format is either a bare model id
+    /// (resolved against the default endpoint) or `endpoint:model`.
+    /// Mutated by `session/set_model` in Stage 4; Stage 2 sets it
+    /// once at session creation and never changes it.
+    pub model_id: String,
+    /// Cancellation handle for the in-flight prompt, if any. A fresh
+    /// token is installed at the start of every `session/prompt`
+    /// request; `session/cancel` fires this one. Between prompts the
+    /// token is "spent" — firing it does nothing — which is fine,
+    /// `session/cancel` is a no-op when there's nothing to cancel.
+    pub cancel: CancellationToken,
+}
+
+impl SessionState {
+    pub fn new(cwd: PathBuf, model_id: String) -> Self {
+        Self {
+            history: Vec::new(),
+            cwd,
+            model_id,
+            cancel: CancellationToken::new(),
+        }
+    }
+}
+
+/// Concurrent map of live sessions.
+///
+/// Cloning is cheap (`Arc` bump). Pass clones into every handler that
+/// needs session access; never hold a clone across an `.await` that
+/// could outlive the request.
+pub type SessionStore = Arc<RwLock<HashMap<SessionId, Arc<Mutex<SessionState>>>>>;
+
+/// Fresh, empty session store.
+pub fn new_store() -> SessionStore {
+    Arc::new(RwLock::new(HashMap::new()))
+}
+
+/// Look up a session by id. Returns `None` if no such session is registered.
+pub async fn get(store: &SessionStore, id: &SessionId) -> Option<Arc<Mutex<SessionState>>> {
+    store.read().await.get(id).cloned()
+}
+
+/// Register a fresh session. Overwrites any prior entry with the same id
+/// (which should never happen — ids are uniquely generated by the agent).
+pub async fn insert(store: &SessionStore, id: SessionId, state: SessionState) {
+    store.write().await.insert(id, Arc::new(Mutex::new(state)));
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::provider::{MessageContent, Role};
+
+    fn id(s: &str) -> SessionId {
+        SessionId::new(s)
+    }
+
+    #[tokio::test]
+    async fn insert_then_get_round_trip() {
+        let store = new_store();
+        let state = SessionState::new(PathBuf::from("/tmp"), "m".into());
+        insert(&store, id("s1"), state).await;
+        let got = get(&store, &id("s1")).await.expect("session present");
+        let locked = got.lock().await;
+        assert_eq!(locked.cwd, PathBuf::from("/tmp"));
+        assert_eq!(locked.model_id, "m");
+        assert!(locked.history.is_empty());
+    }
+
+    #[tokio::test]
+    async fn missing_session_is_none() {
+        let store = new_store();
+        assert!(get(&store, &id("nope")).await.is_none());
+    }
+
+    #[tokio::test]
+    async fn history_is_per_session() {
+        let store = new_store();
+        insert(
+            &store,
+            id("a"),
+            SessionState::new(PathBuf::from("/a"), "m".into()),
+        )
+        .await;
+        insert(
+            &store,
+            id("b"),
+            SessionState::new(PathBuf::from("/b"), "m".into()),
+        )
+        .await;
+
+        // Appending to a's history must not affect b's.
+        get(&store, &id("a"))
+            .await
+            .unwrap()
+            .lock()
+            .await
+            .history
+            .push(Message {
+                role: Role::User,
+                content: MessageContent::Text("hello".into()),
+            });
+
+        assert_eq!(
+            get(&store, &id("a"))
+                .await
+                .unwrap()
+                .lock()
+                .await
+                .history
+                .len(),
+            1
+        );
+        assert_eq!(
+            get(&store, &id("b"))
+                .await
+                .unwrap()
+                .lock()
+                .await
+                .history
+                .len(),
+            0
+        );
+    }
+}