feat(ecc2): add session heartbeat stale detection

This commit is contained in:
Affaan Mustafa
2026-04-09 07:20:40 -07:00
parent 48fd68115e
commit 24a3ffa234
9 changed files with 520 additions and 47 deletions

View File

@@ -37,6 +37,7 @@ pub struct Config {
pub max_parallel_worktrees: usize,
pub session_timeout_secs: u64,
pub heartbeat_interval_secs: u64,
pub auto_terminate_stale_sessions: bool,
pub default_agent: String,
pub auto_dispatch_unread_handoffs: bool,
pub auto_dispatch_limit_per_session: usize,
@@ -91,6 +92,7 @@ impl Default for Config {
max_parallel_worktrees: 6,
session_timeout_secs: 3600,
heartbeat_interval_secs: 30,
auto_terminate_stale_sessions: false,
default_agent: "claude".to_string(),
auto_dispatch_unread_handoffs: false,
auto_dispatch_limit_per_session: 5,
@@ -340,6 +342,7 @@ max_parallel_sessions = 8
max_parallel_worktrees = 6
session_timeout_secs = 3600
heartbeat_interval_secs = 30
auto_terminate_stale_sessions = false
default_agent = "claude"
theme = "Dark"
"#;
@@ -377,6 +380,10 @@ theme = "Dark"
config.auto_merge_ready_worktrees,
defaults.auto_merge_ready_worktrees
);
assert_eq!(
config.auto_terminate_stale_sessions,
defaults.auto_terminate_stale_sessions
);
}
#[test]

View File

@@ -900,6 +900,7 @@ fn sync_runtime_session_metrics(
db.refresh_session_durations()?;
db.sync_cost_tracker_metrics(&cfg.cost_metrics_path())?;
db.sync_tool_activity_metrics(&cfg.tool_activity_metrics_path())?;
let _ = session::manager::enforce_session_heartbeats(db, cfg)?;
let _ = session::manager::enforce_budget_hard_limits(db, cfg)?;
Ok(())
}

View File

@@ -313,6 +313,7 @@ mod tests {
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
}
}

View File

