From 0166231ddbe5c8a04ef294521983277f7ed1dff7 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Tue, 24 Mar 2026 03:39:53 -0700 Subject: [PATCH] feat(ecc2): add crash resume session recovery --- ecc2/src/main.rs | 25 +++++++ ecc2/src/session/daemon.rs | 133 +++++++++++++++++++++++++++++++++++- ecc2/src/session/manager.rs | 45 ++++++++++++ ecc2/src/session/store.rs | 23 +++++++ ecc2/src/tui/dashboard.rs | 5 +- 5 files changed, 229 insertions(+), 2 deletions(-) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 476c2d3b..86bce735 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -45,6 +45,11 @@ enum Commands { /// Session ID or alias session_id: String, }, + /// Resume a failed or stopped session + Resume { + /// Session ID or alias + session_id: String, + }, /// Run as background daemon Daemon, #[command(hide = true)] @@ -99,6 +104,10 @@ async fn main() -> Result<()> { session::manager::stop_session(&db, &session_id).await?; println!("Session stopped: {session_id}"); } + Some(Commands::Resume { session_id }) => { + let resumed_id = session::manager::resume_session(&db, &session_id).await?; + println!("Session resumed: {resumed_id}"); + } Some(Commands::Daemon) => { println!("Starting ECC daemon..."); session::daemon::run(db, cfg).await?; @@ -115,3 +124,19 @@ async fn main() -> Result<()> { Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn cli_parses_resume_command() { + let cli = Cli::try_parse_from(["ecc", "resume", "deadbeef"]) + .expect("resume subcommand should parse"); + + match cli.command { + Some(Commands::Resume { session_id }) => assert_eq!(session_id, "deadbeef"), + _ => panic!("expected resume subcommand"), + } + } +} diff --git a/ecc2/src/session/daemon.rs b/ecc2/src/session/daemon.rs index 08969ca2..d9da8f0e 100644 --- a/ecc2/src/session/daemon.rs +++ b/ecc2/src/session/daemon.rs @@ -10,6 +10,7 @@ use crate::config::Config; /// and cleans up stale resources. pub async fn run(db: StateStore, cfg: Config) -> Result<()> { tracing::info!("ECC daemon started"); + resume_crashed_sessions(&db)?; let heartbeat_interval = Duration::from_secs(cfg.heartbeat_interval_secs); let timeout = Duration::from_secs(cfg.session_timeout_secs); @@ -23,6 +24,43 @@ pub async fn run(db: StateStore, cfg: Config) -> Result<()> { } } +pub fn resume_crashed_sessions(db: &StateStore) -> Result<()> { + let failed_sessions = resume_crashed_sessions_with(db, pid_is_alive)?; + if failed_sessions > 0 { + tracing::warn!("Marked {failed_sessions} crashed sessions as failed during daemon startup"); + } + Ok(()) +} + +fn resume_crashed_sessions_with(db: &StateStore, is_pid_alive: F) -> Result +where + F: Fn(u32) -> bool, +{ + let sessions = db.list_sessions()?; + let mut failed_sessions = 0; + + for session in sessions { + if session.state != SessionState::Running { + continue; + } + + let is_alive = session.pid.is_some_and(&is_pid_alive); + if is_alive { + continue; + } + + tracing::warn!( + "Session {} was left running with stale pid {:?}; marking it failed", + session.id, + session.pid + ); + db.update_state_and_pid(&session.id, &SessionState::Failed, None)?; + failed_sessions += 1; + } + + Ok(failed_sessions) +} + fn check_sessions(db: &StateStore, timeout: Duration) -> Result<()> { let sessions = db.list_sessions()?; @@ -38,9 +76,102 @@ fn check_sessions(db: &StateStore, timeout: Duration) -> Result<()> { if elapsed > timeout { tracing::warn!("Session {} timed out after {:?}", session.id, elapsed); - db.update_state(&session.id, &SessionState::Failed)?; + db.update_state_and_pid(&session.id, &SessionState::Failed, None)?; } } Ok(()) } + +#[cfg(unix)] +fn pid_is_alive(pid: u32) -> bool { + if pid == 0 { + return false; + } + + // SAFETY: kill(pid, 0) probes process existence without delivering a signal. + let result = unsafe { libc::kill(pid as libc::pid_t, 0) }; + if result == 0 { + return true; + } + + matches!( + std::io::Error::last_os_error().raw_os_error(), + Some(code) if code == libc::EPERM + ) +} + +#[cfg(not(unix))] +fn pid_is_alive(_pid: u32) -> bool { + false +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::session::{Session, SessionMetrics, SessionState}; + use std::path::PathBuf; + + fn temp_db_path() -> PathBuf { + std::env::temp_dir().join(format!("ecc2-daemon-test-{}.db", uuid::Uuid::new_v4())) + } + + fn sample_session(id: &str, state: SessionState, pid: Option) -> Session { + let now = chrono::Utc::now(); + Session { + id: id.to_string(), + task: "Recover crashed worker".to_string(), + agent_type: "claude".to_string(), + state, + pid, + worktree: None, + created_at: now, + updated_at: now, + metrics: SessionMetrics::default(), + } + } + + #[test] + fn resume_crashed_sessions_marks_dead_running_sessions_failed() -> Result<()> { + let path = temp_db_path(); + let store = StateStore::open(&path)?; + store.insert_session(&sample_session( + "deadbeef", + SessionState::Running, + Some(4242), + ))?; + + resume_crashed_sessions_with(&store, |_| false)?; + + let session = store + .get_session("deadbeef")? + .expect("session should still exist"); + assert_eq!(session.state, SessionState::Failed); + assert_eq!(session.pid, None); + + let _ = std::fs::remove_file(path); + Ok(()) + } + + #[test] + fn resume_crashed_sessions_keeps_live_running_sessions_running() -> Result<()> { + let path = temp_db_path(); + let store = StateStore::open(&path)?; + store.insert_session(&sample_session( + "alive123", + SessionState::Running, + Some(7777), + ))?; + + resume_crashed_sessions_with(&store, |_| true)?; + + let session = store + .get_session("alive123")? + .expect("session should still exist"); + assert_eq!(session.state, SessionState::Running); + assert_eq!(session.pid, Some(7777)); + + let _ = std::fs::remove_file(path); + Ok(()) + } +} diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index 7d00d7fc..5070e2c2 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -36,6 +36,21 @@ pub async fn stop_session(db: &StateStore, id: &str) -> Result<()> { stop_session_with_options(db, id, true).await } +pub async fn resume_session(db: &StateStore, id: &str) -> Result { + let session = resolve_session(db, id)?; + + if session.state == SessionState::Completed { + anyhow::bail!("Completed sessions cannot be resumed: {}", session.id); + } + + if session.state == SessionState::Running { + anyhow::bail!("Session is already running: {}", session.id); + } + + db.update_state_and_pid(&session.id, &SessionState::Pending, None)?; + Ok(session.id) +} + fn agent_program(agent_type: &str) -> Result { match agent_type { "claude" => Ok(PathBuf::from("claude")), @@ -575,6 +590,36 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "current_thread")] + async fn resume_session_requeues_failed_session() -> Result<()> { + let tempdir = TestDir::new("manager-resume-session")?; + let cfg = build_config(tempdir.path()); + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "deadbeef".to_string(), + task: "resume previous task".to_string(), + agent_type: "claude".to_string(), + state: SessionState::Failed, + pid: Some(31337), + worktree: None, + created_at: now - Duration::minutes(1), + updated_at: now, + metrics: SessionMetrics::default(), + })?; + + let resumed_id = resume_session(&db, "deadbeef").await?; + let resumed = db + .get_session(&resumed_id)? + .context("resumed session should exist")?; + + assert_eq!(resumed.state, SessionState::Pending); + assert_eq!(resumed.pid, None); + + Ok(()) + } + #[test] fn get_status_supports_latest_alias() -> Result<()> { let tempdir = TestDir::new("manager-latest-status")?; diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index 1a313100..bb6af18e 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -124,6 +124,29 @@ impl StateStore { Ok(()) } + pub fn update_state_and_pid( + &self, + session_id: &str, + state: &SessionState, + pid: Option, + ) -> Result<()> { + let updated = self.conn.execute( + "UPDATE sessions SET state = ?1, pid = ?2, updated_at = ?3 WHERE id = ?4", + rusqlite::params![ + state.to_string(), + pid.map(i64::from), + chrono::Utc::now().to_rfc3339(), + session_id, + ], + )?; + + if updated == 0 { + anyhow::bail!("Session not found: {session_id}"); + } + + Ok(()) + } + pub fn update_state(&self, session_id: &str, state: &SessionState) -> Result<()> { let current_state = self .conn diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index 17efafc3..8d1b4e44 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -422,7 +422,10 @@ impl Dashboard { pub fn stop_selected(&mut self) { if let Some(session) = self.sessions.get(self.selected_session) { - if let Err(error) = self.db.update_state(&session.id, &SessionState::Stopped) { + if let Err(error) = + self.db + .update_state_and_pid(&session.id, &SessionState::Stopped, None) + { tracing::warn!("Failed to stop session {}: {error}", session.id); return; }