From df8c951ec2b9aaa61ff7d26a589aacfb6177ded5 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Tue, 24 Mar 2026 03:39:53 -0700 Subject: [PATCH] feat(ecc2): add crash resume session recovery --- ecc2/Cargo.lock | 1 + ecc2/Cargo.toml | 1 + ecc2/src/main.rs | 40 +++++++-- ecc2/src/session/daemon.rs | 133 ++++++++++++++++++++++++++- ecc2/src/session/manager.rs | 77 +++++++++++++++- ecc2/src/session/mod.rs | 1 + ecc2/src/session/store.rs | 173 +++++++++++++++++++++++++++++++----- ecc2/src/tui/dashboard.rs | 36 ++++++-- 8 files changed, 424 insertions(+), 38 deletions(-) diff --git a/ecc2/Cargo.lock b/ecc2/Cargo.lock index 59a060d3..f7c88b11 100644 --- a/ecc2/Cargo.lock +++ b/ecc2/Cargo.lock @@ -332,6 +332,7 @@ dependencies = [ "crossterm", "dirs", "git2", + "libc", "ratatui", "rusqlite", "serde", diff --git a/ecc2/Cargo.toml b/ecc2/Cargo.toml index 88265f97..3f45eca5 100644 --- a/ecc2/Cargo.toml +++ b/ecc2/Cargo.toml @@ -36,6 +36,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } # Error handling anyhow = "1" thiserror = "2" +libc = "0.2" # Time chrono = { version = "0.4", features = ["serde"] } diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 850b7b49..0fb22d28 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -1,9 +1,9 @@ +mod comms; mod config; +mod observability; mod session; mod tui; mod worktree; -mod observability; -mod comms; use anyhow::Result; use clap::Parser; @@ -44,6 +44,11 @@ enum Commands { /// Session ID or alias session_id: String, }, + /// Resume a failed or stopped session + Resume { + /// Session ID or alias + session_id: String, + }, /// Run as background daemon Daemon, } @@ -63,10 +68,13 @@ async fn main() -> Result<()> { Some(Commands::Dashboard) | None => { tui::app::run(db, cfg).await?; } - Some(Commands::Start { task, agent, worktree: use_worktree }) => { - let session_id = session::manager::create_session( - &db, &cfg, &task, &agent, use_worktree, - ).await?; + Some(Commands::Start { + task, + agent, + worktree: use_worktree, + }) => { + let session_id = + session::manager::create_session(&db, &cfg, &task, &agent, use_worktree).await?; println!("Session started: {session_id}"); } Some(Commands::Sessions) => { @@ -84,6 +92,10 @@ async fn main() -> Result<()> { session::manager::stop_session(&db, &session_id).await?; println!("Session stopped: {session_id}"); } + Some(Commands::Resume { session_id }) => { + let resumed_id = session::manager::resume_session(&db, &session_id).await?; + println!("Session resumed: {resumed_id}"); + } Some(Commands::Daemon) => { println!("Starting ECC daemon..."); session::daemon::run(db, cfg).await?; @@ -92,3 +104,19 @@ async fn main() -> Result<()> { Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn cli_parses_resume_command() { + let cli = Cli::try_parse_from(["ecc", "resume", "deadbeef"]) + .expect("resume subcommand should parse"); + + match cli.command { + Some(Commands::Resume { session_id }) => assert_eq!(session_id, "deadbeef"), + _ => panic!("expected resume subcommand"), + } + } +} diff --git a/ecc2/src/session/daemon.rs b/ecc2/src/session/daemon.rs index 08969ca2..d9da8f0e 100644 --- a/ecc2/src/session/daemon.rs +++ b/ecc2/src/session/daemon.rs @@ -10,6 +10,7 @@ use crate::config::Config; /// and cleans up stale resources. pub async fn run(db: StateStore, cfg: Config) -> Result<()> { tracing::info!("ECC daemon started"); + resume_crashed_sessions(&db)?; let heartbeat_interval = Duration::from_secs(cfg.heartbeat_interval_secs); let timeout = Duration::from_secs(cfg.session_timeout_secs); @@ -23,6 +24,43 @@ pub async fn run(db: StateStore, cfg: Config) -> Result<()> { } } +pub fn resume_crashed_sessions(db: &StateStore) -> Result<()> { + let failed_sessions = resume_crashed_sessions_with(db, pid_is_alive)?; + if failed_sessions > 0 { + tracing::warn!("Marked {failed_sessions} crashed sessions as failed during daemon startup"); + } + Ok(()) +} + +fn resume_crashed_sessions_with(db: &StateStore, is_pid_alive: F) -> Result +where + F: Fn(u32) -> bool, +{ + let sessions = db.list_sessions()?; + let mut failed_sessions = 0; + + for session in sessions { + if session.state != SessionState::Running { + continue; + } + + let is_alive = session.pid.is_some_and(&is_pid_alive); + if is_alive { + continue; + } + + tracing::warn!( + "Session {} was left running with stale pid {:?}; marking it failed", + session.id, + session.pid + ); + db.update_state_and_pid(&session.id, &SessionState::Failed, None)?; + failed_sessions += 1; + } + + Ok(failed_sessions) +} + fn check_sessions(db: &StateStore, timeout: Duration) -> Result<()> { let sessions = db.list_sessions()?; @@ -38,9 +76,102 @@ fn check_sessions(db: &StateStore, timeout: Duration) -> Result<()> { if elapsed > timeout { tracing::warn!("Session {} timed out after {:?}", session.id, elapsed); - db.update_state(&session.id, &SessionState::Failed)?; + db.update_state_and_pid(&session.id, &SessionState::Failed, None)?; } } Ok(()) } + +#[cfg(unix)] +fn pid_is_alive(pid: u32) -> bool { + if pid == 0 { + return false; + } + + // SAFETY: kill(pid, 0) probes process existence without delivering a signal. + let result = unsafe { libc::kill(pid as libc::pid_t, 0) }; + if result == 0 { + return true; + } + + matches!( + std::io::Error::last_os_error().raw_os_error(), + Some(code) if code == libc::EPERM + ) +} + +#[cfg(not(unix))] +fn pid_is_alive(_pid: u32) -> bool { + false +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::session::{Session, SessionMetrics, SessionState}; + use std::path::PathBuf; + + fn temp_db_path() -> PathBuf { + std::env::temp_dir().join(format!("ecc2-daemon-test-{}.db", uuid::Uuid::new_v4())) + } + + fn sample_session(id: &str, state: SessionState, pid: Option) -> Session { + let now = chrono::Utc::now(); + Session { + id: id.to_string(), + task: "Recover crashed worker".to_string(), + agent_type: "claude".to_string(), + state, + pid, + worktree: None, + created_at: now, + updated_at: now, + metrics: SessionMetrics::default(), + } + } + + #[test] + fn resume_crashed_sessions_marks_dead_running_sessions_failed() -> Result<()> { + let path = temp_db_path(); + let store = StateStore::open(&path)?; + store.insert_session(&sample_session( + "deadbeef", + SessionState::Running, + Some(4242), + ))?; + + resume_crashed_sessions_with(&store, |_| false)?; + + let session = store + .get_session("deadbeef")? + .expect("session should still exist"); + assert_eq!(session.state, SessionState::Failed); + assert_eq!(session.pid, None); + + let _ = std::fs::remove_file(path); + Ok(()) + } + + #[test] + fn resume_crashed_sessions_keeps_live_running_sessions_running() -> Result<()> { + let path = temp_db_path(); + let store = StateStore::open(&path)?; + store.insert_session(&sample_session( + "alive123", + SessionState::Running, + Some(7777), + ))?; + + resume_crashed_sessions_with(&store, |_| true)?; + + let session = store + .get_session("alive123")? + .expect("session should still exist"); + assert_eq!(session.state, SessionState::Running); + assert_eq!(session.pid, Some(7777)); + + let _ = std::fs::remove_file(path); + Ok(()) + } +} diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index c08c5f0d..03163976 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -1,8 +1,8 @@ use anyhow::Result; use std::fmt; -use super::{Session, SessionMetrics, SessionState}; use super::store::StateStore; +use super::{Session, SessionMetrics, SessionState}; use crate::config::Config; use crate::worktree; @@ -27,6 +27,7 @@ pub async fn create_session( task: task.to_string(), agent_type: agent_type.to_string(), state: SessionState::Pending, + pid: None, worktree: wt, created_at: now, updated_at: now, @@ -49,10 +50,79 @@ pub fn get_status(db: &StateStore, id: &str) -> Result { } pub async fn stop_session(db: &StateStore, id: &str) -> Result<()> { - db.update_state(id, &SessionState::Stopped)?; + let session = db + .get_session(id)? + .ok_or_else(|| anyhow::anyhow!("Session not found: {id}"))?; + + db.update_state_and_pid(&session.id, &SessionState::Stopped, None)?; Ok(()) } +pub async fn resume_session(db: &StateStore, id: &str) -> Result { + let session = db + .get_session(id)? + .ok_or_else(|| anyhow::anyhow!("Session not found: {id}"))?; + + if session.state == SessionState::Completed { + anyhow::bail!("Completed sessions cannot be resumed: {}", session.id); + } + + if session.state == SessionState::Running { + anyhow::bail!("Session is already running: {}", session.id); + } + + db.update_state_and_pid(&session.id, &SessionState::Pending, None)?; + Ok(session.id) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::session::store::StateStore; + use std::path::PathBuf; + + fn temp_db_path() -> PathBuf { + std::env::temp_dir().join(format!("ecc2-manager-test-{}.db", uuid::Uuid::new_v4())) + } + + fn sample_session(id: &str, state: SessionState, pid: Option) -> Session { + let now = chrono::Utc::now(); + Session { + id: id.to_string(), + task: "Resume previous task".to_string(), + agent_type: "claude".to_string(), + state, + pid, + worktree: None, + created_at: now, + updated_at: now, + metrics: SessionMetrics::default(), + } + } + + #[tokio::test] + async fn resume_session_requeues_failed_session() -> Result<()> { + let path = temp_db_path(); + let store = StateStore::open(&path)?; + store.insert_session(&sample_session( + "deadbeef", + SessionState::Failed, + Some(31337), + ))?; + + let resumed_id = resume_session(&store, "deadbeef").await?; + let resumed = store + .get_session(&resumed_id)? + .expect("resumed session should exist"); + + assert_eq!(resumed.state, SessionState::Pending); + assert_eq!(resumed.pid, None); + + let _ = std::fs::remove_file(path); + Ok(()) + } +} + pub struct SessionStatus(Session); impl fmt::Display for SessionStatus { @@ -62,6 +132,9 @@ impl fmt::Display for SessionStatus { writeln!(f, "Task: {}", s.task)?; writeln!(f, "Agent: {}", s.agent_type)?; writeln!(f, "State: {}", s.state)?; + if let Some(pid) = s.pid { + writeln!(f, "PID: {pid}")?; + } if let Some(ref wt) = s.worktree { writeln!(f, "Branch: {}", wt.branch)?; writeln!(f, "Worktree: {}", wt.path.display())?; diff --git a/ecc2/src/session/mod.rs b/ecc2/src/session/mod.rs index 67aeb05a..19ccaae7 100644 --- a/ecc2/src/session/mod.rs +++ b/ecc2/src/session/mod.rs @@ -13,6 +13,7 @@ pub struct Session { pub task: String, pub agent_type: String, pub state: SessionState, + pub pid: Option, pub worktree: Option, pub created_at: DateTime, pub updated_at: DateTime, diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index b412f188..ea8034f3 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -24,6 +24,7 @@ impl StateStore { task TEXT NOT NULL, agent_type TEXT NOT NULL, state TEXT NOT NULL DEFAULT 'pending', + pid INTEGER, worktree_path TEXT, worktree_branch TEXT, worktree_base TEXT, @@ -62,18 +63,36 @@ impl StateStore { CREATE INDEX IF NOT EXISTS idx_messages_to ON messages(to_session, read); ", )?; + self.ensure_sessions_pid_column()?; + Ok(()) + } + + fn ensure_sessions_pid_column(&self) -> Result<()> { + let mut stmt = self.conn.prepare("PRAGMA table_info(sessions)")?; + let mut rows = stmt.query([])?; + + while let Some(row) = rows.next()? { + let column_name: String = row.get(1)?; + if column_name == "pid" { + return Ok(()); + } + } + + self.conn + .execute("ALTER TABLE sessions ADD COLUMN pid INTEGER", [])?; Ok(()) } pub fn insert_session(&self, session: &Session) -> Result<()> { self.conn.execute( - "INSERT INTO sessions (id, task, agent_type, state, worktree_path, worktree_branch, worktree_base, created_at, updated_at) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", + "INSERT INTO sessions (id, task, agent_type, state, pid, worktree_path, worktree_branch, worktree_base, created_at, updated_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", rusqlite::params![ session.id, session.task, session.agent_type, session.state.to_string(), + session.pid.map(i64::from), session.worktree.as_ref().map(|w| w.path.to_string_lossy().to_string()), session.worktree.as_ref().map(|w| w.branch.clone()), session.worktree.as_ref().map(|w| w.base_branch.clone()), @@ -84,6 +103,24 @@ impl StateStore { Ok(()) } + pub fn update_state_and_pid( + &self, + session_id: &str, + state: &SessionState, + pid: Option, + ) -> Result<()> { + self.conn.execute( + "UPDATE sessions SET state = ?1, pid = ?2, updated_at = ?3 WHERE id = ?4", + rusqlite::params![ + state.to_string(), + pid.map(i64::from), + chrono::Utc::now().to_rfc3339(), + session_id, + ], + )?; + Ok(()) + } + pub fn update_state(&self, session_id: &str, state: &SessionState) -> Result<()> { self.conn.execute( "UPDATE sessions SET state = ?1, updated_at = ?2 WHERE id = ?3", @@ -114,7 +151,7 @@ impl StateStore { pub fn list_sessions(&self) -> Result> { let mut stmt = self.conn.prepare( - "SELECT id, task, agent_type, state, worktree_path, worktree_branch, worktree_base, + "SELECT id, task, agent_type, state, pid, worktree_path, worktree_branch, worktree_base, tokens_used, tool_calls, files_changed, duration_secs, cost_usd, created_at, updated_at FROM sessions ORDER BY updated_at DESC", @@ -132,21 +169,26 @@ impl StateStore { _ => SessionState::Pending, }; - let worktree_path: Option = row.get(4)?; + let pid = row + .get::<_, Option>(4)? + .and_then(|value| u32::try_from(value).ok()); + + let worktree_path: Option = row.get(5)?; let worktree = worktree_path.map(|p| super::WorktreeInfo { path: std::path::PathBuf::from(p), - branch: row.get::<_, String>(5).unwrap_or_default(), - base_branch: row.get::<_, String>(6).unwrap_or_default(), + branch: row.get::<_, String>(6).unwrap_or_default(), + base_branch: row.get::<_, String>(7).unwrap_or_default(), }); - let created_str: String = row.get(12)?; - let updated_str: String = row.get(13)?; + let created_str: String = row.get(13)?; + let updated_str: String = row.get(14)?; Ok(Session { id: row.get(0)?, task: row.get(1)?, agent_type: row.get(2)?, state, + pid, worktree, created_at: chrono::DateTime::parse_from_rfc3339(&created_str) .unwrap_or_default() @@ -155,11 +197,11 @@ impl StateStore { .unwrap_or_default() .with_timezone(&chrono::Utc), metrics: SessionMetrics { - tokens_used: row.get(7)?, - tool_calls: row.get(8)?, - files_changed: row.get(9)?, - duration_secs: row.get(10)?, - cost_usd: row.get(11)?, + tokens_used: row.get(8)?, + tool_calls: row.get(9)?, + files_changed: row.get(10)?, + duration_secs: row.get(11)?, + cost_usd: row.get(12)?, }, }) })? @@ -170,16 +212,12 @@ impl StateStore { pub fn get_session(&self, id: &str) -> Result> { let sessions = self.list_sessions()?; - Ok(sessions.into_iter().find(|s| s.id == id || s.id.starts_with(id))) + Ok(sessions + .into_iter() + .find(|s| s.id == id || s.id.starts_with(id))) } - pub fn send_message( - &self, - from: &str, - to: &str, - content: &str, - msg_type: &str, - ) -> Result<()> { + pub fn send_message(&self, from: &str, to: &str, content: &str, msg_type: &str) -> Result<()> { self.conn.execute( "INSERT INTO messages (from_session, to_session, content, msg_type, timestamp) VALUES (?1, ?2, ?3, ?4, ?5)", @@ -188,3 +226,96 @@ impl StateStore { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::session::{Session, SessionMetrics, SessionState}; + use std::path::PathBuf; + + fn temp_db_path() -> PathBuf { + std::env::temp_dir().join(format!("ecc2-store-test-{}.db", uuid::Uuid::new_v4())) + } + + fn sample_session(id: &str, state: SessionState, pid: Option) -> Session { + let now = chrono::Utc::now(); + Session { + id: id.to_string(), + task: "Investigate crash".to_string(), + agent_type: "claude".to_string(), + state, + pid, + worktree: None, + created_at: now, + updated_at: now, + metrics: SessionMetrics::default(), + } + } + + #[test] + fn open_migrates_existing_sessions_table_with_pid_column() -> Result<()> { + let path = temp_db_path(); + let conn = Connection::open(&path)?; + conn.execute_batch( + " + CREATE TABLE sessions ( + id TEXT PRIMARY KEY, + task TEXT NOT NULL, + agent_type TEXT NOT NULL, + state TEXT NOT NULL DEFAULT 'pending', + worktree_path TEXT, + worktree_branch TEXT, + worktree_base TEXT, + tokens_used INTEGER DEFAULT 0, + tool_calls INTEGER DEFAULT 0, + files_changed INTEGER DEFAULT 0, + duration_secs INTEGER DEFAULT 0, + cost_usd REAL DEFAULT 0.0, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + ", + )?; + conn.execute( + "INSERT INTO sessions (id, task, agent_type, state, created_at, updated_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + rusqlite::params![ + "legacy", + "Recover state", + "claude", + "running", + chrono::Utc::now().to_rfc3339(), + chrono::Utc::now().to_rfc3339(), + ], + )?; + drop(conn); + + let store = StateStore::open(&path)?; + let session = store + .get_session("legacy")? + .expect("legacy session should load"); + + assert_eq!(session.pid, None); + + let _ = std::fs::remove_file(path); + Ok(()) + } + + #[test] + fn insert_session_persists_pid() -> Result<()> { + let path = temp_db_path(); + let store = StateStore::open(&path)?; + let session = sample_session("abc12345", SessionState::Running, Some(4242)); + + store.insert_session(&session)?; + + let loaded = store + .get_session("abc12345")? + .expect("session should be persisted"); + assert_eq!(loaded.pid, Some(4242)); + assert_eq!(loaded.state, SessionState::Running); + + let _ = std::fs::remove_file(path); + Ok(()) + } +} diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index aca1e995..591866bd 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -4,8 +4,8 @@ use ratatui::{ }; use crate::config::Config; -use crate::session::{Session, SessionState}; use crate::session::store::StateStore; +use crate::session::{Session, SessionState}; pub struct Dashboard { db: StateStore, @@ -42,7 +42,7 @@ impl Dashboard { let chunks = Layout::default() .direction(Direction::Vertical) .constraints([ - Constraint::Length(3), // Header + Constraint::Length(3), // Header Constraint::Min(10), // Main content Constraint::Length(3), // Status bar ]) @@ -79,7 +79,11 @@ impl Dashboard { } fn render_header(&self, frame: &mut Frame, area: Rect) { - let running = self.sessions.iter().filter(|s| s.state == SessionState::Running).count(); + let running = self + .sessions + .iter() + .filter(|s| s.state == SessionState::Running) + .count(); let total = self.sessions.len(); let title = format!(" ECC 2.0 | {running} running / {total} total "); @@ -90,7 +94,11 @@ impl Dashboard { Pane::Output => 1, Pane::Metrics => 2, }) - .highlight_style(Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD)); + .highlight_style( + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + ); frame.render_widget(tabs, area); } @@ -110,11 +118,18 @@ impl Dashboard { SessionState::Pending => "◌", }; let style = if i == self.selected_session { - Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD) + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD) } else { Style::default() }; - let text = format!("{state_icon} {} [{}] {}", &s.id[..8.min(s.id.len())], s.agent_type, s.task); + let text = format!( + "{state_icon} {} [{}] {}", + &s.id[..8.min(s.id.len())], + s.agent_type, + s.task + ); ListItem::new(text).style(style) }) .collect(); @@ -136,7 +151,10 @@ impl Dashboard { fn render_output(&self, frame: &mut Frame, area: Rect) { let content = if let Some(session) = self.sessions.get(self.selected_session) { - format!("Agent output for session {}...\n\n(Live streaming coming soon)", session.id) + format!( + "Agent output for session {}...\n\n(Live streaming coming soon)", + session.id + ) } else { "No sessions. Press 'n' to start one.".to_string() }; @@ -253,7 +271,9 @@ impl Dashboard { pub fn stop_selected(&mut self) { if let Some(session) = self.sessions.get(self.selected_session) { - let _ = self.db.update_state(&session.id, &SessionState::Stopped); + let _ = self + .db + .update_state_and_pid(&session.id, &SessionState::Stopped, None); self.refresh(); } }