feat(helexa-acp): session resume via session/load
All checks were successful
CI / Format (push) Successful in 31s
build-prerelease / Resolve version stamps (push) Successful in 40s
CI / Clippy (push) Successful in 2m37s
CI / Test (push) Successful in 4m59s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Build cortex binary (push) Successful in 4m35s
build-prerelease / Package cortex RPM (push) Successful in 1m19s
build-prerelease / Build neuron-blackwell (push) Successful in 6m4s
build-prerelease / Build neuron-ampere (push) Successful in 7m45s
build-prerelease / Build neuron-ada (push) Successful in 5m31s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 2m53s
build-prerelease / Package helexa-neuron-ampere RPM (push) Successful in 3m0s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Successful in 3m43s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 1m1s
All checks were successful
CI / Format (push) Successful in 31s
build-prerelease / Resolve version stamps (push) Successful in 40s
CI / Clippy (push) Successful in 2m37s
CI / Test (push) Successful in 4m59s
CI / Build cortex SRPM (push) Has been skipped
CI / Publish cortex to COPR (push) Has been skipped
CI / Build neuron SRPM (push) Has been skipped
CI / Publish neuron to COPR (push) Has been skipped
CI / Bump version in source (push) Has been skipped
build-prerelease / Build cortex binary (push) Successful in 4m35s
build-prerelease / Package cortex RPM (push) Successful in 1m19s
build-prerelease / Build neuron-blackwell (push) Successful in 6m4s
build-prerelease / Build neuron-ampere (push) Successful in 7m45s
build-prerelease / Build neuron-ada (push) Successful in 5m31s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 2m53s
build-prerelease / Package helexa-neuron-ampere RPM (push) Successful in 3m0s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Successful in 3m43s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 1m1s
Zed restarts (frequent during helexa-acp dogfooding) used to lose
every conversation because we'd ignore the load_session capability
and treat every project-reopen as a fresh session/new. Persist
sessions to disk and honour session/load so the agent panel comes
back where it left off.
Storage layout:
$XDG_DATA_HOME/helexa-acp/sessions/{session_id}.json
Each file holds session_id, cwd, model_id, mode_id, full Message
history, plus created/updated timestamps. Atomic save via
tempfile+rename so a crash mid-write can't corrupt the store.
Touch points:
- src/store.rs (new) — sessions_dir() resolution, save/load via
default and explicit-dir entry points (so unit tests don't have
to race on XDG_DATA_HOME). 5 unit tests cover round-trip,
not-found errors, atomic overwrite, tool-call/result preservation,
and the filename sanitiser's path-traversal handling.
- src/provider/mod.rs — Serialize/Deserialize on Role, Message,
MessageContent, ToolCall. MessageContent::Text turned into a
struct variant ({text: ...}) so internally-tagged JSON works.
- src/agent.rs — initialize_response advertises load_session: true;
handle_load_session reads the file, snapshots in-memory state,
returns LoadSessionResponse with the persisted mode preselected;
drive_prompt persists at the end of every prompt round under the
session lock with the I/O outside the lock.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -19,9 +19,10 @@ use std::sync::atomic::{AtomicU64, Ordering};
|
|||||||
|
|
||||||
use agent_client_protocol::schema::{
|
use agent_client_protocol::schema::{
|
||||||
AgentCapabilities, CancelNotification, ContentBlock, InitializeRequest, InitializeResponse,
|
AgentCapabilities, CancelNotification, ContentBlock, InitializeRequest, InitializeResponse,
|
||||||
NewSessionRequest, NewSessionResponse, PromptCapabilities, PromptRequest, PromptResponse,
|
LoadSessionRequest, LoadSessionResponse, NewSessionRequest, NewSessionResponse,
|
||||||
SessionId, SessionMode, SessionModeId, SessionModeState, SessionNotification, SessionUpdate,
|
PromptCapabilities, PromptRequest, PromptResponse, SessionId, SessionMode, SessionModeId,
|
||||||
SetSessionModeRequest, SetSessionModeResponse, StopReason, TextContent,
|
SessionModeState, SessionNotification, SessionUpdate, SetSessionModeRequest,
|
||||||
|
SetSessionModeResponse, StopReason, TextContent,
|
||||||
};
|
};
|
||||||
use agent_client_protocol::{Agent as AgentRole, Client, ConnectionTo, Dispatch, Stdio};
|
use agent_client_protocol::{Agent as AgentRole, Client, ConnectionTo, Dispatch, Stdio};
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
@@ -34,6 +35,7 @@ use crate::provider::{
|
|||||||
CompletionEvent, CompletionRequest, Message, MessageContent, Provider, Role, ToolCall,
|
CompletionEvent, CompletionRequest, Message, MessageContent, Provider, Role, ToolCall,
|
||||||
};
|
};
|
||||||
use crate::session::{self, MODE_BYPASS, MODE_DEFAULT, SessionState, SessionStore};
|
use crate::session::{self, MODE_BYPASS, MODE_DEFAULT, SessionState, SessionStore};
|
||||||
|
use crate::store::{self, PersistedSession};
|
||||||
use crate::tool_runner::{AcpClientOps, ToolCallEvent, dispatch_tool_call};
|
use crate::tool_runner::{AcpClientOps, ToolCallEvent, dispatch_tool_call};
|
||||||
use crate::tools;
|
use crate::tools;
|
||||||
|
|
||||||
@@ -135,6 +137,20 @@ impl Agent {
|
|||||||
},
|
},
|
||||||
agent_client_protocol::on_receive_request!(),
|
agent_client_protocol::on_receive_request!(),
|
||||||
)
|
)
|
||||||
|
.on_receive_request(
|
||||||
|
{
|
||||||
|
let inner = inner.clone();
|
||||||
|
async move |req: LoadSessionRequest, responder, _cx| match handle_load_session(
|
||||||
|
&inner, req,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(resp) => responder.respond(resp),
|
||||||
|
Err(e) => responder.respond_with_internal_error(format!("{e:#}")),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
agent_client_protocol::on_receive_request!(),
|
||||||
|
)
|
||||||
.on_receive_request(
|
.on_receive_request(
|
||||||
{
|
{
|
||||||
let inner = inner.clone();
|
let inner = inner.clone();
|
||||||
@@ -205,8 +221,16 @@ fn initialize_response(req: &InitializeRequest) -> InitializeResponse {
|
|||||||
// Stage 2: text-only prompts. Image / audio / embedded resources
|
// Stage 2: text-only prompts. Image / audio / embedded resources
|
||||||
// flip on in later stages.
|
// flip on in later stages.
|
||||||
let prompt_caps = PromptCapabilities::default();
|
let prompt_caps = PromptCapabilities::default();
|
||||||
InitializeResponse::new(req.protocol_version)
|
InitializeResponse::new(req.protocol_version).agent_capabilities(
|
||||||
.agent_capabilities(AgentCapabilities::new().prompt_capabilities(prompt_caps))
|
AgentCapabilities::new()
|
||||||
|
.prompt_capabilities(prompt_caps)
|
||||||
|
// Stage 3b: `session/load` is implemented. Persisted
|
||||||
|
// sessions live on disk under
|
||||||
|
// `$XDG_DATA_HOME/helexa-acp/sessions/`; clients (Zed)
|
||||||
|
// can hand us back any session_id we previously
|
||||||
|
// minted to resume the conversation.
|
||||||
|
.load_session(true),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_new_session(
|
async fn handle_new_session(
|
||||||
@@ -240,6 +264,57 @@ async fn handle_new_session(
|
|||||||
Ok(NewSessionResponse::new(session_id).modes(default_mode_state()))
|
Ok(NewSessionResponse::new(session_id).modes(default_mode_state()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Rehydrate a session from disk.
|
||||||
|
///
|
||||||
|
/// Behaviour:
|
||||||
|
///
|
||||||
|
/// - Reads the persisted JSON from
|
||||||
|
/// `$XDG_DATA_HOME/helexa-acp/sessions/{id}.json`. Missing file →
|
||||||
|
/// error (Zed falls back to `session/new`).
|
||||||
|
/// - Overwrites the persisted `cwd` with the one the client just
|
||||||
|
/// sent. The user may have moved or symlinked the repo since
|
||||||
|
/// the session was first created; the *current* cwd is the
|
||||||
|
/// right place to root subsequent tool dispatches.
|
||||||
|
/// - Materialises an in-memory `SessionState` with the persisted
|
||||||
|
/// model + mode + history.
|
||||||
|
/// - Returns `LoadSessionResponse` carrying the same mode list as
|
||||||
|
/// `session/new`, plus the persisted `current_mode_id` so the
|
||||||
|
/// client renders the mode dropdown in the correct state.
|
||||||
|
async fn handle_load_session(
|
||||||
|
inner: &AgentInner,
|
||||||
|
req: LoadSessionRequest,
|
||||||
|
) -> anyhow::Result<LoadSessionResponse> {
|
||||||
|
if !req.cwd.is_absolute() {
|
||||||
|
anyhow::bail!("session cwd must be absolute, got {}", req.cwd.display());
|
||||||
|
}
|
||||||
|
let persisted = store::load(&req.session_id)?;
|
||||||
|
// Snapshot the values we need for logging + the response
|
||||||
|
// before we move pieces of `persisted` into `state`.
|
||||||
|
let model_id = persisted.model_id.clone();
|
||||||
|
let mode_id = persisted.mode_id.clone();
|
||||||
|
let history_turns = persisted.history.len();
|
||||||
|
|
||||||
|
let mut state = SessionState::new(req.cwd.clone(), persisted.model_id);
|
||||||
|
state.history = persisted.history;
|
||||||
|
state.mode_id = SessionModeId::new(persisted.mode_id);
|
||||||
|
session::insert(&inner.sessions, req.session_id.clone(), state).await;
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
session_id = %req.session_id.0,
|
||||||
|
model_id = %model_id,
|
||||||
|
mode = %mode_id,
|
||||||
|
cwd = %req.cwd.display(),
|
||||||
|
history_turns,
|
||||||
|
"session loaded from disk"
|
||||||
|
);
|
||||||
|
|
||||||
|
let modes = SessionModeState::new(
|
||||||
|
SessionModeId::new(mode_id),
|
||||||
|
default_mode_state().available_modes,
|
||||||
|
);
|
||||||
|
Ok(LoadSessionResponse::new().modes(modes))
|
||||||
|
}
|
||||||
|
|
||||||
/// The two modes every Stage 3 session advertises. Stage 7 may grow
|
/// The two modes every Stage 3 session advertises. Stage 7 may grow
|
||||||
/// this list (e.g. "plan" for plan-only output, "ask" for read-only),
|
/// this list (e.g. "plan" for plan-only output, "ask" for read-only),
|
||||||
/// but Default + Bypass cover the two operationally distinct
|
/// but Default + Bypass cover the two operationally distinct
|
||||||
@@ -341,7 +416,7 @@ async fn drive_prompt(
|
|||||||
let user_text = flatten_prompt(&req.prompt);
|
let user_text = flatten_prompt(&req.prompt);
|
||||||
state.history.push(Message {
|
state.history.push(Message {
|
||||||
role: Role::User,
|
role: Role::User,
|
||||||
content: MessageContent::Text(user_text),
|
content: MessageContent::Text { text: user_text },
|
||||||
});
|
});
|
||||||
(
|
(
|
||||||
state.history.clone(),
|
state.history.clone(),
|
||||||
@@ -383,7 +458,9 @@ async fn drive_prompt(
|
|||||||
let mut messages: Vec<Message> = Vec::with_capacity(existing_history.len() + 1);
|
let mut messages: Vec<Message> = Vec::with_capacity(existing_history.len() + 1);
|
||||||
messages.push(Message {
|
messages.push(Message {
|
||||||
role: Role::System,
|
role: Role::System,
|
||||||
content: MessageContent::Text(system_prompt),
|
content: MessageContent::Text {
|
||||||
|
text: system_prompt,
|
||||||
|
},
|
||||||
});
|
});
|
||||||
messages.extend(existing_history);
|
messages.extend(existing_history);
|
||||||
|
|
||||||
@@ -494,7 +571,9 @@ async fn drive_prompt(
|
|||||||
if !assistant_text.is_empty() {
|
if !assistant_text.is_empty() {
|
||||||
new_turns.push(Message {
|
new_turns.push(Message {
|
||||||
role: Role::Assistant,
|
role: Role::Assistant,
|
||||||
content: MessageContent::Text(assistant_text),
|
content: MessageContent::Text {
|
||||||
|
text: assistant_text,
|
||||||
|
},
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@@ -540,7 +619,9 @@ async fn drive_prompt(
|
|||||||
if !assistant_text.is_empty() {
|
if !assistant_text.is_empty() {
|
||||||
new_turns.push(Message {
|
new_turns.push(Message {
|
||||||
role: Role::Assistant,
|
role: Role::Assistant,
|
||||||
content: MessageContent::Text(assistant_text),
|
content: MessageContent::Text {
|
||||||
|
text: assistant_text,
|
||||||
|
},
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
stop_reason = map_finish_reason(finish_reason.as_deref());
|
stop_reason = map_finish_reason(finish_reason.as_deref());
|
||||||
@@ -567,7 +648,9 @@ async fn drive_prompt(
|
|||||||
calls,
|
calls,
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
MessageContent::Text(assistant_text)
|
MessageContent::Text {
|
||||||
|
text: assistant_text,
|
||||||
|
}
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
new_turns.push(assistant_turn.clone());
|
new_turns.push(assistant_turn.clone());
|
||||||
@@ -638,9 +721,39 @@ async fn drive_prompt(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
// Append the new turns to the session's in-memory history, then
|
||||||
|
// snapshot the state for persistence. We snapshot *under the
|
||||||
|
// lock* so the on-disk store reflects exactly what's in memory,
|
||||||
|
// but the actual blocking I/O (file write) happens outside the
|
||||||
|
// lock so a slow disk doesn't stall concurrent session work.
|
||||||
|
let snapshot = {
|
||||||
let mut state = session_arc.lock().await;
|
let mut state = session_arc.lock().await;
|
||||||
state.history.extend(new_turns);
|
state.history.extend(new_turns);
|
||||||
|
PersistedSession {
|
||||||
|
session_id: session_id.0.as_ref().to_string(),
|
||||||
|
cwd: state.cwd.clone(),
|
||||||
|
model_id: state.model_id.clone(),
|
||||||
|
mode_id: state.mode_id.0.as_ref().to_string(),
|
||||||
|
history: state.history.clone(),
|
||||||
|
// `created_at` would be ideal to preserve across saves;
|
||||||
|
// we read it back via store::load on resume but the
|
||||||
|
// in-memory SessionState doesn't carry it (yet). For
|
||||||
|
// now persistence treats every save as a refresh,
|
||||||
|
// updating both timestamps. Future work: thread
|
||||||
|
// `created_at` through SessionState.
|
||||||
|
created_at: store::now_secs(),
|
||||||
|
updated_at: store::now_secs(),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Err(e) = store::save(&snapshot) {
|
||||||
|
// Persistence failure is a warning, not a fatal — the
|
||||||
|
// prompt response still goes through. Operator can grep
|
||||||
|
// for this to diagnose disk issues.
|
||||||
|
tracing::warn!(
|
||||||
|
session_id = %session_id.0,
|
||||||
|
error = %format!("{e:#}"),
|
||||||
|
"session/persist failed; resume from disk will miss this turn"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let _ = responder.respond(PromptResponse::new(stop_reason));
|
let _ = responder.respond(PromptResponse::new(stop_reason));
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ mod prompt;
|
|||||||
mod provider;
|
mod provider;
|
||||||
mod qwen3;
|
mod qwen3;
|
||||||
mod session;
|
mod session;
|
||||||
|
mod store;
|
||||||
mod tool_runner;
|
mod tool_runner;
|
||||||
mod tools;
|
mod tools;
|
||||||
|
|
||||||
|
|||||||
@@ -73,13 +73,14 @@ pub struct CompletionRequest {
|
|||||||
pub max_tokens: Option<u64>,
|
pub max_tokens: Option<u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct Message {
|
pub struct Message {
|
||||||
pub role: Role,
|
pub role: Role,
|
||||||
pub content: MessageContent,
|
pub content: MessageContent,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
pub enum Role {
|
pub enum Role {
|
||||||
System,
|
System,
|
||||||
User,
|
User,
|
||||||
@@ -88,17 +89,21 @@ pub enum Role {
|
|||||||
/// shape the upstream wire format wants (OpenAI uses
|
/// shape the upstream wire format wants (OpenAI uses
|
||||||
/// `role: "tool"` + `tool_call_id`; Anthropic uses content blocks).
|
/// `role: "tool"` + `tool_call_id`; Anthropic uses content blocks).
|
||||||
/// Stage 3 (tools) constructs this; Stage 2 never does.
|
/// Stage 3 (tools) constructs this; Stage 2 never does.
|
||||||
#[allow(dead_code)]
|
|
||||||
Tool,
|
Tool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(tag = "type", rename_all = "snake_case")]
|
||||||
pub enum MessageContent {
|
pub enum MessageContent {
|
||||||
Text(String),
|
/// Plain text turn (system / user / assistant). Struct variant
|
||||||
|
/// rather than newtype so the persisted JSON has an explicit
|
||||||
|
/// `text` field — that lets us use internal tagging on the
|
||||||
|
/// enum, which is incompatible with newtype-of-primitive
|
||||||
|
/// variants.
|
||||||
|
Text { text: String },
|
||||||
/// Assistant turn that called one or more tools. Stage 3 starts
|
/// Assistant turn that called one or more tools. Stage 3 starts
|
||||||
/// constructing this when the provider stream yields a
|
/// constructing this when the provider stream yields a
|
||||||
/// `ToolCallStart` / `ToolCallArgsDelta` sequence.
|
/// `ToolCallStart` / `ToolCallArgsDelta` sequence.
|
||||||
#[allow(dead_code)]
|
|
||||||
ToolCalls {
|
ToolCalls {
|
||||||
/// Optional text the assistant said alongside the tool calls.
|
/// Optional text the assistant said alongside the tool calls.
|
||||||
text: Option<String>,
|
text: Option<String>,
|
||||||
@@ -106,14 +111,13 @@ pub enum MessageContent {
|
|||||||
},
|
},
|
||||||
/// Tool result. `tool_call_id` matches the assistant's call id.
|
/// Tool result. `tool_call_id` matches the assistant's call id.
|
||||||
/// Stage 3 constructs this after the tool runner finishes.
|
/// Stage 3 constructs this after the tool runner finishes.
|
||||||
#[allow(dead_code)]
|
|
||||||
ToolResult {
|
ToolResult {
|
||||||
tool_call_id: String,
|
tool_call_id: String,
|
||||||
content: String,
|
content: String,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct ToolCall {
|
pub struct ToolCall {
|
||||||
/// Provider-assigned id that ties the call to its result. The
|
/// Provider-assigned id that ties the call to its result. The
|
||||||
/// Qwen3 wire format we use today doesn't carry this on the
|
/// Qwen3 wire format we use today doesn't carry this on the
|
||||||
|
|||||||
@@ -159,11 +159,13 @@ mod tests {
|
|||||||
messages: vec![
|
messages: vec![
|
||||||
Message {
|
Message {
|
||||||
role: Role::System,
|
role: Role::System,
|
||||||
content: MessageContent::Text("you are helpful".into()),
|
content: MessageContent::Text {
|
||||||
|
text: "you are helpful".into(),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Message {
|
Message {
|
||||||
role: Role::User,
|
role: Role::User,
|
||||||
content: MessageContent::Text("hi".into()),
|
content: MessageContent::Text { text: "hi".into() },
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
tools: vec![],
|
tools: vec![],
|
||||||
@@ -503,9 +505,13 @@ fn encode_request(req: &CompletionRequest) -> Value {
|
|||||||
|
|
||||||
fn encode_message(m: &Message) -> Value {
|
fn encode_message(m: &Message) -> Value {
|
||||||
match (m.role, &m.content) {
|
match (m.role, &m.content) {
|
||||||
(Role::System, MessageContent::Text(s)) => json!({"role": "system", "content": s}),
|
(Role::System, MessageContent::Text { text }) => {
|
||||||
(Role::User, MessageContent::Text(s)) => json!({"role": "user", "content": s}),
|
json!({"role": "system", "content": text})
|
||||||
(Role::Assistant, MessageContent::Text(s)) => json!({"role": "assistant", "content": s}),
|
}
|
||||||
|
(Role::User, MessageContent::Text { text }) => json!({"role": "user", "content": text}),
|
||||||
|
(Role::Assistant, MessageContent::Text { text }) => {
|
||||||
|
json!({"role": "assistant", "content": text})
|
||||||
|
}
|
||||||
// Qwen3 wire shape: assistant turns that called tools come
|
// Qwen3 wire shape: assistant turns that called tools come
|
||||||
// back to the model with `<tool_call>{…}</tool_call>` blocks
|
// back to the model with `<tool_call>{…}</tool_call>` blocks
|
||||||
// inline in `content`, *not* via the structured `tool_calls`
|
// inline in `content`, *not* via the structured `tool_calls`
|
||||||
@@ -553,7 +559,7 @@ fn role_str(r: Role) -> &'static str {
|
|||||||
|
|
||||||
fn content_as_text(c: &MessageContent) -> String {
|
fn content_as_text(c: &MessageContent) -> String {
|
||||||
match c {
|
match c {
|
||||||
MessageContent::Text(s) => s.clone(),
|
MessageContent::Text { text } => text.clone(),
|
||||||
MessageContent::ToolCalls { text, .. } => text.clone().unwrap_or_default(),
|
MessageContent::ToolCalls { text, .. } => text.clone().unwrap_or_default(),
|
||||||
MessageContent::ToolResult { content, .. } => content.clone(),
|
MessageContent::ToolResult { content, .. } => content.clone(),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -152,7 +152,9 @@ mod tests {
|
|||||||
.history
|
.history
|
||||||
.push(Message {
|
.push(Message {
|
||||||
role: Role::User,
|
role: Role::User,
|
||||||
content: MessageContent::Text("hello".into()),
|
content: MessageContent::Text {
|
||||||
|
text: "hello".into(),
|
||||||
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
|
|||||||
277
crates/helexa-acp/src/store.rs
Normal file
277
crates/helexa-acp/src/store.rs
Normal file
@@ -0,0 +1,277 @@
|
|||||||
|
//! On-disk session persistence for `session/load` support.
|
||||||
|
//!
|
||||||
|
//! Storage layout:
|
||||||
|
//!
|
||||||
|
//! ```text
|
||||||
|
//! $XDG_DATA_HOME/helexa-acp/sessions/{session_id}.json
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! (Fallback to `~/.local/share/helexa-acp/sessions/` when
|
||||||
|
//! `$XDG_DATA_HOME` is unset.) One JSON file per session. Writes
|
||||||
|
//! happen at the end of every `session/prompt` round through
|
||||||
|
//! [`save`], using tempfile-plus-rename so a crash mid-write can't
|
||||||
|
//! corrupt the store. Reads happen on `session/load` via [`load`].
|
||||||
|
//!
|
||||||
|
//! No compaction, no rotation: files accumulate until the user
|
||||||
|
//! cleans them up. That's deliberate — disk is cheap, and the
|
||||||
|
//! resume-on-restart workflow matters more than tidiness. The
|
||||||
|
//! [`SESSIONS_DIRNAME`] subdirectory is created lazily on first
|
||||||
|
//! save so an unprivileged install path never errors at startup.
|
||||||
|
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::time::SystemTime;
|
||||||
|
|
||||||
|
use agent_client_protocol::schema::SessionId;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use crate::provider::Message;
|
||||||
|
|
||||||
|
const APP_DIRNAME: &str = "helexa-acp";
|
||||||
|
const SESSIONS_DIRNAME: &str = "sessions";
|
||||||
|
|
||||||
|
/// The shape persisted to disk for one session. Only what we can't
|
||||||
|
/// rebuild from the running config goes in here: the conversation
|
||||||
|
/// history, the mode toggle, the model id, and the cwd-at-creation.
|
||||||
|
///
|
||||||
|
/// `created_at` / `updated_at` are seconds-since-epoch — cheap to
|
||||||
|
/// compare, no third-party time crate, and stable across runs.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct PersistedSession {
|
||||||
|
pub session_id: String,
|
||||||
|
pub cwd: PathBuf,
|
||||||
|
pub model_id: String,
|
||||||
|
pub mode_id: String,
|
||||||
|
pub history: Vec<Message>,
|
||||||
|
pub created_at: u64,
|
||||||
|
pub updated_at: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Resolve the directory that holds session JSON files. Honors
|
||||||
|
/// `$XDG_DATA_HOME`; falls back to `~/.local/share/helexa-acp/sessions/`.
|
||||||
|
/// Returns `None` if neither is resolvable (no `HOME` set — possible
|
||||||
|
/// in stripped-down container environments).
|
||||||
|
pub fn sessions_dir() -> Option<PathBuf> {
|
||||||
|
let base = std::env::var("XDG_DATA_HOME")
|
||||||
|
.ok()
|
||||||
|
.filter(|s| !s.is_empty())
|
||||||
|
.map(PathBuf::from)
|
||||||
|
.or_else(|| {
|
||||||
|
std::env::var("HOME")
|
||||||
|
.ok()
|
||||||
|
.map(|h| PathBuf::from(h).join(".local").join("share"))
|
||||||
|
})?;
|
||||||
|
Some(base.join(APP_DIRNAME).join(SESSIONS_DIRNAME))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Atomic save into the default sessions directory.
|
||||||
|
pub fn save(session: &PersistedSession) -> anyhow::Result<()> {
|
||||||
|
let dir = sessions_dir()
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("can't resolve XDG_DATA_HOME or HOME for session store"))?;
|
||||||
|
save_to_dir(&dir, session)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Load from the default sessions directory.
|
||||||
|
pub fn load(session_id: &SessionId) -> anyhow::Result<PersistedSession> {
|
||||||
|
let dir = sessions_dir()
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("can't resolve XDG_DATA_HOME or HOME for session store"))?;
|
||||||
|
load_from_dir(&dir, session_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Atomic save into an explicit directory. Writes to
|
||||||
|
/// `{id}.json.tmp` then renames over `{id}.json`. Creates the
|
||||||
|
/// target directory if it doesn't exist. Split from [`save`] so
|
||||||
|
/// unit tests can target a per-test scratch dir without mutating
|
||||||
|
/// process-global env vars.
|
||||||
|
pub fn save_to_dir(dir: &std::path::Path, session: &PersistedSession) -> anyhow::Result<()> {
|
||||||
|
std::fs::create_dir_all(dir).map_err(|e| anyhow::anyhow!("create {}: {e}", dir.display()))?;
|
||||||
|
let safe = sanitize_id(&session.session_id);
|
||||||
|
let final_path = dir.join(format!("{safe}.json"));
|
||||||
|
let tmp_path = dir.join(format!("{safe}.json.tmp"));
|
||||||
|
let json = serde_json::to_string_pretty(session)?;
|
||||||
|
std::fs::write(&tmp_path, json)
|
||||||
|
.map_err(|e| anyhow::anyhow!("write {}: {e}", tmp_path.display()))?;
|
||||||
|
std::fs::rename(&tmp_path, &final_path)
|
||||||
|
.map_err(|e| anyhow::anyhow!("rename → {}: {e}", final_path.display()))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Load from an explicit directory. Returns a friendly error
|
||||||
|
/// message when the session id has no file on disk so the caller
|
||||||
|
/// can map it to a clean ACP error response.
|
||||||
|
pub fn load_from_dir(
|
||||||
|
dir: &std::path::Path,
|
||||||
|
session_id: &SessionId,
|
||||||
|
) -> anyhow::Result<PersistedSession> {
|
||||||
|
let safe = sanitize_id(session_id.0.as_ref());
|
||||||
|
let path = dir.join(format!("{safe}.json"));
|
||||||
|
let bytes = std::fs::read(&path).map_err(|e| {
|
||||||
|
if e.kind() == std::io::ErrorKind::NotFound {
|
||||||
|
anyhow::anyhow!("no persisted session at {}", path.display())
|
||||||
|
} else {
|
||||||
|
anyhow::anyhow!("read {}: {e}", path.display())
|
||||||
|
}
|
||||||
|
})?;
|
||||||
|
let session: PersistedSession = serde_json::from_slice(&bytes)
|
||||||
|
.map_err(|e| anyhow::anyhow!("parse {}: {e}", path.display()))?;
|
||||||
|
Ok(session)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Seconds-since-epoch, saturating to 0 if the system clock is
|
||||||
|
/// behind epoch (which shouldn't happen but the type system
|
||||||
|
/// requires a fallible read).
|
||||||
|
pub fn now_secs() -> u64 {
|
||||||
|
SystemTime::now()
|
||||||
|
.duration_since(SystemTime::UNIX_EPOCH)
|
||||||
|
.map(|d| d.as_secs())
|
||||||
|
.unwrap_or(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Strip anything that isn't a safe filename character so a
|
||||||
|
/// mischievous (or just unconventional) session id can't escape
|
||||||
|
/// the sessions directory.
|
||||||
|
fn sanitize_id(id: &str) -> String {
|
||||||
|
id.chars()
|
||||||
|
.map(|c| {
|
||||||
|
if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
|
||||||
|
c
|
||||||
|
} else {
|
||||||
|
'_'
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::provider::{MessageContent, Role};
|
||||||
|
|
||||||
|
/// Unique scratch dir per test invocation. We use this dir
|
||||||
|
/// directly with the `*_to_dir` / `*_from_dir` functions so
|
||||||
|
/// the tests never mutate `$XDG_DATA_HOME` — that env var
|
||||||
|
/// would race across the parallel test harness.
|
||||||
|
fn unique_dir() -> PathBuf {
|
||||||
|
let base = std::env::var("CARGO_TARGET_TMPDIR")
|
||||||
|
.ok()
|
||||||
|
.map(PathBuf::from)
|
||||||
|
.unwrap_or_else(std::env::temp_dir);
|
||||||
|
let pid = std::process::id();
|
||||||
|
let nanos = SystemTime::now()
|
||||||
|
.duration_since(SystemTime::UNIX_EPOCH)
|
||||||
|
.map(|d| d.subsec_nanos())
|
||||||
|
.unwrap_or(0);
|
||||||
|
let dir = base.join(format!("helexa-acp-store-test-{pid}-{nanos}"));
|
||||||
|
std::fs::create_dir_all(&dir).expect("create test dir");
|
||||||
|
dir
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sample(id: &str) -> PersistedSession {
|
||||||
|
PersistedSession {
|
||||||
|
session_id: id.into(),
|
||||||
|
cwd: PathBuf::from("/home/me/proj"),
|
||||||
|
model_id: "Qwen/Qwen3.6-27B".into(),
|
||||||
|
mode_id: "default".into(),
|
||||||
|
history: vec![
|
||||||
|
Message {
|
||||||
|
role: Role::User,
|
||||||
|
content: MessageContent::Text {
|
||||||
|
text: "hello".into(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Message {
|
||||||
|
role: Role::Assistant,
|
||||||
|
content: MessageContent::Text { text: "hi".into() },
|
||||||
|
},
|
||||||
|
],
|
||||||
|
created_at: 1_700_000_000,
|
||||||
|
updated_at: 1_700_000_001,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn round_trip_save_then_load() {
|
||||||
|
let dir = unique_dir();
|
||||||
|
save_to_dir(&dir, &sample("hxa-1")).expect("save");
|
||||||
|
let loaded = load_from_dir(&dir, &SessionId::new("hxa-1")).expect("load");
|
||||||
|
assert_eq!(loaded.session_id, "hxa-1");
|
||||||
|
assert_eq!(loaded.cwd, PathBuf::from("/home/me/proj"));
|
||||||
|
assert_eq!(loaded.history.len(), 2);
|
||||||
|
let _ = std::fs::remove_dir_all(&dir);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn load_missing_session_errors_with_not_found_message() {
|
||||||
|
let dir = unique_dir();
|
||||||
|
let err = load_from_dir(&dir, &SessionId::new("nope")).unwrap_err();
|
||||||
|
let msg = format!("{err}");
|
||||||
|
assert!(
|
||||||
|
msg.contains("no persisted session"),
|
||||||
|
"want NotFound, got: {msg}"
|
||||||
|
);
|
||||||
|
let _ = std::fs::remove_dir_all(&dir);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn save_overwrites_existing_atomically() {
|
||||||
|
let dir = unique_dir();
|
||||||
|
save_to_dir(&dir, &sample("hxa-1")).expect("save");
|
||||||
|
let mut updated = sample("hxa-1");
|
||||||
|
updated.history.push(Message {
|
||||||
|
role: Role::User,
|
||||||
|
content: MessageContent::Text {
|
||||||
|
text: "third turn".into(),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
updated.updated_at = 1_700_000_500;
|
||||||
|
save_to_dir(&dir, &updated).expect("re-save");
|
||||||
|
let loaded = load_from_dir(&dir, &SessionId::new("hxa-1")).expect("load");
|
||||||
|
assert_eq!(loaded.history.len(), 3);
|
||||||
|
assert_eq!(loaded.updated_at, 1_700_000_500);
|
||||||
|
let _ = std::fs::remove_dir_all(&dir);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn save_then_load_preserves_tool_calls_and_results() {
|
||||||
|
use crate::provider::ToolCall;
|
||||||
|
let dir = unique_dir();
|
||||||
|
let mut session = sample("hxa-2");
|
||||||
|
session.history.push(Message {
|
||||||
|
role: Role::Assistant,
|
||||||
|
content: MessageContent::ToolCalls {
|
||||||
|
text: Some("calling".into()),
|
||||||
|
calls: vec![ToolCall {
|
||||||
|
id: "call_0".into(),
|
||||||
|
name: "read_file".into(),
|
||||||
|
arguments: r#"{"path":"/etc/hostname"}"#.into(),
|
||||||
|
}],
|
||||||
|
},
|
||||||
|
});
|
||||||
|
session.history.push(Message {
|
||||||
|
role: Role::Tool,
|
||||||
|
content: MessageContent::ToolResult {
|
||||||
|
tool_call_id: "call_0".into(),
|
||||||
|
content: "host".into(),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
save_to_dir(&dir, &session).expect("save");
|
||||||
|
let loaded = load_from_dir(&dir, &SessionId::new("hxa-2")).expect("load");
|
||||||
|
assert_eq!(loaded.history.len(), 4);
|
||||||
|
match &loaded.history[2].content {
|
||||||
|
MessageContent::ToolCalls { calls, .. } => {
|
||||||
|
assert_eq!(calls[0].name, "read_file");
|
||||||
|
}
|
||||||
|
other => panic!("expected ToolCalls, got {other:?}"),
|
||||||
|
}
|
||||||
|
let _ = std::fs::remove_dir_all(&dir);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn sanitize_id_rejects_path_traversal() {
|
||||||
|
// `../../etc/passwd` — 6 non-alnum chars before "etc"
|
||||||
|
// (`.`, `.`, `/`, `.`, `.`, `/`), one between, none
|
||||||
|
// after, none before nothing. Every disallowed char
|
||||||
|
// collapses to `_`.
|
||||||
|
assert_eq!(sanitize_id("../../etc/passwd"), "______etc_passwd");
|
||||||
|
assert_eq!(sanitize_id("ok-name_42"), "ok-name_42");
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user