@@ -22,10 +22,8 @@ pub async fn run(db: StateStore, cfg: Config) -> Result<()> {
resume_crashed_sessions(&db)?;
let heartbeat_interval = Duration::from_secs(cfg.heartbeat_interval_secs);
let timeout = Duration::from_secs(cfg.session_timeout_secs);
loop {
if let Err(e) = check_sessions(&db, timeout) {
if let Err(e) = check_sessions(&db, &cfg) {
tracing::error!("Session check failed: {e}");
}
@@ -82,25 +80,8 @@ where
Ok(failed_sessions)
}
fn check_sessions(db: &StateStore, timeout: Duration) -> Result<()> {
let sessions = db.list_sessions()?;
for session in sessions {
if session.state != SessionState::Running {
continue;
}
let elapsed = chrono::Utc::now()
.signed_duration_since(session.updated_at)
.to_std()
.unwrap_or(Duration::ZERO);
if elapsed > timeout {
tracing::warn!("Session {} timed out after {:?}", session.id, elapsed);
db.update_state_and_pid(&session.id, &SessionState::Failed, None)?;
}
}
fn check_sessions(db: &StateStore, cfg: &Config) -> Result<()> {
let _ = manager::enforce_session_heartbeats(db, cfg)?;
Ok(())
}
@@ -498,6 +479,7 @@ mod tests {
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
}
}

View File

@@ -68,6 +68,58 @@ pub fn get_team_status(db: &StateStore, id: &str, depth: usize) -> Result<TeamSt
})
}
#[derive(Debug, Clone, Default, Serialize)]
pub struct HeartbeatEnforcementOutcome {
pub stale_sessions: Vec<String>,
pub auto_terminated_sessions: Vec<String>,
}
pub fn enforce_session_heartbeats(
db: &StateStore,
cfg: &Config,
) -> Result<HeartbeatEnforcementOutcome> {
enforce_session_heartbeats_with(db, cfg, kill_process)
}
fn enforce_session_heartbeats_with<F>(
db: &StateStore,
cfg: &Config,
terminate_pid: F,
) -> Result<HeartbeatEnforcementOutcome>
where
F: Fn(u32) -> Result<()>,
{
let timeout = chrono::Duration::seconds(cfg.session_timeout_secs as i64);
let now = chrono::Utc::now();
let mut outcome = HeartbeatEnforcementOutcome::default();
for session in db.list_sessions()? {
if !matches!(session.state, SessionState::Running | SessionState::Stale) {
continue;
}
if now.signed_duration_since(session.last_heartbeat_at) <= timeout {
continue;
}
if cfg.auto_terminate_stale_sessions {
if let Some(pid) = session.pid {
let _ = terminate_pid(pid);
}
db.update_state_and_pid(&session.id, &SessionState::Failed, None)?;
outcome.auto_terminated_sessions.push(session.id);
continue;
}
if session.state != SessionState::Stale {
db.update_state(&session.id, &SessionState::Stale)?;
outcome.stale_sessions.push(session.id);
}
}
Ok(outcome)
}
pub async fn assign_session(
db: &StateStore,
cfg: &Config,
@@ -685,7 +737,7 @@ pub async fn merge_session_worktree(
if matches!(
session.state,
SessionState::Pending | SessionState::Running | SessionState::Idle
SessionState::Pending | SessionState::Running | SessionState::Idle | SessionState::Stale
) {
anyhow::bail!(
"Cannot merge active session {} while it is {}",
@@ -747,7 +799,10 @@ pub async fn merge_ready_worktrees(
if matches!(
session.state,
SessionState::Pending | SessionState::Running | SessionState::Idle
SessionState::Pending
| SessionState::Running
| SessionState::Idle
| SessionState::Stale
) {
active_with_worktree_ids.push(session.id);
continue;
@@ -902,6 +957,7 @@ pub async fn run_session(
session_id.to_string(),
command,
SessionOutputStore::default(),
std::time::Duration::from_secs(cfg.heartbeat_interval_secs),
)
.await?;
Ok(())
@@ -997,6 +1053,7 @@ fn build_session_record(
worktree,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})
}
@@ -1488,6 +1545,15 @@ impl fmt::Display for SessionStatus {
writeln!(f, "Tools: {}", s.metrics.tool_calls)?;
writeln!(f, "Files: {}", s.metrics.files_changed)?;
writeln!(f, "Cost: ${:.4}", s.metrics.cost_usd)?;
writeln!(
f,
"Heartbeat: {} ({}s ago)",
s.last_heartbeat_at,
chrono::Utc::now()
.signed_duration_since(s.last_heartbeat_at)
.num_seconds()
.max(0)
)?;
if !self.delegated_children.is_empty() {
writeln!(f, "Children: {}", self.delegated_children.join(", "))?;
}
@@ -1528,6 +1594,7 @@ impl fmt::Display for TeamStatus {
for lane in [
"Running",
"Idle",
"Stale",
"Pending",
"Failed",
"Stopped",
@@ -1676,6 +1743,7 @@ fn session_state_label(state: &SessionState) -> &'static str {
SessionState::Pending => "Pending",
SessionState::Running => "Running",
SessionState::Idle => "Idle",
SessionState::Stale => "Stale",
SessionState::Completed => "Completed",
SessionState::Failed => "Failed",
SessionState::Stopped => "Stopped",
@@ -1727,6 +1795,7 @@ mod tests {
max_parallel_worktrees: 4,
session_timeout_secs: 60,
heartbeat_interval_secs: 5,
auto_terminate_stale_sessions: false,
default_agent: "claude".to_string(),
auto_dispatch_unread_handoffs: false,
auto_dispatch_limit_per_session: 5,
@@ -1755,10 +1824,85 @@ mod tests {
worktree: None,
created_at: updated_at - Duration::minutes(1),
updated_at,
last_heartbeat_at: updated_at,
metrics: SessionMetrics::default(),
}
}
#[test]
fn enforce_session_heartbeats_marks_overdue_running_sessions_stale() -> Result<()> {
let tempdir = TestDir::new("manager-heartbeat-stale")?;
let cfg = build_config(tempdir.path());
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
db.insert_session(&Session {
id: "stale-1".to_string(),
task: "heartbeat overdue".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: SessionState::Running,
pid: Some(4242),
worktree: None,
created_at: now - Duration::minutes(5),
updated_at: now - Duration::minutes(5),
last_heartbeat_at: now - Duration::minutes(5),
metrics: SessionMetrics::default(),
})?;
let outcome = enforce_session_heartbeats(&db, &cfg)?;
let session = db.get_session("stale-1")?.expect("session should exist");
assert_eq!(outcome.stale_sessions, vec!["stale-1".to_string()]);
assert!(outcome.auto_terminated_sessions.is_empty());
assert_eq!(session.state, SessionState::Stale);
assert_eq!(session.pid, Some(4242));
Ok(())
}
#[test]
fn enforce_session_heartbeats_auto_terminates_when_enabled() -> Result<()> {
let tempdir = TestDir::new("manager-heartbeat-terminate")?;
let mut cfg = build_config(tempdir.path());
cfg.auto_terminate_stale_sessions = true;
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
let killed = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let killed_clone = killed.clone();
db.insert_session(&Session {
id: "stale-2".to_string(),
task: "terminate overdue".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: SessionState::Running,
pid: Some(7777),
worktree: None,
created_at: now - Duration::minutes(5),
updated_at: now - Duration::minutes(5),
last_heartbeat_at: now - Duration::minutes(5),
metrics: SessionMetrics::default(),
})?;
let outcome = enforce_session_heartbeats_with(&db, &cfg, move |pid| {
killed_clone.lock().unwrap().push(pid);
Ok(())
})?;
let session = db.get_session("stale-2")?.expect("session should exist");
assert!(outcome.stale_sessions.is_empty());
assert_eq!(
outcome.auto_terminated_sessions,
vec!["stale-2".to_string()]
);
assert_eq!(*killed.lock().unwrap(), vec![7777]);
assert_eq!(session.state, SessionState::Failed);
assert_eq!(session.pid, None);
Ok(())
}
fn build_daemon_activity() -> super::super::store::DaemonActivity {
let now = Utc::now();
super::super::store::DaemonActivity {
@@ -1976,6 +2120,7 @@ mod tests {
}),
created_at: now - Duration::minutes(1),
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
db.update_metrics(
@@ -2032,6 +2177,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(2),
updated_at: now - Duration::minutes(1),
last_heartbeat_at: now - Duration::minutes(1),
metrics: SessionMetrics::default(),
})?;
db.update_metrics(
@@ -2076,6 +2222,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(1),
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -2328,6 +2475,7 @@ mod tests {
worktree: Some(merged_worktree.clone()),
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -2343,6 +2491,7 @@ mod tests {
worktree: Some(active_worktree.clone()),
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -2359,6 +2508,7 @@ mod tests {
worktree: Some(dirty_worktree.clone()),
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -2584,6 +2734,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(2),
updated_at: now - Duration::minutes(2),
last_heartbeat_at: now - Duration::minutes(2),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
@@ -2596,6 +2747,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(1),
updated_at: now - Duration::minutes(1),
last_heartbeat_at: now - Duration::minutes(1),
metrics: SessionMetrics::default(),
})?;
db.send_message(
@@ -2651,6 +2803,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
last_heartbeat_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
@@ -2663,6 +2816,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(2),
updated_at: now - Duration::minutes(2),
last_heartbeat_at: now - Duration::minutes(2),
metrics: SessionMetrics::default(),
})?;
db.send_message(
@@ -2727,6 +2881,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
last_heartbeat_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
@@ -2739,6 +2894,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(2),
updated_at: now - Duration::minutes(2),
last_heartbeat_at: now - Duration::minutes(2),
metrics: SessionMetrics::default(),
})?;
db.send_message(
@@ -2794,6 +2950,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
last_heartbeat_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
@@ -2806,6 +2963,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(2),
updated_at: now - Duration::minutes(2),
last_heartbeat_at: now - Duration::minutes(2),
metrics: SessionMetrics::default(),
})?;
db.send_message(
@@ -2865,6 +3023,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
last_heartbeat_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
@@ -2877,6 +3036,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(2),
updated_at: now - Duration::minutes(2),
last_heartbeat_at: now - Duration::minutes(2),
metrics: SessionMetrics::default(),
})?;
db.send_message(
@@ -2930,6 +3090,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
last_heartbeat_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
@@ -2977,6 +3138,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
last_heartbeat_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
@@ -2989,6 +3151,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(2),
updated_at: now - Duration::minutes(2),
last_heartbeat_at: now - Duration::minutes(2),
metrics: SessionMetrics::default(),
})?;
db.send_message(
@@ -3044,6 +3207,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
last_heartbeat_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
}
@@ -3103,6 +3267,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
last_heartbeat_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
}
@@ -3154,6 +3319,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
last_heartbeat_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
@@ -3167,6 +3333,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(2),
updated_at: now - Duration::minutes(2),
last_heartbeat_at: now - Duration::minutes(2),
metrics: SessionMetrics::default(),
})?;
@@ -3222,6 +3389,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(4),
updated_at: now - Duration::minutes(4),
last_heartbeat_at: now - Duration::minutes(4),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
@@ -3234,6 +3402,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
last_heartbeat_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
@@ -3246,6 +3415,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(2),
updated_at: now - Duration::minutes(2),
last_heartbeat_at: now - Duration::minutes(2),
metrics: SessionMetrics::default(),
})?;
@@ -3307,6 +3477,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(4),
updated_at: now - Duration::minutes(4),
last_heartbeat_at: now - Duration::minutes(4),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
@@ -3319,6 +3490,7 @@ mod tests {
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
last_heartbeat_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;

View File

@@ -20,6 +20,7 @@ pub struct Session {
pub worktree: Option<WorktreeInfo>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub last_heartbeat_at: DateTime<Utc>,
pub metrics: SessionMetrics,
}
@@ -28,6 +29,7 @@ pub enum SessionState {
Pending,
Running,
Idle,
Stale,
Completed,
Failed,
Stopped,
@@ -39,6 +41,7 @@ impl fmt::Display for SessionState {
SessionState::Pending => write!(f, "pending"),
SessionState::Running => write!(f, "running"),
SessionState::Idle => write!(f, "idle"),
SessionState::Stale => write!(f, "stale"),
SessionState::Completed => write!(f, "completed"),
SessionState::Failed => write!(f, "failed"),
SessionState::Stopped => write!(f, "stopped"),
@@ -60,12 +63,21 @@ impl SessionState {
) | (
SessionState::Running,
SessionState::Idle
| SessionState::Stale
| SessionState::Completed
| SessionState::Failed
| SessionState::Stopped
) | (
SessionState::Idle,
SessionState::Running
| SessionState::Stale
| SessionState::Completed
| SessionState::Failed
| SessionState::Stopped
) | (
SessionState::Stale,
SessionState::Running
| SessionState::Idle
| SessionState::Completed
| SessionState::Failed
| SessionState::Stopped
@@ -78,6 +90,7 @@ impl SessionState {
match value {
"running" => SessionState::Running,
"idle" => SessionState::Idle,
"stale" => SessionState::Stale,
"completed" => SessionState::Completed,
"failed" => SessionState::Failed,
"stopped" => SessionState::Stopped,

View File

@@ -5,6 +5,7 @@ use anyhow::{Context, Result};
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use tokio::process::Command;
use tokio::sync::{mpsc, oneshot};
use tokio::time::{self, MissedTickBehavior};
use super::output::{OutputStream, SessionOutputStore};
use super::store::StateStore;
@@ -26,6 +27,9 @@ enum DbMessage {
line: String,
ack: oneshot::Sender<DbAck>,
},
TouchHeartbeat {
ack: oneshot::Sender<DbAck>,
},
}
#[derive(Clone)]
@@ -53,6 +57,10 @@ impl DbWriter {
.await
}
async fn touch_heartbeat(&self) -> Result<()> {
self.send(|ack| DbMessage::TouchHeartbeat { ack }).await
}
async fn send<F>(&self, build: F) -> Result<()>
where
F: FnOnce(oneshot::Sender<DbAck>) -> DbMessage,
@@ -111,6 +119,17 @@ fn run_db_writer(db_path: PathBuf, session_id: String, mut rx: mpsc::UnboundedRe
};
let _ = ack.send(result);
}
DbMessage::TouchHeartbeat { ack } => {
let result = match opened.as_ref() {
Some(db) => db
.touch_heartbeat(&session_id)
.map_err(|error| error.to_string()),
None => Err(open_error
.clone()
.unwrap_or_else(|| "Failed to open state store".to_string())),
};
let _ = ack.send(result);
}
}
}
}
@@ -120,6 +139,7 @@ pub async fn capture_command_output(
session_id: String,
mut command: Command,
output_store: SessionOutputStore,
heartbeat_interval: std::time::Duration,
) -> Result<ExitStatus> {
let db_writer = DbWriter::start(db_path, session_id.clone());
@@ -152,6 +172,19 @@ pub async fn capture_command_output(
.ok_or_else(|| anyhow::anyhow!("Spawned process did not expose a process id"))?;
db_writer.update_pid(Some(pid)).await?;
db_writer.update_state(SessionState::Running).await?;
db_writer.touch_heartbeat().await?;
let heartbeat_writer = db_writer.clone();
let heartbeat_task = tokio::spawn(async move {
let mut ticker = time::interval(heartbeat_interval);
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
ticker.tick().await;
if heartbeat_writer.touch_heartbeat().await.is_err() {
break;
}
}
});
let stdout_task = tokio::spawn(capture_stream(
session_id.clone(),
@@ -169,6 +202,8 @@ pub async fn capture_command_output(
));
let status = child.wait().await?;
heartbeat_task.abort();
let _ = heartbeat_task.await;
stdout_task.await??;
stderr_task.await??;
@@ -244,6 +279,7 @@ mod tests {
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -254,9 +290,14 @@ mod tests {
.arg("-c")
.arg("printf 'alpha\\n'; printf 'beta\\n' >&2");
let status =
capture_command_output(db_path.clone(), session_id.clone(), command, output_store)
.await?;
let status = capture_command_output(
db_path.clone(),
session_id.clone(),
command,
output_store,
std::time::Duration::from_millis(10),
)
.await?;
assert!(status.success());
@@ -286,4 +327,49 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn capture_command_output_updates_heartbeat_for_quiet_processes() -> Result<()> {
let db_path = env::temp_dir().join(format!("ecc2-runtime-heartbeat-{}.db", Uuid::new_v4()));
let db = StateStore::open(&db_path)?;
let session_id = "session-heartbeat".to_string();
let now = Utc::now();
db.insert_session(&Session {
id: session_id.clone(),
task: "quiet process".to_string(),
agent_type: "test".to_string(),
working_dir: env::temp_dir(),
state: SessionState::Pending,
pid: None,
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
let mut command = Command::new("/bin/sh");
command.arg("-c").arg("sleep 0.05");
let _ = capture_command_output(
db_path.clone(),
session_id.clone(),
command,
SessionOutputStore::default(),
std::time::Duration::from_millis(10),
)
.await?;
let db = StateStore::open(&db_path)?;
let session = db
.get_session(&session_id)?
.expect("session should still exist");
assert!(session.last_heartbeat_at > now);
assert_eq!(session.state, SessionState::Completed);
let _ = std::fs::remove_file(db_path);
Ok(())
}
}

View File

@@ -132,7 +132,8 @@ impl StateStore {
duration_secs INTEGER DEFAULT 0,
cost_usd REAL DEFAULT 0.0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
updated_at TEXT NOT NULL,
last_heartbeat_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS tool_log (
@@ -240,6 +241,20 @@ impl StateStore {
.context("Failed to add output_tokens column to sessions table")?;
}
if !self.has_column("sessions", "last_heartbeat_at")? {
self.conn
.execute("ALTER TABLE sessions ADD COLUMN last_heartbeat_at TEXT", [])
.context("Failed to add last_heartbeat_at column to sessions table")?;
self.conn
.execute(
"UPDATE sessions
SET last_heartbeat_at = updated_at
WHERE last_heartbeat_at IS NULL",
[],
)
.context("Failed to backfill last_heartbeat_at column")?;
}
if !self.has_column("tool_log", "hook_event_id")? {
self.conn
.execute("ALTER TABLE tool_log ADD COLUMN hook_event_id TEXT", [])
@@ -404,8 +419,8 @@ impl StateStore {
pub fn insert_session(&self, session: &Session) -> Result<()> {
self.conn.execute(
"INSERT INTO sessions (id, task, agent_type, working_dir, state, pid, worktree_path, worktree_branch, worktree_base, created_at, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
"INSERT INTO sessions (id, task, agent_type, working_dir, state, pid, worktree_path, worktree_branch, worktree_base, created_at, updated_at, last_heartbeat_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
rusqlite::params![
session.id,
session.task,
@@ -421,6 +436,7 @@ impl StateStore {
session.worktree.as_ref().map(|w| w.base_branch.clone()),
session.created_at.to_rfc3339(),
session.updated_at.to_rfc3339(),
session.last_heartbeat_at.to_rfc3339(),
],
)?;
Ok(())
@@ -433,7 +449,12 @@ impl StateStore {
pid: Option<u32>,
) -> Result<()> {
let updated = self.conn.execute(
"UPDATE sessions SET state = ?1, pid = ?2, updated_at = ?3 WHERE id = ?4",
"UPDATE sessions
SET state = ?1,
pid = ?2,
updated_at = ?3,
last_heartbeat_at = ?3
WHERE id = ?4",
rusqlite::params![
state.to_string(),
pid.map(i64::from),
@@ -470,7 +491,11 @@ impl StateStore {
}
let updated = self.conn.execute(
"UPDATE sessions SET state = ?1, updated_at = ?2 WHERE id = ?3",
"UPDATE sessions
SET state = ?1,
updated_at = ?2,
last_heartbeat_at = ?2
WHERE id = ?3",
rusqlite::params![
state.to_string(),
chrono::Utc::now().to_rfc3339(),
@@ -487,7 +512,11 @@ impl StateStore {
pub fn update_pid(&self, session_id: &str, pid: Option<u32>) -> Result<()> {
let updated = self.conn.execute(
"UPDATE sessions SET pid = ?1, updated_at = ?2 WHERE id = ?3",
"UPDATE sessions
SET pid = ?1,
updated_at = ?2,
last_heartbeat_at = ?2
WHERE id = ?3",
rusqlite::params![
pid.map(i64::from),
chrono::Utc::now().to_rfc3339(),
@@ -505,7 +534,11 @@ impl StateStore {
pub fn clear_worktree(&self, session_id: &str) -> Result<()> {
let updated = self.conn.execute(
"UPDATE sessions
SET worktree_path = NULL, worktree_branch = NULL, worktree_base = NULL, updated_at = ?1
SET worktree_path = NULL,
worktree_branch = NULL,
worktree_base = NULL,
updated_at = ?1,
last_heartbeat_at = ?1
WHERE id = ?2",
rusqlite::params![chrono::Utc::now().to_rfc3339(), session_id],
)?;
@@ -571,7 +604,10 @@ impl StateStore {
.unwrap_or_default()
.with_timezone(&chrono::Utc);
let effective_end = match state {
SessionState::Pending | SessionState::Running | SessionState::Idle => now,
SessionState::Pending
| SessionState::Running
| SessionState::Idle
| SessionState::Stale => now,
SessionState::Completed | SessionState::Failed | SessionState::Stopped => {
updated_at
}
@@ -592,6 +628,20 @@ impl StateStore {
Ok(())
}
pub fn touch_heartbeat(&self, session_id: &str) -> Result<()> {
let now = chrono::Utc::now().to_rfc3339();
let updated = self.conn.execute(
"UPDATE sessions SET last_heartbeat_at = ?1 WHERE id = ?2",
rusqlite::params![now, session_id],
)?;
if updated == 0 {
anyhow::bail!("Session not found: {session_id}");
}
Ok(())
}
pub fn sync_cost_tracker_metrics(&self, metrics_path: &Path) -> Result<()> {
if !metrics_path.exists() {
return Ok(());
@@ -786,7 +836,11 @@ impl StateStore {
pub fn increment_tool_calls(&self, session_id: &str) -> Result<()> {
self.conn.execute(
"UPDATE sessions SET tool_calls = tool_calls + 1, updated_at = ?1 WHERE id = ?2",
"UPDATE sessions
SET tool_calls = tool_calls + 1,
updated_at = ?1,
last_heartbeat_at = ?1
WHERE id = ?2",
rusqlite::params![chrono::Utc::now().to_rfc3339(), session_id],
)?;
Ok(())
@@ -796,7 +850,7 @@ impl StateStore {
let mut stmt = self.conn.prepare(
"SELECT id, task, agent_type, working_dir, state, pid, worktree_path, worktree_branch, worktree_base,
input_tokens, output_tokens, tokens_used, tool_calls, files_changed, duration_secs, cost_usd,
created_at, updated_at
created_at, updated_at, last_heartbeat_at
FROM sessions ORDER BY updated_at DESC",
)?;
@@ -814,6 +868,7 @@ impl StateStore {
let created_str: String = row.get(16)?;
let updated_str: String = row.get(17)?;
let heartbeat_str: String = row.get(18)?;
Ok(Session {
id: row.get(0)?,
@@ -829,6 +884,11 @@ impl StateStore {
updated_at: chrono::DateTime::parse_from_rfc3339(&updated_str)
.unwrap_or_default()
.with_timezone(&chrono::Utc),
last_heartbeat_at: chrono::DateTime::parse_from_rfc3339(&heartbeat_str)
.unwrap_or_else(|_| {
chrono::DateTime::parse_from_rfc3339(&updated_str).unwrap_or_default()
})
.with_timezone(&chrono::Utc),
metrics: SessionMetrics {
input_tokens: row.get(9)?,
output_tokens: row.get(10)?,
@@ -1299,7 +1359,10 @@ impl StateStore {
)?;
self.conn.execute(
"UPDATE sessions SET updated_at = ?1 WHERE id = ?2",
"UPDATE sessions
SET updated_at = ?1,
last_heartbeat_at = ?1
WHERE id = ?2",
rusqlite::params![chrono::Utc::now().to_rfc3339(), session_id],
)?;
@@ -1460,6 +1523,7 @@ mod tests {
worktree: None,
created_at: now - ChronoDuration::minutes(1),
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
}
}
@@ -1520,6 +1584,9 @@ mod tests {
assert!(column_names.iter().any(|column| column == "pid"));
assert!(column_names.iter().any(|column| column == "input_tokens"));
assert!(column_names.iter().any(|column| column == "output_tokens"));
assert!(column_names
.iter()
.any(|column| column == "last_heartbeat_at"));
Ok(())
}
@@ -1539,6 +1606,7 @@ mod tests {
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -1583,6 +1651,7 @@ mod tests {
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
@@ -1595,6 +1664,7 @@ mod tests {
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -1648,6 +1718,7 @@ mod tests {
worktree: None,
created_at: now - ChronoDuration::seconds(95),
updated_at: now - ChronoDuration::seconds(1),
last_heartbeat_at: now - ChronoDuration::seconds(1),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
@@ -1660,6 +1731,7 @@ mod tests {
worktree: None,
created_at: now - ChronoDuration::seconds(80),
updated_at: now - ChronoDuration::seconds(5),
last_heartbeat_at: now - ChronoDuration::seconds(5),
metrics: SessionMetrics::default(),
})?;
@@ -1678,6 +1750,36 @@ mod tests {
Ok(())
}
#[test]
fn touch_heartbeat_updates_last_heartbeat_timestamp() -> Result<()> {
let tempdir = TestDir::new("store-touch-heartbeat")?;
let db = StateStore::open(&tempdir.path().join("state.db"))?;
let now = Utc::now() - ChronoDuration::seconds(30);
db.insert_session(&Session {
id: "session-1".to_string(),
task: "heartbeat".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: SessionState::Running,
pid: Some(1234),
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
db.touch_heartbeat("session-1")?;
let session = db
.get_session("session-1")?
.expect("session should still exist");
assert!(session.last_heartbeat_at > now);
Ok(())
}
#[test]
fn append_output_line_keeps_latest_buffer_window() -> Result<()> {
let tempdir = TestDir::new("store-output")?;
@@ -1694,6 +1796,7 @@ mod tests {
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;

View File

@@ -112,6 +112,7 @@ struct SessionSummary {
pending: usize,
running: usize,
idle: usize,
stale: usize,
completed: usize,
failed: usize,
stopped: usize,
@@ -266,6 +267,7 @@ struct TeamSummary {
idle: usize,
running: usize,
pending: usize,
stale: usize,
failed: usize,
stopped: usize,
}
@@ -2753,7 +2755,12 @@ impl Dashboard {
self.sync_from_store();
}
fn sync_runtime_metrics(&mut self) -> Option<manager::BudgetEnforcementOutcome> {
fn sync_runtime_metrics(
&mut self,
) -> (
Option<manager::HeartbeatEnforcementOutcome>,
Option<manager::BudgetEnforcementOutcome>,
) {
if let Err(error) = self.db.refresh_session_durations() {
tracing::warn!("Failed to refresh session durations: {error}");
}
@@ -2780,17 +2787,27 @@ impl Dashboard {
}
}
match manager::enforce_budget_hard_limits(&self.db, &self.cfg) {
let heartbeat_enforcement = match manager::enforce_session_heartbeats(&self.db, &self.cfg) {
Ok(outcome) => Some(outcome),
Err(error) => {
tracing::warn!("Failed to enforce session heartbeats: {error}");
None
}
};
let budget_enforcement = match manager::enforce_budget_hard_limits(&self.db, &self.cfg) {
Ok(outcome) => Some(outcome),
Err(error) => {
tracing::warn!("Failed to enforce budget hard limits: {error}");
None
}
}
};
(heartbeat_enforcement, budget_enforcement)
}
fn sync_from_store(&mut self) {
let budget_enforcement = self.sync_runtime_metrics();
let (heartbeat_enforcement, budget_enforcement) = self.sync_runtime_metrics();
let selected_id = self.selected_session_id().map(ToOwned::to_owned);
self.sessions = match self.db.list_sessions() {
Ok(sessions) => sessions,
@@ -2825,6 +2842,11 @@ impl Dashboard {
{
self.set_operator_note(budget_auto_pause_note(&outcome));
}
if let Some(outcome) = heartbeat_enforcement.filter(|outcome| {
!outcome.stale_sessions.is_empty() || !outcome.auto_terminated_sessions.is_empty()
}) {
self.set_operator_note(heartbeat_enforcement_note(&outcome));
}
}
fn sync_budget_alerts(&mut self) {
@@ -3183,6 +3205,7 @@ impl Dashboard {
SessionState::Pending => team.pending += 1,
SessionState::Failed => team.failed += 1,
SessionState::Stopped => team.stopped += 1,
SessionState::Stale => team.stale += 1,
SessionState::Completed => {}
}
@@ -4239,7 +4262,10 @@ impl Dashboard {
.filter(|session| {
matches!(
session.state,
SessionState::Pending | SessionState::Running | SessionState::Idle
SessionState::Pending
| SessionState::Running
| SessionState::Idle
| SessionState::Stale
)
})
.count()
@@ -4944,6 +4970,7 @@ impl SessionSummary {
SessionState::Pending => summary.pending += 1,
SessionState::Running => summary.running += 1,
SessionState::Idle => summary.idle += 1,
SessionState::Stale => summary.stale += 1,
SessionState::Completed => summary.completed += 1,
SessionState::Failed => summary.failed += 1,
SessionState::Stopped => summary.stopped += 1,
@@ -4968,12 +4995,14 @@ fn session_row(
approval_requests: usize,
unread_messages: usize,
) -> Row<'static> {
let state_label = session_state_label(&session.state);
let state_color = session_state_color(&session.state);
Row::new(vec![
Cell::from(format_session_id(&session.id)),
Cell::from(session.agent_type.clone()),
Cell::from(session_state_label(&session.state)).style(
Cell::from(state_label).style(
Style::default()
.fg(session_state_color(&session.state))
.fg(state_color)
.add_modifier(Modifier::BOLD),
),
Cell::from(session_branch(session)),
@@ -5016,6 +5045,7 @@ fn summary_line(summary: &SessionSummary) -> Line<'static> {
),
summary_span("Running", summary.running, Color::Green),
summary_span("Idle", summary.idle, Color::Yellow),
summary_span("Stale", summary.stale, Color::LightRed),
summary_span("Completed", summary.completed, Color::Blue),
summary_span("Failed", summary.failed, Color::Red),
summary_span("Stopped", summary.stopped, Color::DarkGray),
@@ -5052,6 +5082,7 @@ fn attention_queue_line(summary: &SessionSummary, stabilized: bool) -> Line<'sta
if summary.failed == 0
&& summary.stopped == 0
&& summary.pending == 0
&& summary.stale == 0
&& summary.unread_messages == 0
&& summary.conflicted_worktrees == 0
{
@@ -5086,6 +5117,7 @@ fn attention_queue_line(summary: &SessionSummary, stabilized: bool) -> Line<'sta
}
spans.extend([
summary_span("Stale", summary.stale, Color::LightRed),
summary_span("Backlog", summary.unread_messages, Color::Magenta),
summary_span("Failed", summary.failed, Color::Red),
summary_span("Stopped", summary.stopped, Color::DarkGray),
@@ -5321,6 +5353,7 @@ fn session_state_label(state: &SessionState) -> &'static str {
SessionState::Pending => "Pending",
SessionState::Running => "Running",
SessionState::Idle => "Idle",
SessionState::Stale => "Stale",
SessionState::Completed => "Completed",
SessionState::Failed => "Failed",
SessionState::Stopped => "Stopped",
@@ -5331,6 +5364,7 @@ fn session_state_color(state: &SessionState) -> Color {
match state {
SessionState::Running => Color::Green,
SessionState::Idle => Color::Yellow,
SessionState::Stale => Color::LightRed,
SessionState::Failed => Color::Red,
SessionState::Stopped => Color::DarkGray,
SessionState::Completed => Color::Blue,
@@ -5338,6 +5372,20 @@ fn session_state_color(state: &SessionState) -> Color {
}
}
fn heartbeat_enforcement_note(outcome: &manager::HeartbeatEnforcementOutcome) -> String {
if !outcome.auto_terminated_sessions.is_empty() {
return format!(
"stale heartbeat detected | auto-terminated {} session(s)",
outcome.auto_terminated_sessions.len()
);
}
format!(
"stale heartbeat detected | flagged {} session(s) for attention",
outcome.stale_sessions.len()
)
}
fn budget_auto_pause_note(outcome: &manager::BudgetEnforcementOutcome) -> String {
let cause = match (outcome.token_budget_exceeded, outcome.cost_budget_exceeded) {
(true, true) => "token and cost budgets exceeded",
@@ -5436,6 +5484,7 @@ fn delegate_next_action(delegate: &DelegatedChildSummary) -> &'static str {
SessionState::Pending => "wait for startup",
SessionState::Running => "let it run",
SessionState::Idle => "assign next task",
SessionState::Stale => "inspect stale heartbeat",
SessionState::Failed => "inspect failure",
SessionState::Stopped => "resume or reassign",
SessionState::Completed => "merge or cleanup",
@@ -5449,7 +5498,10 @@ fn delegate_attention_priority(delegate: &DelegatedChildSummary) -> u8 {
if delegate.approval_backlog > 0 {
return 1;
}
if matches!(delegate.state, SessionState::Failed | SessionState::Stopped) {
if matches!(
delegate.state,
SessionState::Stale | SessionState::Failed | SessionState::Stopped
) {
return 2;
}
if delegate.handoff_backlog > 0 {
@@ -5463,7 +5515,7 @@ fn delegate_attention_priority(delegate: &DelegatedChildSummary) -> u8 {
SessionState::Running => 6,
SessionState::Idle => 7,
SessionState::Completed => 8,
SessionState::Failed | SessionState::Stopped => unreachable!(),
SessionState::Stale | SessionState::Failed | SessionState::Stopped => unreachable!(),
}
}
@@ -6160,6 +6212,7 @@ diff --git a/src/next.rs b/src/next.rs
idle: 1,
running: 1,
pending: 1,
stale: 0,
failed: 0,
stopped: 0,
});
@@ -7285,6 +7338,7 @@ diff --git a/src/next.rs b/src/next.rs
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})
.unwrap();
@@ -7308,6 +7362,39 @@ diff --git a/src/next.rs b/src/next.rs
let _ = fs::remove_dir_all(tempdir);
}
#[test]
fn refresh_flags_stale_sessions_and_sets_operator_note() {
let db = StateStore::open(Path::new(":memory:")).unwrap();
let mut cfg = Config::default();
cfg.session_timeout_secs = 60;
let now = Utc::now();
db.insert_session(&Session {
id: "stale-1".to_string(),
task: "stale session".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: SessionState::Running,
pid: Some(4242),
worktree: None,
created_at: now - Duration::minutes(5),
updated_at: now - Duration::minutes(5),
last_heartbeat_at: now - Duration::minutes(5),
metrics: SessionMetrics::default(),
})
.unwrap();
let mut dashboard = Dashboard::new(db, cfg);
dashboard.refresh();
assert_eq!(dashboard.sessions.len(), 1);
assert_eq!(dashboard.sessions[0].state, SessionState::Stale);
assert_eq!(
dashboard.operator_note.as_deref(),
Some("stale heartbeat detected | flagged 1 session(s) for attention")
);
}
#[test]
fn new_session_task_uses_selected_session_context() {
let dashboard = test_dashboard(
@@ -7445,6 +7532,7 @@ diff --git a/src/next.rs b/src/next.rs
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -7458,6 +7546,7 @@ diff --git a/src/next.rs b/src/next.rs
worktree: None,
created_at: now,
updated_at: now + chrono::Duration::seconds(1),
last_heartbeat_at: now + chrono::Duration::seconds(1),
metrics: SessionMetrics::default(),
})?;
@@ -7487,6 +7576,7 @@ diff --git a/src/next.rs b/src/next.rs
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -7529,6 +7619,7 @@ diff --git a/src/next.rs b/src/next.rs
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -8163,6 +8254,7 @@ diff --git a/src/next.rs b/src/next.rs
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -8200,6 +8292,7 @@ diff --git a/src/next.rs b/src/next.rs
}),
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -8239,6 +8332,7 @@ diff --git a/src/next.rs b/src/next.rs
}),
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -8275,6 +8369,7 @@ diff --git a/src/next.rs b/src/next.rs
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -8315,6 +8410,7 @@ diff --git a/src/next.rs b/src/next.rs
}),
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
@@ -8331,6 +8427,7 @@ diff --git a/src/next.rs b/src/next.rs
}),
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -8380,6 +8477,7 @@ diff --git a/src/next.rs b/src/next.rs
worktree: Some(worktree.clone()),
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -8461,6 +8559,7 @@ diff --git a/src/next.rs b/src/next.rs
worktree: Some(merged_worktree.clone()),
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -8476,6 +8575,7 @@ diff --git a/src/next.rs b/src/next.rs
worktree: Some(active_worktree.clone()),
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -8519,6 +8619,7 @@ diff --git a/src/next.rs b/src/next.rs
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -8551,6 +8652,7 @@ diff --git a/src/next.rs b/src/next.rs
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -8583,6 +8685,7 @@ diff --git a/src/next.rs b/src/next.rs
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -8615,6 +8718,7 @@ diff --git a/src/next.rs b/src/next.rs
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -8647,6 +8751,7 @@ diff --git a/src/next.rs b/src/next.rs
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
@@ -9243,6 +9348,7 @@ diff --git a/src/next.rs b/src/next.rs
max_parallel_worktrees: 4,
session_timeout_secs: 60,
heartbeat_interval_secs: 5,
auto_terminate_stale_sessions: false,
default_agent: "claude".to_string(),
auto_dispatch_unread_handoffs: false,
auto_dispatch_limit_per_session: 5,
@@ -9307,6 +9413,7 @@ diff --git a/src/next.rs b/src/next.rs
}),
created_at: Utc::now(),
updated_at: Utc::now(),
last_heartbeat_at: Utc::now(),
metrics: SessionMetrics {
input_tokens: tokens_used.saturating_mul(3) / 4,
output_tokens: tokens_used / 4,
@@ -9331,6 +9438,7 @@ diff --git a/src/next.rs b/src/next.rs
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics {
input_tokens: tokens_used.saturating_mul(3) / 4,
output_tokens: tokens_used / 4,