diff --git a/crates/helexa-acp/src/agent.rs b/crates/helexa-acp/src/agent.rs index c3ac51a..f588413 100644 --- a/crates/helexa-acp/src/agent.rs +++ b/crates/helexa-acp/src/agent.rs @@ -19,9 +19,10 @@ use std::sync::atomic::{AtomicU64, Ordering}; use agent_client_protocol::schema::{ AgentCapabilities, CancelNotification, ContentBlock, InitializeRequest, InitializeResponse, - NewSessionRequest, NewSessionResponse, PromptCapabilities, PromptRequest, PromptResponse, - SessionId, SessionMode, SessionModeId, SessionModeState, SessionNotification, SessionUpdate, - SetSessionModeRequest, SetSessionModeResponse, StopReason, TextContent, + LoadSessionRequest, LoadSessionResponse, NewSessionRequest, NewSessionResponse, + PromptCapabilities, PromptRequest, PromptResponse, SessionId, SessionMode, SessionModeId, + SessionModeState, SessionNotification, SessionUpdate, SetSessionModeRequest, + SetSessionModeResponse, StopReason, TextContent, }; use agent_client_protocol::{Agent as AgentRole, Client, ConnectionTo, Dispatch, Stdio}; use futures::StreamExt; @@ -34,6 +35,7 @@ use crate::provider::{ CompletionEvent, CompletionRequest, Message, MessageContent, Provider, Role, ToolCall, }; use crate::session::{self, MODE_BYPASS, MODE_DEFAULT, SessionState, SessionStore}; +use crate::store::{self, PersistedSession}; use crate::tool_runner::{AcpClientOps, ToolCallEvent, dispatch_tool_call}; use crate::tools; @@ -135,6 +137,20 @@ impl Agent { }, agent_client_protocol::on_receive_request!(), ) + .on_receive_request( + { + let inner = inner.clone(); + async move |req: LoadSessionRequest, responder, _cx| match handle_load_session( + &inner, req, + ) + .await + { + Ok(resp) => responder.respond(resp), + Err(e) => responder.respond_with_internal_error(format!("{e:#}")), + } + }, + agent_client_protocol::on_receive_request!(), + ) .on_receive_request( { let inner = inner.clone(); @@ -205,8 +221,16 @@ fn initialize_response(req: &InitializeRequest) -> InitializeResponse { // Stage 2: text-only prompts. Image / audio / embedded resources // flip on in later stages. let prompt_caps = PromptCapabilities::default(); - InitializeResponse::new(req.protocol_version) - .agent_capabilities(AgentCapabilities::new().prompt_capabilities(prompt_caps)) + InitializeResponse::new(req.protocol_version).agent_capabilities( + AgentCapabilities::new() + .prompt_capabilities(prompt_caps) + // Stage 3b: `session/load` is implemented. Persisted + // sessions live on disk under + // `$XDG_DATA_HOME/helexa-acp/sessions/`; clients (Zed) + // can hand us back any session_id we previously + // minted to resume the conversation. + .load_session(true), + ) } async fn handle_new_session( @@ -240,6 +264,57 @@ async fn handle_new_session( Ok(NewSessionResponse::new(session_id).modes(default_mode_state())) } +/// Rehydrate a session from disk. +/// +/// Behaviour: +/// +/// - Reads the persisted JSON from +/// `$XDG_DATA_HOME/helexa-acp/sessions/{id}.json`. Missing file → +/// error (Zed falls back to `session/new`). +/// - Overwrites the persisted `cwd` with the one the client just +/// sent. The user may have moved or symlinked the repo since +/// the session was first created; the *current* cwd is the +/// right place to root subsequent tool dispatches. +/// - Materialises an in-memory `SessionState` with the persisted +/// model + mode + history. +/// - Returns `LoadSessionResponse` carrying the same mode list as +/// `session/new`, plus the persisted `current_mode_id` so the +/// client renders the mode dropdown in the correct state. +async fn handle_load_session( + inner: &AgentInner, + req: LoadSessionRequest, +) -> anyhow::Result { + if !req.cwd.is_absolute() { + anyhow::bail!("session cwd must be absolute, got {}", req.cwd.display()); + } + let persisted = store::load(&req.session_id)?; + // Snapshot the values we need for logging + the response + // before we move pieces of `persisted` into `state`. + let model_id = persisted.model_id.clone(); + let mode_id = persisted.mode_id.clone(); + let history_turns = persisted.history.len(); + + let mut state = SessionState::new(req.cwd.clone(), persisted.model_id); + state.history = persisted.history; + state.mode_id = SessionModeId::new(persisted.mode_id); + session::insert(&inner.sessions, req.session_id.clone(), state).await; + + tracing::info!( + session_id = %req.session_id.0, + model_id = %model_id, + mode = %mode_id, + cwd = %req.cwd.display(), + history_turns, + "session loaded from disk" + ); + + let modes = SessionModeState::new( + SessionModeId::new(mode_id), + default_mode_state().available_modes, + ); + Ok(LoadSessionResponse::new().modes(modes)) +} + /// The two modes every Stage 3 session advertises. Stage 7 may grow /// this list (e.g. "plan" for plan-only output, "ask" for read-only), /// but Default + Bypass cover the two operationally distinct @@ -341,7 +416,7 @@ async fn drive_prompt( let user_text = flatten_prompt(&req.prompt); state.history.push(Message { role: Role::User, - content: MessageContent::Text(user_text), + content: MessageContent::Text { text: user_text }, }); ( state.history.clone(), @@ -383,7 +458,9 @@ async fn drive_prompt( let mut messages: Vec = Vec::with_capacity(existing_history.len() + 1); messages.push(Message { role: Role::System, - content: MessageContent::Text(system_prompt), + content: MessageContent::Text { + text: system_prompt, + }, }); messages.extend(existing_history); @@ -494,7 +571,9 @@ async fn drive_prompt( if !assistant_text.is_empty() { new_turns.push(Message { role: Role::Assistant, - content: MessageContent::Text(assistant_text), + content: MessageContent::Text { + text: assistant_text, + }, }); } break; @@ -540,7 +619,9 @@ async fn drive_prompt( if !assistant_text.is_empty() { new_turns.push(Message { role: Role::Assistant, - content: MessageContent::Text(assistant_text), + content: MessageContent::Text { + text: assistant_text, + }, }); } stop_reason = map_finish_reason(finish_reason.as_deref()); @@ -567,7 +648,9 @@ async fn drive_prompt( calls, } } else { - MessageContent::Text(assistant_text) + MessageContent::Text { + text: assistant_text, + } }, }; new_turns.push(assistant_turn.clone()); @@ -638,9 +721,39 @@ async fn drive_prompt( } } - { + // Append the new turns to the session's in-memory history, then + // snapshot the state for persistence. We snapshot *under the + // lock* so the on-disk store reflects exactly what's in memory, + // but the actual blocking I/O (file write) happens outside the + // lock so a slow disk doesn't stall concurrent session work. + let snapshot = { let mut state = session_arc.lock().await; state.history.extend(new_turns); + PersistedSession { + session_id: session_id.0.as_ref().to_string(), + cwd: state.cwd.clone(), + model_id: state.model_id.clone(), + mode_id: state.mode_id.0.as_ref().to_string(), + history: state.history.clone(), + // `created_at` would be ideal to preserve across saves; + // we read it back via store::load on resume but the + // in-memory SessionState doesn't carry it (yet). For + // now persistence treats every save as a refresh, + // updating both timestamps. Future work: thread + // `created_at` through SessionState. + created_at: store::now_secs(), + updated_at: store::now_secs(), + } + }; + if let Err(e) = store::save(&snapshot) { + // Persistence failure is a warning, not a fatal — the + // prompt response still goes through. Operator can grep + // for this to diagnose disk issues. + tracing::warn!( + session_id = %session_id.0, + error = %format!("{e:#}"), + "session/persist failed; resume from disk will miss this turn" + ); } let _ = responder.respond(PromptResponse::new(stop_reason)); diff --git a/crates/helexa-acp/src/main.rs b/crates/helexa-acp/src/main.rs index 31e255b..834d361 100644 --- a/crates/helexa-acp/src/main.rs +++ b/crates/helexa-acp/src/main.rs @@ -21,6 +21,7 @@ mod prompt; mod provider; mod qwen3; mod session; +mod store; mod tool_runner; mod tools; diff --git a/crates/helexa-acp/src/provider/mod.rs b/crates/helexa-acp/src/provider/mod.rs index 4797599..c2769e2 100644 --- a/crates/helexa-acp/src/provider/mod.rs +++ b/crates/helexa-acp/src/provider/mod.rs @@ -73,13 +73,14 @@ pub struct CompletionRequest { pub max_tokens: Option, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Message { pub role: Role, pub content: MessageContent, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] pub enum Role { System, User, @@ -88,17 +89,21 @@ pub enum Role { /// shape the upstream wire format wants (OpenAI uses /// `role: "tool"` + `tool_call_id`; Anthropic uses content blocks). /// Stage 3 (tools) constructs this; Stage 2 never does. - #[allow(dead_code)] Tool, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] pub enum MessageContent { - Text(String), + /// Plain text turn (system / user / assistant). Struct variant + /// rather than newtype so the persisted JSON has an explicit + /// `text` field — that lets us use internal tagging on the + /// enum, which is incompatible with newtype-of-primitive + /// variants. + Text { text: String }, /// Assistant turn that called one or more tools. Stage 3 starts /// constructing this when the provider stream yields a /// `ToolCallStart` / `ToolCallArgsDelta` sequence. - #[allow(dead_code)] ToolCalls { /// Optional text the assistant said alongside the tool calls. text: Option, @@ -106,14 +111,13 @@ pub enum MessageContent { }, /// Tool result. `tool_call_id` matches the assistant's call id. /// Stage 3 constructs this after the tool runner finishes. - #[allow(dead_code)] ToolResult { tool_call_id: String, content: String, }, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct ToolCall { /// Provider-assigned id that ties the call to its result. The /// Qwen3 wire format we use today doesn't carry this on the diff --git a/crates/helexa-acp/src/provider/openai_chat.rs b/crates/helexa-acp/src/provider/openai_chat.rs index bb59cda..f19173d 100644 --- a/crates/helexa-acp/src/provider/openai_chat.rs +++ b/crates/helexa-acp/src/provider/openai_chat.rs @@ -159,11 +159,13 @@ mod tests { messages: vec![ Message { role: Role::System, - content: MessageContent::Text("you are helpful".into()), + content: MessageContent::Text { + text: "you are helpful".into(), + }, }, Message { role: Role::User, - content: MessageContent::Text("hi".into()), + content: MessageContent::Text { text: "hi".into() }, }, ], tools: vec![], @@ -503,9 +505,13 @@ fn encode_request(req: &CompletionRequest) -> Value { fn encode_message(m: &Message) -> Value { match (m.role, &m.content) { - (Role::System, MessageContent::Text(s)) => json!({"role": "system", "content": s}), - (Role::User, MessageContent::Text(s)) => json!({"role": "user", "content": s}), - (Role::Assistant, MessageContent::Text(s)) => json!({"role": "assistant", "content": s}), + (Role::System, MessageContent::Text { text }) => { + json!({"role": "system", "content": text}) + } + (Role::User, MessageContent::Text { text }) => json!({"role": "user", "content": text}), + (Role::Assistant, MessageContent::Text { text }) => { + json!({"role": "assistant", "content": text}) + } // Qwen3 wire shape: assistant turns that called tools come // back to the model with `{…}` blocks // inline in `content`, *not* via the structured `tool_calls` @@ -553,7 +559,7 @@ fn role_str(r: Role) -> &'static str { fn content_as_text(c: &MessageContent) -> String { match c { - MessageContent::Text(s) => s.clone(), + MessageContent::Text { text } => text.clone(), MessageContent::ToolCalls { text, .. } => text.clone().unwrap_or_default(), MessageContent::ToolResult { content, .. } => content.clone(), } diff --git a/crates/helexa-acp/src/session.rs b/crates/helexa-acp/src/session.rs index d13de34..aa3931e 100644 --- a/crates/helexa-acp/src/session.rs +++ b/crates/helexa-acp/src/session.rs @@ -152,7 +152,9 @@ mod tests { .history .push(Message { role: Role::User, - content: MessageContent::Text("hello".into()), + content: MessageContent::Text { + text: "hello".into(), + }, }); assert_eq!( diff --git a/crates/helexa-acp/src/store.rs b/crates/helexa-acp/src/store.rs new file mode 100644 index 0000000..6674273 --- /dev/null +++ b/crates/helexa-acp/src/store.rs @@ -0,0 +1,277 @@ +//! On-disk session persistence for `session/load` support. +//! +//! Storage layout: +//! +//! ```text +//! $XDG_DATA_HOME/helexa-acp/sessions/{session_id}.json +//! ``` +//! +//! (Fallback to `~/.local/share/helexa-acp/sessions/` when +//! `$XDG_DATA_HOME` is unset.) One JSON file per session. Writes +//! happen at the end of every `session/prompt` round through +//! [`save`], using tempfile-plus-rename so a crash mid-write can't +//! corrupt the store. Reads happen on `session/load` via [`load`]. +//! +//! No compaction, no rotation: files accumulate until the user +//! cleans them up. That's deliberate — disk is cheap, and the +//! resume-on-restart workflow matters more than tidiness. The +//! [`SESSIONS_DIRNAME`] subdirectory is created lazily on first +//! save so an unprivileged install path never errors at startup. + +use std::path::PathBuf; +use std::time::SystemTime; + +use agent_client_protocol::schema::SessionId; +use serde::{Deserialize, Serialize}; + +use crate::provider::Message; + +const APP_DIRNAME: &str = "helexa-acp"; +const SESSIONS_DIRNAME: &str = "sessions"; + +/// The shape persisted to disk for one session. Only what we can't +/// rebuild from the running config goes in here: the conversation +/// history, the mode toggle, the model id, and the cwd-at-creation. +/// +/// `created_at` / `updated_at` are seconds-since-epoch — cheap to +/// compare, no third-party time crate, and stable across runs. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PersistedSession { + pub session_id: String, + pub cwd: PathBuf, + pub model_id: String, + pub mode_id: String, + pub history: Vec, + pub created_at: u64, + pub updated_at: u64, +} + +/// Resolve the directory that holds session JSON files. Honors +/// `$XDG_DATA_HOME`; falls back to `~/.local/share/helexa-acp/sessions/`. +/// Returns `None` if neither is resolvable (no `HOME` set — possible +/// in stripped-down container environments). +pub fn sessions_dir() -> Option { + let base = std::env::var("XDG_DATA_HOME") + .ok() + .filter(|s| !s.is_empty()) + .map(PathBuf::from) + .or_else(|| { + std::env::var("HOME") + .ok() + .map(|h| PathBuf::from(h).join(".local").join("share")) + })?; + Some(base.join(APP_DIRNAME).join(SESSIONS_DIRNAME)) +} + +/// Atomic save into the default sessions directory. +pub fn save(session: &PersistedSession) -> anyhow::Result<()> { + let dir = sessions_dir() + .ok_or_else(|| anyhow::anyhow!("can't resolve XDG_DATA_HOME or HOME for session store"))?; + save_to_dir(&dir, session) +} + +/// Load from the default sessions directory. +pub fn load(session_id: &SessionId) -> anyhow::Result { + let dir = sessions_dir() + .ok_or_else(|| anyhow::anyhow!("can't resolve XDG_DATA_HOME or HOME for session store"))?; + load_from_dir(&dir, session_id) +} + +/// Atomic save into an explicit directory. Writes to +/// `{id}.json.tmp` then renames over `{id}.json`. Creates the +/// target directory if it doesn't exist. Split from [`save`] so +/// unit tests can target a per-test scratch dir without mutating +/// process-global env vars. +pub fn save_to_dir(dir: &std::path::Path, session: &PersistedSession) -> anyhow::Result<()> { + std::fs::create_dir_all(dir).map_err(|e| anyhow::anyhow!("create {}: {e}", dir.display()))?; + let safe = sanitize_id(&session.session_id); + let final_path = dir.join(format!("{safe}.json")); + let tmp_path = dir.join(format!("{safe}.json.tmp")); + let json = serde_json::to_string_pretty(session)?; + std::fs::write(&tmp_path, json) + .map_err(|e| anyhow::anyhow!("write {}: {e}", tmp_path.display()))?; + std::fs::rename(&tmp_path, &final_path) + .map_err(|e| anyhow::anyhow!("rename → {}: {e}", final_path.display()))?; + Ok(()) +} + +/// Load from an explicit directory. Returns a friendly error +/// message when the session id has no file on disk so the caller +/// can map it to a clean ACP error response. +pub fn load_from_dir( + dir: &std::path::Path, + session_id: &SessionId, +) -> anyhow::Result { + let safe = sanitize_id(session_id.0.as_ref()); + let path = dir.join(format!("{safe}.json")); + let bytes = std::fs::read(&path).map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + anyhow::anyhow!("no persisted session at {}", path.display()) + } else { + anyhow::anyhow!("read {}: {e}", path.display()) + } + })?; + let session: PersistedSession = serde_json::from_slice(&bytes) + .map_err(|e| anyhow::anyhow!("parse {}: {e}", path.display()))?; + Ok(session) +} + +/// Seconds-since-epoch, saturating to 0 if the system clock is +/// behind epoch (which shouldn't happen but the type system +/// requires a fallible read). +pub fn now_secs() -> u64 { + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0) +} + +/// Strip anything that isn't a safe filename character so a +/// mischievous (or just unconventional) session id can't escape +/// the sessions directory. +fn sanitize_id(id: &str) -> String { + id.chars() + .map(|c| { + if c.is_ascii_alphanumeric() || c == '-' || c == '_' { + c + } else { + '_' + } + }) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::provider::{MessageContent, Role}; + + /// Unique scratch dir per test invocation. We use this dir + /// directly with the `*_to_dir` / `*_from_dir` functions so + /// the tests never mutate `$XDG_DATA_HOME` — that env var + /// would race across the parallel test harness. + fn unique_dir() -> PathBuf { + let base = std::env::var("CARGO_TARGET_TMPDIR") + .ok() + .map(PathBuf::from) + .unwrap_or_else(std::env::temp_dir); + let pid = std::process::id(); + let nanos = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map(|d| d.subsec_nanos()) + .unwrap_or(0); + let dir = base.join(format!("helexa-acp-store-test-{pid}-{nanos}")); + std::fs::create_dir_all(&dir).expect("create test dir"); + dir + } + + fn sample(id: &str) -> PersistedSession { + PersistedSession { + session_id: id.into(), + cwd: PathBuf::from("/home/me/proj"), + model_id: "Qwen/Qwen3.6-27B".into(), + mode_id: "default".into(), + history: vec![ + Message { + role: Role::User, + content: MessageContent::Text { + text: "hello".into(), + }, + }, + Message { + role: Role::Assistant, + content: MessageContent::Text { text: "hi".into() }, + }, + ], + created_at: 1_700_000_000, + updated_at: 1_700_000_001, + } + } + + #[test] + fn round_trip_save_then_load() { + let dir = unique_dir(); + save_to_dir(&dir, &sample("hxa-1")).expect("save"); + let loaded = load_from_dir(&dir, &SessionId::new("hxa-1")).expect("load"); + assert_eq!(loaded.session_id, "hxa-1"); + assert_eq!(loaded.cwd, PathBuf::from("/home/me/proj")); + assert_eq!(loaded.history.len(), 2); + let _ = std::fs::remove_dir_all(&dir); + } + + #[test] + fn load_missing_session_errors_with_not_found_message() { + let dir = unique_dir(); + let err = load_from_dir(&dir, &SessionId::new("nope")).unwrap_err(); + let msg = format!("{err}"); + assert!( + msg.contains("no persisted session"), + "want NotFound, got: {msg}" + ); + let _ = std::fs::remove_dir_all(&dir); + } + + #[test] + fn save_overwrites_existing_atomically() { + let dir = unique_dir(); + save_to_dir(&dir, &sample("hxa-1")).expect("save"); + let mut updated = sample("hxa-1"); + updated.history.push(Message { + role: Role::User, + content: MessageContent::Text { + text: "third turn".into(), + }, + }); + updated.updated_at = 1_700_000_500; + save_to_dir(&dir, &updated).expect("re-save"); + let loaded = load_from_dir(&dir, &SessionId::new("hxa-1")).expect("load"); + assert_eq!(loaded.history.len(), 3); + assert_eq!(loaded.updated_at, 1_700_000_500); + let _ = std::fs::remove_dir_all(&dir); + } + + #[test] + fn save_then_load_preserves_tool_calls_and_results() { + use crate::provider::ToolCall; + let dir = unique_dir(); + let mut session = sample("hxa-2"); + session.history.push(Message { + role: Role::Assistant, + content: MessageContent::ToolCalls { + text: Some("calling".into()), + calls: vec![ToolCall { + id: "call_0".into(), + name: "read_file".into(), + arguments: r#"{"path":"/etc/hostname"}"#.into(), + }], + }, + }); + session.history.push(Message { + role: Role::Tool, + content: MessageContent::ToolResult { + tool_call_id: "call_0".into(), + content: "host".into(), + }, + }); + save_to_dir(&dir, &session).expect("save"); + let loaded = load_from_dir(&dir, &SessionId::new("hxa-2")).expect("load"); + assert_eq!(loaded.history.len(), 4); + match &loaded.history[2].content { + MessageContent::ToolCalls { calls, .. } => { + assert_eq!(calls[0].name, "read_file"); + } + other => panic!("expected ToolCalls, got {other:?}"), + } + let _ = std::fs::remove_dir_all(&dir); + } + + #[test] + fn sanitize_id_rejects_path_traversal() { + // `../../etc/passwd` — 6 non-alnum chars before "etc" + // (`.`, `.`, `/`, `.`, `.`, `/`), one between, none + // after, none before nothing. Every disallowed char + // collapses to `_`. + assert_eq!(sanitize_id("../../etc/passwd"), "______etc_passwd"); + assert_eq!(sanitize_id("ok-name_42"), "ok-name_42"); + } +}