From 2787b8e92f84ab2de722f802adbbd0af2f023971 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Mon, 23 Mar 2026 03:46:17 -0700 Subject: [PATCH] feat(ecc2): implement session create/destroy lifecycle (#764) - Process spawning via tokio::process::Command - Session state transitions with guards (Pending->Running->Completed/Failed/Stopped) - Stop with process kill and optional worktree cleanup - Latest alias resolver in get_status - SQLite store migrations for state tracking --- ecc2/Cargo.lock | 1 + ecc2/Cargo.toml | 1 + ecc2/src/session/manager.rs | 440 ++++++++++++++++++++++++++++++++++-- ecc2/src/session/mod.rs | 43 +++- ecc2/src/session/store.rs | 228 ++++++++++++++++--- ecc2/src/worktree/mod.rs | 29 ++- 6 files changed, 688 insertions(+), 54 deletions(-) diff --git a/ecc2/Cargo.lock b/ecc2/Cargo.lock index 59a060d3..f7c88b11 100644 --- a/ecc2/Cargo.lock +++ b/ecc2/Cargo.lock @@ -332,6 +332,7 @@ dependencies = [ "crossterm", "dirs", "git2", + "libc", "ratatui", "rusqlite", "serde", diff --git a/ecc2/Cargo.toml b/ecc2/Cargo.toml index 88265f97..3f45eca5 100644 --- a/ecc2/Cargo.toml +++ b/ecc2/Cargo.toml @@ -36,6 +36,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } # Error handling anyhow = "1" thiserror = "2" +libc = "0.2" # Time chrono = { version = "0.4", features = ["serde"] } diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index c08c5f0d..23e439f1 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -1,8 +1,11 @@ -use anyhow::Result; +use anyhow::{Context, Result}; use std::fmt; +use std::path::{Path, PathBuf}; +use std::process::Stdio; +use tokio::process::Command; -use super::{Session, SessionMetrics, SessionState}; use super::store::StateStore; +use super::{Session, SessionMetrics, SessionState}; use crate::config::Config; use crate::worktree; @@ -12,12 +15,67 @@ pub async fn create_session( task: &str, agent_type: &str, use_worktree: bool, +) -> Result { + let repo_root = + std::env::current_dir().context("Failed to resolve current working directory")?; + let agent_program = agent_program(agent_type)?; + + create_session_in_dir( + db, + cfg, + task, + agent_type, + use_worktree, + &repo_root, + &agent_program, + ) + .await +} + +pub fn list_sessions(db: &StateStore) -> Result> { + db.list_sessions() +} + +pub fn get_status(db: &StateStore, id: &str) -> Result { + let session = resolve_session(db, id)?; + Ok(SessionStatus(session)) +} + +pub async fn stop_session(db: &StateStore, id: &str) -> Result<()> { + stop_session_with_options(db, id, true).await +} + +fn agent_program(agent_type: &str) -> Result { + match agent_type { + "claude" => Ok(PathBuf::from("claude")), + other => anyhow::bail!("Unsupported agent type: {other}"), + } +} + +fn resolve_session(db: &StateStore, id: &str) -> Result { + let session = if id == "latest" { + db.get_latest_session()? + } else { + db.get_session(id)? + }; + + session.ok_or_else(|| anyhow::anyhow!("Session not found: {id}")) +} + +async fn create_session_in_dir( + db: &StateStore, + cfg: &Config, + task: &str, + agent_type: &str, + use_worktree: bool, + repo_root: &Path, + agent_program: &Path, ) -> Result { let id = uuid::Uuid::new_v4().to_string()[..8].to_string(); let now = chrono::Utc::now(); let wt = if use_worktree { - Some(worktree::create_for_session(&id, cfg)?) + Some(worktree::create_for_session_in_repo(&id, cfg, repo_root)?) } else { None }; @@ -27,6 +85,7 @@ pub async fn create_session( task: task.to_string(), agent_type: agent_type.to_string(), state: SessionState::Pending, + pid: None, worktree: wt, created_at: now, updated_at: now, @@ -34,25 +93,123 @@ pub async fn create_session( }; db.insert_session(&session)?; - Ok(id) + + let working_dir = session + .worktree + .as_ref() + .map(|worktree| worktree.path.as_path()) + .unwrap_or(repo_root); + + match spawn_claude_code(agent_program, task, &session.id, working_dir).await { + Ok(pid) => { + db.update_pid(&session.id, Some(pid))?; + db.update_state(&session.id, &SessionState::Running)?; + Ok(session.id) + } + Err(error) => { + db.update_state(&session.id, &SessionState::Failed)?; + + if let Some(worktree) = session.worktree.as_ref() { + let _ = crate::worktree::remove(&worktree.path); + } + + Err(error.context(format!("Failed to start session {}", session.id))) + } + } } -pub fn list_sessions(db: &StateStore) -> Result> { - db.list_sessions() +async fn spawn_claude_code( + agent_program: &Path, + task: &str, + session_id: &str, + working_dir: &Path, +) -> Result { + let child = Command::new(agent_program) + .arg("--print") + .arg("--name") + .arg(format!("ecc-{session_id}")) + .arg(task) + .current_dir(working_dir) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .with_context(|| { + format!( + "Failed to spawn Claude Code from {}", + agent_program.display() + ) + })?; + + child + .id() + .ok_or_else(|| anyhow::anyhow!("Claude Code did not expose a process id")) } -pub fn get_status(db: &StateStore, id: &str) -> Result { - let session = db - .get_session(id)? - .ok_or_else(|| anyhow::anyhow!("Session not found: {id}"))?; - Ok(SessionStatus(session)) -} +async fn stop_session_with_options( + db: &StateStore, + id: &str, + cleanup_worktree: bool, +) -> Result<()> { + let session = resolve_session(db, id)?; + + if let Some(pid) = session.pid { + kill_process(pid).await?; + } + + db.update_pid(&session.id, None)?; + db.update_state(&session.id, &SessionState::Stopped)?; + + if cleanup_worktree { + if let Some(worktree) = session.worktree.as_ref() { + crate::worktree::remove(&worktree.path)?; + } + } -pub async fn stop_session(db: &StateStore, id: &str) -> Result<()> { - db.update_state(id, &SessionState::Stopped)?; Ok(()) } +#[cfg(unix)] +async fn kill_process(pid: u32) -> Result<()> { + send_signal(pid, libc::SIGTERM)?; + tokio::time::sleep(std::time::Duration::from_millis(1200)).await; + send_signal(pid, libc::SIGKILL)?; + Ok(()) +} + +#[cfg(unix)] +fn send_signal(pid: u32, signal: i32) -> Result<()> { + let outcome = unsafe { libc::kill(pid as i32, signal) }; + if outcome == 0 { + return Ok(()); + } + + let error = std::io::Error::last_os_error(); + if error.raw_os_error() == Some(libc::ESRCH) { + return Ok(()); + } + + Err(error).with_context(|| format!("Failed to kill process {pid}")) +} + +#[cfg(not(unix))] +async fn kill_process(pid: u32) -> Result<()> { + let status = Command::new("taskkill") + .args(["/F", "/PID", &pid.to_string()]) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status() + .await + .with_context(|| format!("Failed to invoke taskkill for process {pid}"))?; + + if status.success() { + Ok(()) + } else { + anyhow::bail!("taskkill failed for process {pid}"); + } +} + pub struct SessionStatus(Session); impl fmt::Display for SessionStatus { @@ -62,6 +219,9 @@ impl fmt::Display for SessionStatus { writeln!(f, "Task: {}", s.task)?; writeln!(f, "Agent: {}", s.agent_type)?; writeln!(f, "State: {}", s.state)?; + if let Some(pid) = s.pid { + writeln!(f, "PID: {}", pid)?; + } if let Some(ref wt) = s.worktree { writeln!(f, "Branch: {}", wt.branch)?; writeln!(f, "Worktree: {}", wt.path.display())?; @@ -74,3 +234,255 @@ impl fmt::Display for SessionStatus { write!(f, "Updated: {}", s.updated_at) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::{Config, Theme}; + use crate::session::{Session, SessionMetrics, SessionState}; + use anyhow::{Context, Result}; + use chrono::{Duration, Utc}; + use std::fs; + use std::os::unix::fs::PermissionsExt; + use std::path::{Path, PathBuf}; + use std::process::Command as StdCommand; + use std::thread; + use std::time::Duration as StdDuration; + + struct TestDir { + path: PathBuf, + } + + impl TestDir { + fn new(label: &str) -> Result { + let path = + std::env::temp_dir().join(format!("ecc2-{}-{}", label, uuid::Uuid::new_v4())); + fs::create_dir_all(&path)?; + Ok(Self { path }) + } + + fn path(&self) -> &Path { + &self.path + } + } + + impl Drop for TestDir { + fn drop(&mut self) { + let _ = fs::remove_dir_all(&self.path); + } + } + + fn build_config(root: &Path) -> Config { + Config { + db_path: root.join("state.db"), + worktree_root: root.join("worktrees"), + max_parallel_sessions: 4, + max_parallel_worktrees: 4, + session_timeout_secs: 60, + heartbeat_interval_secs: 5, + default_agent: "claude".to_string(), + theme: Theme::Dark, + } + } + + fn build_session(id: &str, state: SessionState, updated_at: chrono::DateTime) -> Session { + Session { + id: id.to_string(), + task: format!("task-{id}"), + agent_type: "claude".to_string(), + state, + pid: None, + worktree: None, + created_at: updated_at - Duration::minutes(1), + updated_at, + metrics: SessionMetrics::default(), + } + } + + fn init_git_repo(path: &Path) -> Result<()> { + fs::create_dir_all(path)?; + run_git(path, ["init", "-q"])?; + fs::write(path.join("README.md"), "hello\n")?; + run_git(path, ["add", "README.md"])?; + run_git( + path, + [ + "-c", + "user.name=ECC Tests", + "-c", + "user.email=ecc-tests@example.com", + "commit", + "-qm", + "init", + ], + )?; + Ok(()) + } + + fn run_git(path: &Path, args: [&str; N]) -> Result<()> { + let status = StdCommand::new("git") + .args(args) + .current_dir(path) + .status() + .with_context(|| format!("failed to run git in {}", path.display()))?; + + if !status.success() { + anyhow::bail!("git command failed in {}", path.display()); + } + + Ok(()) + } + + fn write_fake_claude(root: &Path) -> Result<(PathBuf, PathBuf)> { + let script_path = root.join("fake-claude.sh"); + let log_path = root.join("fake-claude.log"); + let script = format!( + "#!/usr/bin/env python3\nimport os\nimport pathlib\nimport signal\nimport sys\nimport time\n\nlog_path = pathlib.Path(r\"{}\")\nlog_path.write_text(os.getcwd() + \"\\n\", encoding=\"utf-8\")\nwith log_path.open(\"a\", encoding=\"utf-8\") as handle:\n handle.write(\" \".join(sys.argv[1:]) + \"\\n\")\n\ndef handle_term(signum, frame):\n raise SystemExit(0)\n\nsignal.signal(signal.SIGTERM, handle_term)\nwhile True:\n time.sleep(0.1)\n", + log_path.display() + ); + + fs::write(&script_path, script)?; + let mut permissions = fs::metadata(&script_path)?.permissions(); + permissions.set_mode(0o755); + fs::set_permissions(&script_path, permissions)?; + + Ok((script_path, log_path)) + } + + fn wait_for_file(path: &Path) -> Result { + for _ in 0..50 { + if path.exists() { + return fs::read_to_string(path) + .with_context(|| format!("failed to read {}", path.display())); + } + + thread::sleep(StdDuration::from_millis(20)); + } + + anyhow::bail!("timed out waiting for {}", path.display()); + } + + #[tokio::test(flavor = "current_thread")] + async fn create_session_spawns_process_and_marks_session_running() -> Result<()> { + let tempdir = TestDir::new("manager-create-session")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let cfg = build_config(tempdir.path()); + let db = StateStore::open(&cfg.db_path)?; + let (fake_claude, log_path) = write_fake_claude(tempdir.path())?; + + let session_id = create_session_in_dir( + &db, + &cfg, + "implement lifecycle", + "claude", + false, + &repo_root, + &fake_claude, + ) + .await?; + + let session = db + .get_session(&session_id)? + .context("session should exist")?; + assert_eq!(session.state, SessionState::Running); + assert!( + session.pid.is_some(), + "spawned session should persist a pid" + ); + + let log = wait_for_file(&log_path)?; + assert!(log.contains(repo_root.to_string_lossy().as_ref())); + assert!(log.contains("--print")); + assert!(log.contains("implement lifecycle")); + + stop_session_with_options(&db, &session_id, false).await?; + Ok(()) + } + + #[tokio::test(flavor = "current_thread")] + async fn stop_session_kills_process_and_optionally_cleans_worktree() -> Result<()> { + let tempdir = TestDir::new("manager-stop-session")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let cfg = build_config(tempdir.path()); + let db = StateStore::open(&cfg.db_path)?; + let (fake_claude, _) = write_fake_claude(tempdir.path())?; + + let keep_id = create_session_in_dir( + &db, + &cfg, + "keep worktree", + "claude", + true, + &repo_root, + &fake_claude, + ) + .await?; + let keep_session = db.get_session(&keep_id)?.context("keep session missing")?; + keep_session.pid.context("keep session pid missing")?; + let keep_worktree = keep_session + .worktree + .clone() + .context("keep session worktree missing")? + .path; + + stop_session_with_options(&db, &keep_id, false).await?; + + let stopped_keep = db + .get_session(&keep_id)? + .context("stopped keep session missing")?; + assert_eq!(stopped_keep.state, SessionState::Stopped); + assert_eq!(stopped_keep.pid, None); + assert!( + keep_worktree.exists(), + "worktree should remain when cleanup is disabled" + ); + + let cleanup_id = create_session_in_dir( + &db, + &cfg, + "cleanup worktree", + "claude", + true, + &repo_root, + &fake_claude, + ) + .await?; + let cleanup_session = db + .get_session(&cleanup_id)? + .context("cleanup session missing")?; + let cleanup_worktree = cleanup_session + .worktree + .clone() + .context("cleanup session worktree missing")? + .path; + + stop_session_with_options(&db, &cleanup_id, true).await?; + assert!( + !cleanup_worktree.exists(), + "worktree should be removed when cleanup is enabled" + ); + + Ok(()) + } + + #[test] + fn get_status_supports_latest_alias() -> Result<()> { + let tempdir = TestDir::new("manager-latest-status")?; + let cfg = build_config(tempdir.path()); + let db = StateStore::open(&cfg.db_path)?; + let older = Utc::now() - Duration::minutes(2); + let newer = Utc::now(); + + db.insert_session(&build_session("older", SessionState::Running, older))?; + db.insert_session(&build_session("newer", SessionState::Idle, newer))?; + + let status = get_status(&db, "latest")?; + assert_eq!(status.0.id, "newer"); + + Ok(()) + } +} diff --git a/ecc2/src/session/mod.rs b/ecc2/src/session/mod.rs index 67aeb05a..9f8d2b2f 100644 --- a/ecc2/src/session/mod.rs +++ b/ecc2/src/session/mod.rs @@ -13,13 +13,14 @@ pub struct Session { pub task: String, pub agent_type: String, pub state: SessionState, + pub pid: Option, pub worktree: Option, pub created_at: DateTime, pub updated_at: DateTime, pub metrics: SessionMetrics, } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum SessionState { Pending, Running, @@ -42,6 +43,46 @@ impl fmt::Display for SessionState { } } +impl SessionState { + pub fn can_transition_to(&self, next: &Self) -> bool { + if self == next { + return true; + } + + matches!( + (self, next), + ( + SessionState::Pending, + SessionState::Running | SessionState::Failed | SessionState::Stopped + ) | ( + SessionState::Running, + SessionState::Idle + | SessionState::Completed + | SessionState::Failed + | SessionState::Stopped + ) | ( + SessionState::Idle, + SessionState::Running + | SessionState::Completed + | SessionState::Failed + | SessionState::Stopped + ) | (SessionState::Completed, SessionState::Stopped) + | (SessionState::Failed, SessionState::Stopped) + ) + } + + pub fn from_db_value(value: &str) -> Self { + match value { + "running" => SessionState::Running, + "idle" => SessionState::Idle, + "completed" => SessionState::Completed, + "failed" => SessionState::Failed, + "stopped" => SessionState::Stopped, + _ => SessionState::Pending, + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorktreeInfo { pub path: PathBuf, diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index b412f188..499141dd 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -1,5 +1,5 @@ -use anyhow::Result; -use rusqlite::Connection; +use anyhow::{Context, Result}; +use rusqlite::{Connection, OptionalExtension}; use std::path::Path; use super::{Session, SessionMetrics, SessionState}; @@ -24,6 +24,7 @@ impl StateStore { task TEXT NOT NULL, agent_type TEXT NOT NULL, state TEXT NOT NULL DEFAULT 'pending', + pid INTEGER, worktree_path TEXT, worktree_branch TEXT, worktree_base TEXT, @@ -62,18 +63,40 @@ impl StateStore { CREATE INDEX IF NOT EXISTS idx_messages_to ON messages(to_session, read); ", )?; + self.ensure_session_columns()?; Ok(()) } + fn ensure_session_columns(&self) -> Result<()> { + if !self.has_column("sessions", "pid")? { + self.conn + .execute("ALTER TABLE sessions ADD COLUMN pid INTEGER", []) + .context("Failed to add pid column to sessions table")?; + } + + Ok(()) + } + + fn has_column(&self, table: &str, column: &str) -> Result { + let pragma = format!("PRAGMA table_info({table})"); + let mut stmt = self.conn.prepare(&pragma)?; + let columns = stmt + .query_map([], |row| row.get::<_, String>(1))? + .collect::, _>>()?; + + Ok(columns.iter().any(|existing| existing == column)) + } + pub fn insert_session(&self, session: &Session) -> Result<()> { self.conn.execute( - "INSERT INTO sessions (id, task, agent_type, state, worktree_path, worktree_branch, worktree_base, created_at, updated_at) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", + "INSERT INTO sessions (id, task, agent_type, state, pid, worktree_path, worktree_branch, worktree_base, created_at, updated_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", rusqlite::params![ session.id, session.task, session.agent_type, session.state.to_string(), + session.pid.map(i64::from), session.worktree.as_ref().map(|w| w.path.to_string_lossy().to_string()), session.worktree.as_ref().map(|w| w.branch.clone()), session.worktree.as_ref().map(|w| w.base_branch.clone()), @@ -85,7 +108,26 @@ impl StateStore { } pub fn update_state(&self, session_id: &str, state: &SessionState) -> Result<()> { - self.conn.execute( + let current_state = self + .conn + .query_row( + "SELECT state FROM sessions WHERE id = ?1", + [session_id], + |row| row.get::<_, String>(0), + ) + .optional()? + .map(|raw| SessionState::from_db_value(&raw)) + .ok_or_else(|| anyhow::anyhow!("Session not found: {session_id}"))?; + + if !current_state.can_transition_to(state) { + anyhow::bail!( + "Invalid session state transition: {} -> {}", + current_state, + state + ); + } + + let updated = self.conn.execute( "UPDATE sessions SET state = ?1, updated_at = ?2 WHERE id = ?3", rusqlite::params![ state.to_string(), @@ -93,6 +135,28 @@ impl StateStore { session_id, ], )?; + + if updated == 0 { + anyhow::bail!("Session not found: {session_id}"); + } + + Ok(()) + } + + pub fn update_pid(&self, session_id: &str, pid: Option) -> Result<()> { + let updated = self.conn.execute( + "UPDATE sessions SET pid = ?1, updated_at = ?2 WHERE id = ?3", + rusqlite::params![ + pid.map(i64::from), + chrono::Utc::now().to_rfc3339(), + session_id, + ], + )?; + + if updated == 0 { + anyhow::bail!("Session not found: {session_id}"); + } + Ok(()) } @@ -114,7 +178,7 @@ impl StateStore { pub fn list_sessions(&self) -> Result> { let mut stmt = self.conn.prepare( - "SELECT id, task, agent_type, state, worktree_path, worktree_branch, worktree_base, + "SELECT id, task, agent_type, state, pid, worktree_path, worktree_branch, worktree_base, tokens_used, tool_calls, files_changed, duration_secs, cost_usd, created_at, updated_at FROM sessions ORDER BY updated_at DESC", @@ -123,30 +187,24 @@ impl StateStore { let sessions = stmt .query_map([], |row| { let state_str: String = row.get(3)?; - let state = match state_str.as_str() { - "running" => SessionState::Running, - "idle" => SessionState::Idle, - "completed" => SessionState::Completed, - "failed" => SessionState::Failed, - "stopped" => SessionState::Stopped, - _ => SessionState::Pending, - }; + let state = SessionState::from_db_value(&state_str); - let worktree_path: Option = row.get(4)?; + let worktree_path: Option = row.get(5)?; let worktree = worktree_path.map(|p| super::WorktreeInfo { path: std::path::PathBuf::from(p), - branch: row.get::<_, String>(5).unwrap_or_default(), - base_branch: row.get::<_, String>(6).unwrap_or_default(), + branch: row.get::<_, String>(6).unwrap_or_default(), + base_branch: row.get::<_, String>(7).unwrap_or_default(), }); - let created_str: String = row.get(12)?; - let updated_str: String = row.get(13)?; + let created_str: String = row.get(13)?; + let updated_str: String = row.get(14)?; Ok(Session { id: row.get(0)?, task: row.get(1)?, agent_type: row.get(2)?, state, + pid: row.get::<_, Option>(4)?, worktree, created_at: chrono::DateTime::parse_from_rfc3339(&created_str) .unwrap_or_default() @@ -155,11 +213,11 @@ impl StateStore { .unwrap_or_default() .with_timezone(&chrono::Utc), metrics: SessionMetrics { - tokens_used: row.get(7)?, - tool_calls: row.get(8)?, - files_changed: row.get(9)?, - duration_secs: row.get(10)?, - cost_usd: row.get(11)?, + tokens_used: row.get(8)?, + tool_calls: row.get(9)?, + files_changed: row.get(10)?, + duration_secs: row.get(11)?, + cost_usd: row.get(12)?, }, }) })? @@ -168,18 +226,18 @@ impl StateStore { Ok(sessions) } - pub fn get_session(&self, id: &str) -> Result> { - let sessions = self.list_sessions()?; - Ok(sessions.into_iter().find(|s| s.id == id || s.id.starts_with(id))) + pub fn get_latest_session(&self) -> Result> { + Ok(self.list_sessions()?.into_iter().next()) } - pub fn send_message( - &self, - from: &str, - to: &str, - content: &str, - msg_type: &str, - ) -> Result<()> { + pub fn get_session(&self, id: &str) -> Result> { + let sessions = self.list_sessions()?; + Ok(sessions + .into_iter() + .find(|s| s.id == id || s.id.starts_with(id))) + } + + pub fn send_message(&self, from: &str, to: &str, content: &str, msg_type: &str) -> Result<()> { self.conn.execute( "INSERT INTO messages (from_session, to_session, content, msg_type, timestamp) VALUES (?1, ?2, ?3, ?4, ?5)", @@ -188,3 +246,105 @@ impl StateStore { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::session::{Session, SessionMetrics, SessionState}; + use chrono::{Duration, Utc}; + use std::fs; + use std::path::{Path, PathBuf}; + + struct TestDir { + path: PathBuf, + } + + impl TestDir { + fn new(label: &str) -> Result { + let path = + std::env::temp_dir().join(format!("ecc2-{}-{}", label, uuid::Uuid::new_v4())); + fs::create_dir_all(&path)?; + Ok(Self { path }) + } + + fn path(&self) -> &Path { + &self.path + } + } + + impl Drop for TestDir { + fn drop(&mut self) { + let _ = fs::remove_dir_all(&self.path); + } + } + + fn build_session(id: &str, state: SessionState) -> Session { + let now = Utc::now(); + Session { + id: id.to_string(), + task: "task".to_string(), + agent_type: "claude".to_string(), + state, + pid: None, + worktree: None, + created_at: now - Duration::minutes(1), + updated_at: now, + metrics: SessionMetrics::default(), + } + } + + #[test] + fn update_state_rejects_invalid_terminal_transition() -> Result<()> { + let tempdir = TestDir::new("store-invalid-transition")?; + let db = StateStore::open(&tempdir.path().join("state.db"))?; + + db.insert_session(&build_session("done", SessionState::Completed))?; + + let error = db + .update_state("done", &SessionState::Running) + .expect_err("completed sessions must not transition back to running"); + + assert!(error + .to_string() + .contains("Invalid session state transition")); + Ok(()) + } + + #[test] + fn open_migrates_existing_sessions_table_with_pid_column() -> Result<()> { + let tempdir = TestDir::new("store-migration")?; + let db_path = tempdir.path().join("state.db"); + + let conn = Connection::open(&db_path)?; + conn.execute_batch( + " + CREATE TABLE sessions ( + id TEXT PRIMARY KEY, + task TEXT NOT NULL, + agent_type TEXT NOT NULL, + state TEXT NOT NULL DEFAULT 'pending', + worktree_path TEXT, + worktree_branch TEXT, + worktree_base TEXT, + tokens_used INTEGER DEFAULT 0, + tool_calls INTEGER DEFAULT 0, + files_changed INTEGER DEFAULT 0, + duration_secs INTEGER DEFAULT 0, + cost_usd REAL DEFAULT 0.0, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + ", + )?; + drop(conn); + + let db = StateStore::open(&db_path)?; + let mut stmt = db.conn.prepare("PRAGMA table_info(sessions)")?; + let column_names = stmt + .query_map([], |row| row.get::<_, String>(1))? + .collect::, _>>()?; + + assert!(column_names.iter().any(|column| column == "pid")); + Ok(()) + } +} diff --git a/ecc2/src/worktree/mod.rs b/ecc2/src/worktree/mod.rs index 50306f2a..d4183bdb 100644 --- a/ecc2/src/worktree/mod.rs +++ b/ecc2/src/worktree/mod.rs @@ -1,5 +1,5 @@ use anyhow::{Context, Result}; -use std::path::PathBuf; +use std::path::Path; use std::process::Command; use crate::config::Config; @@ -7,16 +7,27 @@ use crate::session::WorktreeInfo; /// Create a new git worktree for an agent session. pub fn create_for_session(session_id: &str, cfg: &Config) -> Result { + let repo_root = std::env::current_dir().context("Failed to resolve repository root")?; + create_for_session_in_repo(session_id, cfg, &repo_root) +} + +pub(crate) fn create_for_session_in_repo( + session_id: &str, + cfg: &Config, + repo_root: &Path, +) -> Result { let branch = format!("ecc/{session_id}"); let path = cfg.worktree_root.join(session_id); // Get current branch as base - let base = get_current_branch()?; + let base = get_current_branch(repo_root)?; std::fs::create_dir_all(&cfg.worktree_root) .context("Failed to create worktree root directory")?; let output = Command::new("git") + .arg("-C") + .arg(repo_root) .args(["worktree", "add", "-b", &branch]) .arg(&path) .arg("HEAD") @@ -28,7 +39,11 @@ pub fn create_for_session(session_id: &str, cfg: &Config) -> Result Result Result<()> { +pub fn remove(path: &Path) -> Result<()> { let output = Command::new("git") + .arg("-C") + .arg(path) .args(["worktree", "remove", "--force"]) .arg(path) .output() @@ -70,8 +87,10 @@ pub fn list() -> Result> { Ok(worktrees) } -fn get_current_branch() -> Result { +fn get_current_branch(repo_root: &Path) -> Result { let output = Command::new("git") + .arg("-C") + .arg(repo_root) .args(["rev-parse", "--abbrev-ref", "HEAD"]) .output() .context("Failed to get current branch")?;