feat(ecc2): implement session create/destroy lifecycle (#764)

- Process spawning via tokio::process::Command
- Session state transitions with guards (Pending->Running->Completed/Failed/Stopped)
- Stop with process kill and optional worktree cleanup
- Latest alias resolver in get_status
- SQLite store migrations for state tracking
This commit is contained in:
Affaan Mustafa
2026-03-23 03:46:17 -07:00
parent 2166d80d58
commit 2787b8e92f
6 changed files with 688 additions and 54 deletions

View File

@@ -1,5 +1,5 @@
use anyhow::Result;
use rusqlite::Connection;
use anyhow::{Context, Result};
use rusqlite::{Connection, OptionalExtension};
use std::path::Path;
use super::{Session, SessionMetrics, SessionState};
@@ -24,6 +24,7 @@ impl StateStore {
task TEXT NOT NULL,
agent_type TEXT NOT NULL,
state TEXT NOT NULL DEFAULT 'pending',
pid INTEGER,
worktree_path TEXT,
worktree_branch TEXT,
worktree_base TEXT,
@@ -62,18 +63,40 @@ impl StateStore {
CREATE INDEX IF NOT EXISTS idx_messages_to ON messages(to_session, read);
",
)?;
self.ensure_session_columns()?;
Ok(())
}
fn ensure_session_columns(&self) -> Result<()> {
if !self.has_column("sessions", "pid")? {
self.conn
.execute("ALTER TABLE sessions ADD COLUMN pid INTEGER", [])
.context("Failed to add pid column to sessions table")?;
}
Ok(())
}
fn has_column(&self, table: &str, column: &str) -> Result<bool> {
let pragma = format!("PRAGMA table_info({table})");
let mut stmt = self.conn.prepare(&pragma)?;
let columns = stmt
.query_map([], |row| row.get::<_, String>(1))?
.collect::<std::result::Result<Vec<_>, _>>()?;
Ok(columns.iter().any(|existing| existing == column))
}
pub fn insert_session(&self, session: &Session) -> Result<()> {
self.conn.execute(
"INSERT INTO sessions (id, task, agent_type, state, worktree_path, worktree_branch, worktree_base, created_at, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
"INSERT INTO sessions (id, task, agent_type, state, pid, worktree_path, worktree_branch, worktree_base, created_at, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
rusqlite::params![
session.id,
session.task,
session.agent_type,
session.state.to_string(),
session.pid.map(i64::from),
session.worktree.as_ref().map(|w| w.path.to_string_lossy().to_string()),
session.worktree.as_ref().map(|w| w.branch.clone()),
session.worktree.as_ref().map(|w| w.base_branch.clone()),
@@ -85,7 +108,26 @@ impl StateStore {
}
pub fn update_state(&self, session_id: &str, state: &SessionState) -> Result<()> {
self.conn.execute(
let current_state = self
.conn
.query_row(
"SELECT state FROM sessions WHERE id = ?1",
[session_id],
|row| row.get::<_, String>(0),
)
.optional()?
.map(|raw| SessionState::from_db_value(&raw))
.ok_or_else(|| anyhow::anyhow!("Session not found: {session_id}"))?;
if !current_state.can_transition_to(state) {
anyhow::bail!(
"Invalid session state transition: {} -> {}",
current_state,
state
);
}
let updated = self.conn.execute(
"UPDATE sessions SET state = ?1, updated_at = ?2 WHERE id = ?3",
rusqlite::params![
state.to_string(),
@@ -93,6 +135,28 @@ impl StateStore {
session_id,
],
)?;
if updated == 0 {
anyhow::bail!("Session not found: {session_id}");
}
Ok(())
}
pub fn update_pid(&self, session_id: &str, pid: Option<u32>) -> Result<()> {
let updated = self.conn.execute(
"UPDATE sessions SET pid = ?1, updated_at = ?2 WHERE id = ?3",
rusqlite::params![
pid.map(i64::from),
chrono::Utc::now().to_rfc3339(),
session_id,
],
)?;
if updated == 0 {
anyhow::bail!("Session not found: {session_id}");
}
Ok(())
}
@@ -114,7 +178,7 @@ impl StateStore {
pub fn list_sessions(&self) -> Result<Vec<Session>> {
let mut stmt = self.conn.prepare(
"SELECT id, task, agent_type, state, worktree_path, worktree_branch, worktree_base,
"SELECT id, task, agent_type, state, pid, worktree_path, worktree_branch, worktree_base,
tokens_used, tool_calls, files_changed, duration_secs, cost_usd,
created_at, updated_at
FROM sessions ORDER BY updated_at DESC",
@@ -123,30 +187,24 @@ impl StateStore {
let sessions = stmt
.query_map([], |row| {
let state_str: String = row.get(3)?;
let state = match state_str.as_str() {
"running" => SessionState::Running,
"idle" => SessionState::Idle,
"completed" => SessionState::Completed,
"failed" => SessionState::Failed,
"stopped" => SessionState::Stopped,
_ => SessionState::Pending,
};
let state = SessionState::from_db_value(&state_str);
let worktree_path: Option<String> = row.get(4)?;
let worktree_path: Option<String> = row.get(5)?;
let worktree = worktree_path.map(|p| super::WorktreeInfo {
path: std::path::PathBuf::from(p),
branch: row.get::<_, String>(5).unwrap_or_default(),
base_branch: row.get::<_, String>(6).unwrap_or_default(),
branch: row.get::<_, String>(6).unwrap_or_default(),
base_branch: row.get::<_, String>(7).unwrap_or_default(),
});
let created_str: String = row.get(12)?;
let updated_str: String = row.get(13)?;
let created_str: String = row.get(13)?;
let updated_str: String = row.get(14)?;
Ok(Session {
id: row.get(0)?,
task: row.get(1)?,
agent_type: row.get(2)?,
state,
pid: row.get::<_, Option<u32>>(4)?,
worktree,
created_at: chrono::DateTime::parse_from_rfc3339(&created_str)
.unwrap_or_default()
@@ -155,11 +213,11 @@ impl StateStore {
.unwrap_or_default()
.with_timezone(&chrono::Utc),
metrics: SessionMetrics {
tokens_used: row.get(7)?,
tool_calls: row.get(8)?,
files_changed: row.get(9)?,
duration_secs: row.get(10)?,
cost_usd: row.get(11)?,
tokens_used: row.get(8)?,
tool_calls: row.get(9)?,
files_changed: row.get(10)?,
duration_secs: row.get(11)?,
cost_usd: row.get(12)?,
},
})
})?
@@ -168,18 +226,18 @@ impl StateStore {
Ok(sessions)
}
pub fn get_session(&self, id: &str) -> Result<Option<Session>> {
let sessions = self.list_sessions()?;
Ok(sessions.into_iter().find(|s| s.id == id || s.id.starts_with(id)))
pub fn get_latest_session(&self) -> Result<Option<Session>> {
Ok(self.list_sessions()?.into_iter().next())
}
pub fn send_message(
&self,
from: &str,
to: &str,
content: &str,
msg_type: &str,
) -> Result<()> {
pub fn get_session(&self, id: &str) -> Result<Option<Session>> {
let sessions = self.list_sessions()?;
Ok(sessions
.into_iter()
.find(|s| s.id == id || s.id.starts_with(id)))
}
pub fn send_message(&self, from: &str, to: &str, content: &str, msg_type: &str) -> Result<()> {
self.conn.execute(
"INSERT INTO messages (from_session, to_session, content, msg_type, timestamp)
VALUES (?1, ?2, ?3, ?4, ?5)",
@@ -188,3 +246,105 @@ impl StateStore {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::session::{Session, SessionMetrics, SessionState};
use chrono::{Duration, Utc};
use std::fs;
use std::path::{Path, PathBuf};
struct TestDir {
path: PathBuf,
}
impl TestDir {
fn new(label: &str) -> Result<Self> {
let path =
std::env::temp_dir().join(format!("ecc2-{}-{}", label, uuid::Uuid::new_v4()));
fs::create_dir_all(&path)?;
Ok(Self { path })
}
fn path(&self) -> &Path {
&self.path
}
}
impl Drop for TestDir {
fn drop(&mut self) {
let _ = fs::remove_dir_all(&self.path);
}
}
fn build_session(id: &str, state: SessionState) -> Session {
let now = Utc::now();
Session {
id: id.to_string(),
task: "task".to_string(),
agent_type: "claude".to_string(),
state,
pid: None,
worktree: None,
created_at: now - Duration::minutes(1),
updated_at: now,
metrics: SessionMetrics::default(),
}
}
#[test]
fn update_state_rejects_invalid_terminal_transition() -> Result<()> {
let tempdir = TestDir::new("store-invalid-transition")?;
let db = StateStore::open(&tempdir.path().join("state.db"))?;
db.insert_session(&build_session("done", SessionState::Completed))?;
let error = db
.update_state("done", &SessionState::Running)
.expect_err("completed sessions must not transition back to running");
assert!(error
.to_string()
.contains("Invalid session state transition"));
Ok(())
}
#[test]
fn open_migrates_existing_sessions_table_with_pid_column() -> Result<()> {
let tempdir = TestDir::new("store-migration")?;
let db_path = tempdir.path().join("state.db");
let conn = Connection::open(&db_path)?;
conn.execute_batch(
"
CREATE TABLE sessions (
id TEXT PRIMARY KEY,
task TEXT NOT NULL,
agent_type TEXT NOT NULL,
state TEXT NOT NULL DEFAULT 'pending',
worktree_path TEXT,
worktree_branch TEXT,
worktree_base TEXT,
tokens_used INTEGER DEFAULT 0,
tool_calls INTEGER DEFAULT 0,
files_changed INTEGER DEFAULT 0,
duration_secs INTEGER DEFAULT 0,
cost_usd REAL DEFAULT 0.0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
",
)?;
drop(conn);
let db = StateStore::open(&db_path)?;
let mut stmt = db.conn.prepare("PRAGMA table_info(sessions)")?;
let column_names = stmt
.query_map([], |row| row.get::<_, String>(1))?
.collect::<std::result::Result<Vec<_>, _>>()?;
assert!(column_names.iter().any(|column| column == "pid"));
Ok(())
}
}