From 2709694b7b4b039b50649fbdfe47a9b8d612cfa7 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 02:40:18 -0700 Subject: [PATCH] feat: surface ecc2 daemon activity --- ecc2/src/session/daemon.rs | 168 ++++++++++++++++++++++++++---------- ecc2/src/session/manager.rs | 7 +- ecc2/src/session/store.rs | 106 +++++++++++++++++++++++ ecc2/src/tui/dashboard.rs | 74 ++++++++++++++-- 4 files changed, 304 insertions(+), 51 deletions(-) diff --git a/ecc2/src/session/daemon.rs b/ecc2/src/session/daemon.rs index 099245da..508bc2d9 100644 --- a/ecc2/src/session/daemon.rs +++ b/ecc2/src/session/daemon.rs @@ -94,34 +94,31 @@ fn check_sessions(db: &StateStore, timeout: Duration) -> Result<()> { } async fn maybe_auto_dispatch(db: &StateStore, cfg: &Config) -> Result { - if !cfg.auto_dispatch_unread_handoffs { - return Ok(0); - } - - let outcomes = manager::auto_dispatch_backlog( - db, - cfg, - &cfg.default_agent, - true, - cfg.max_parallel_sessions, - ) - .await?; - let routed: usize = outcomes.iter().map(|outcome| outcome.routed.len()).sum(); - - if routed > 0 { - tracing::info!( - "Auto-dispatched {routed} task handoff(s) across {} lead session(s)", - outcomes.len() - ); - } - - Ok(routed) + maybe_auto_dispatch_with_recorder(cfg, || { + manager::auto_dispatch_backlog( + db, + cfg, + &cfg.default_agent, + true, + cfg.max_parallel_sessions, + ) + }, |routed, leads| db.record_daemon_dispatch_pass(routed, leads)) + .await } async fn maybe_auto_dispatch_with(cfg: &Config, dispatch: F) -> Result where F: Fn() -> Fut, Fut: Future>>, +{ + maybe_auto_dispatch_with_recorder(cfg, dispatch, |_, _| Ok(())).await +} + +async fn maybe_auto_dispatch_with_recorder(cfg: &Config, dispatch: F, mut record: R) -> Result +where + F: Fn() -> Fut, + Fut: Future>>, + R: FnMut(usize, usize) -> Result<()>, { if !cfg.auto_dispatch_unread_handoffs { return Ok(0); @@ -129,6 +126,7 @@ where let outcomes = dispatch().await?; let routed: usize = outcomes.iter().map(|outcome| outcome.routed.len()).sum(); + record(routed, outcomes.len())?; if routed > 0 { tracing::info!( @@ -141,34 +139,35 @@ where } async fn maybe_auto_rebalance(db: &StateStore, cfg: &Config) -> Result { - if !cfg.auto_dispatch_unread_handoffs { - return Ok(0); - } - - let outcomes = manager::rebalance_all_teams( - db, - cfg, - &cfg.default_agent, - true, - cfg.max_parallel_sessions, - ) - .await?; - let rerouted: usize = outcomes.iter().map(|outcome| outcome.rerouted.len()).sum(); - - if rerouted > 0 { - tracing::info!( - "Auto-rebalanced {rerouted} task handoff(s) across {} lead session(s)", - outcomes.len() - ); - } - - Ok(rerouted) + maybe_auto_rebalance_with_recorder(cfg, || { + manager::rebalance_all_teams( + db, + cfg, + &cfg.default_agent, + true, + cfg.max_parallel_sessions, + ) + }, |rerouted, leads| db.record_daemon_rebalance_pass(rerouted, leads)) + .await } async fn maybe_auto_rebalance_with(cfg: &Config, rebalance: F) -> Result where F: Fn() -> Fut, Fut: Future>>, +{ + maybe_auto_rebalance_with_recorder(cfg, rebalance, |_, _| Ok(())).await +} + +async fn maybe_auto_rebalance_with_recorder( + cfg: &Config, + rebalance: F, + mut record: R, +) -> Result +where + F: Fn() -> Fut, + Fut: Future>>, + R: FnMut(usize, usize) -> Result<()>, { if !cfg.auto_dispatch_unread_handoffs { return Ok(0); @@ -176,6 +175,7 @@ where let outcomes = rebalance().await?; let rerouted: usize = outcomes.iter().map(|outcome| outcome.rerouted.len()).sum(); + record(rerouted, outcomes.len())?; if rerouted > 0 { tracing::info!( @@ -353,6 +353,50 @@ mod tests { Ok(()) } + #[tokio::test] + async fn maybe_auto_dispatch_records_latest_pass() -> Result<()> { + let path = temp_db_path(); + let mut cfg = Config::default(); + cfg.auto_dispatch_unread_handoffs = true; + + let recorded = std::sync::Arc::new(std::sync::Mutex::new(None)); + let recorded_clone = recorded.clone(); + + let routed = maybe_auto_dispatch_with_recorder( + &cfg, + || async move { + Ok(vec![LeadDispatchOutcome { + lead_session_id: "lead-a".to_string(), + unread_count: 3, + routed: vec![ + InboxDrainOutcome { + message_id: 1, + task: "task-a".to_string(), + session_id: "worker-a".to_string(), + action: AssignmentAction::Spawned, + }, + InboxDrainOutcome { + message_id: 2, + task: "task-b".to_string(), + session_id: "worker-b".to_string(), + action: AssignmentAction::Spawned, + }, + ], + }]) + }, + move |count, leads| { + *recorded_clone.lock().unwrap() = Some((count, leads)); + Ok(()) + }, + ) + .await?; + + assert_eq!(routed, 2); + assert_eq!(*recorded.lock().unwrap(), Some((2, 1))); + let _ = std::fs::remove_file(path); + Ok(()) + } + #[tokio::test] async fn maybe_auto_rebalance_noops_when_disabled() -> Result<()> { let path = temp_db_path(); @@ -422,4 +466,40 @@ mod tests { let _ = std::fs::remove_file(path); Ok(()) } + + #[tokio::test] + async fn maybe_auto_rebalance_records_latest_pass() -> Result<()> { + let path = temp_db_path(); + let mut cfg = Config::default(); + cfg.auto_dispatch_unread_handoffs = true; + + let recorded = std::sync::Arc::new(std::sync::Mutex::new(None)); + let recorded_clone = recorded.clone(); + + let rerouted = maybe_auto_rebalance_with_recorder( + &cfg, + || async move { + Ok(vec![LeadRebalanceOutcome { + lead_session_id: "lead-a".to_string(), + rerouted: vec![RebalanceOutcome { + from_session_id: "worker-a".to_string(), + message_id: 7, + task: "task-a".to_string(), + session_id: "worker-b".to_string(), + action: AssignmentAction::ReusedIdle, + }], + }]) + }, + move |count, leads| { + *recorded_clone.lock().unwrap() = Some((count, leads)); + Ok(()) + }, + ) + .await?; + + assert_eq!(rerouted, 1); + assert_eq!(*recorded.lock().unwrap(), Some((1, 1))); + let _ = std::fs::remove_file(path); + Ok(()) + } } diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index af25a027..bde00d61 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -1237,8 +1237,11 @@ mod tests { fn wait_for_file(path: &Path) -> Result { for _ in 0..200 { if path.exists() { - return fs::read_to_string(path) - .with_context(|| format!("failed to read {}", path.display())); + let content = fs::read_to_string(path) + .with_context(|| format!("failed to read {}", path.display()))?; + if content.lines().count() >= 2 { + return Ok(content); + } } thread::sleep(StdDuration::from_millis(20)); diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index d8e187e1..8f80e976 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -13,6 +13,16 @@ pub struct StateStore { conn: Connection, } +#[derive(Debug, Clone, Default)] +pub struct DaemonActivity { + pub last_dispatch_at: Option>, + pub last_dispatch_routed: usize, + pub last_dispatch_leads: usize, + pub last_rebalance_at: Option>, + pub last_rebalance_rerouted: usize, + pub last_rebalance_leads: usize, +} + impl StateStore { pub fn open(path: &Path) -> Result { let conn = Connection::open(path)?; @@ -74,11 +84,23 @@ impl StateStore { timestamp TEXT NOT NULL ); + CREATE TABLE IF NOT EXISTS daemon_activity ( + id INTEGER PRIMARY KEY CHECK(id = 1), + last_dispatch_at TEXT, + last_dispatch_routed INTEGER NOT NULL DEFAULT 0, + last_dispatch_leads INTEGER NOT NULL DEFAULT 0, + last_rebalance_at TEXT, + last_rebalance_rerouted INTEGER NOT NULL DEFAULT 0, + last_rebalance_leads INTEGER NOT NULL DEFAULT 0 + ); + CREATE INDEX IF NOT EXISTS idx_sessions_state ON sessions(state); CREATE INDEX IF NOT EXISTS idx_tool_log_session ON tool_log(session_id); CREATE INDEX IF NOT EXISTS idx_messages_to ON messages(to_session, read); CREATE INDEX IF NOT EXISTS idx_session_output_session ON session_output(session_id, id); + + INSERT OR IGNORE INTO daemon_activity (id) VALUES (1); ", )?; self.ensure_session_columns()?; @@ -488,6 +510,71 @@ impl StateStore { .map_err(Into::into) } + pub fn daemon_activity(&self) -> Result { + self.conn + .query_row( + "SELECT last_dispatch_at, last_dispatch_routed, last_dispatch_leads, + last_rebalance_at, last_rebalance_rerouted, last_rebalance_leads + FROM daemon_activity + WHERE id = 1", + [], + |row| { + let parse_ts = + |value: Option| -> rusqlite::Result>> { + value + .map(|raw| { + chrono::DateTime::parse_from_rfc3339(&raw) + .map(|ts| ts.with_timezone(&chrono::Utc)) + .map_err(|err| { + rusqlite::Error::FromSqlConversionFailure( + 0, + rusqlite::types::Type::Text, + Box::new(err), + ) + }) + }) + .transpose() + }; + + Ok(DaemonActivity { + last_dispatch_at: parse_ts(row.get(0)?)?, + last_dispatch_routed: row.get::<_, i64>(1)? as usize, + last_dispatch_leads: row.get::<_, i64>(2)? as usize, + last_rebalance_at: parse_ts(row.get(3)?)?, + last_rebalance_rerouted: row.get::<_, i64>(4)? as usize, + last_rebalance_leads: row.get::<_, i64>(5)? as usize, + }) + }, + ) + .map_err(Into::into) + } + + pub fn record_daemon_dispatch_pass(&self, routed: usize, leads: usize) -> Result<()> { + self.conn.execute( + "UPDATE daemon_activity + SET last_dispatch_at = ?1, + last_dispatch_routed = ?2, + last_dispatch_leads = ?3 + WHERE id = 1", + rusqlite::params![chrono::Utc::now().to_rfc3339(), routed as i64, leads as i64], + )?; + + Ok(()) + } + + pub fn record_daemon_rebalance_pass(&self, rerouted: usize, leads: usize) -> Result<()> { + self.conn.execute( + "UPDATE daemon_activity + SET last_rebalance_at = ?1, + last_rebalance_rerouted = ?2, + last_rebalance_leads = ?3 + WHERE id = 1", + rusqlite::params![chrono::Utc::now().to_rfc3339(), rerouted as i64, leads as i64], + )?; + + Ok(()) + } + pub fn delegated_children(&self, session_id: &str, limit: usize) -> Result> { let mut stmt = self.conn.prepare( "SELECT to_session @@ -855,4 +942,23 @@ mod tests { Ok(()) } + + #[test] + fn daemon_activity_round_trips_latest_passes() -> Result<()> { + let tempdir = TestDir::new("store-daemon-activity")?; + let db = StateStore::open(&tempdir.path().join("state.db"))?; + + db.record_daemon_dispatch_pass(4, 2)?; + db.record_daemon_rebalance_pass(3, 1)?; + + let activity = db.daemon_activity()?; + assert_eq!(activity.last_dispatch_routed, 4); + assert_eq!(activity.last_dispatch_leads, 2); + assert_eq!(activity.last_rebalance_rerouted, 3); + assert_eq!(activity.last_rebalance_leads, 1); + assert!(activity.last_dispatch_at.is_some()); + assert!(activity.last_rebalance_at.is_some()); + + Ok(()) + } } diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index 02aec98d..ae9bdf6e 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -1,6 +1,4 @@ use std::collections::HashMap; -use std::path::Path; - use ratatui::{ prelude::*, widgets::{ @@ -13,12 +11,19 @@ use super::widgets::{budget_state, format_currency, format_token_count, BudgetSt use crate::comms; use crate::config::{Config, PaneLayout}; use crate::observability::ToolLogEntry; -use crate::session::output::{OutputEvent, OutputLine, SessionOutputStore, OutputStream, OUTPUT_BUFFER_LIMIT}; +use crate::session::output::{OutputEvent, OutputLine, SessionOutputStore, OUTPUT_BUFFER_LIMIT}; use crate::session::manager; -use crate::session::store::StateStore; -use crate::session::{Session, SessionMessage, SessionMetrics, SessionState, WorktreeInfo}; +use crate::session::store::{DaemonActivity, StateStore}; +use crate::session::{Session, SessionMessage, SessionState}; use crate::worktree; +#[cfg(test)] +use std::path::Path; +#[cfg(test)] +use crate::session::output::OutputStream; +#[cfg(test)] +use crate::session::{SessionMetrics, WorktreeInfo}; + const DEFAULT_PANE_SIZE_PERCENT: u16 = 35; const DEFAULT_GRID_SIZE_PERCENT: u16 = 50; const OUTPUT_PANE_PERCENT: u16 = 70; @@ -37,6 +42,7 @@ pub struct Dashboard { unread_message_counts: HashMap, global_handoff_backlog_leads: usize, global_handoff_backlog_messages: usize, + daemon_activity: DaemonActivity, selected_messages: Vec, selected_parent_session: Option, selected_child_sessions: Vec, @@ -137,6 +143,7 @@ impl Dashboard { unread_message_counts: HashMap::new(), global_handoff_backlog_leads: 0, global_handoff_backlog_messages: 0, + daemon_activity: DaemonActivity::default(), selected_messages: Vec::new(), selected_parent_session: None, selected_child_sessions: Vec::new(), @@ -988,6 +995,7 @@ impl Dashboard { } }; self.sync_global_handoff_backlog(); + self.sync_daemon_activity(); self.sync_selection_by_id(selected_id.as_deref()); self.ensure_selected_pane_visible(); self.sync_selected_output(); @@ -1038,6 +1046,16 @@ impl Dashboard { } } + fn sync_daemon_activity(&mut self) { + self.daemon_activity = match self.db.daemon_activity() { + Ok(activity) => activity, + Err(error) => { + tracing::warn!("Failed to refresh daemon activity: {error}"); + DaemonActivity::default() + } + }; + } + fn sync_selected_output(&mut self) { let Some(session_id) = self.selected_session_id().map(ToOwned::to_owned) else { self.output_scroll_offset = 0; @@ -1333,6 +1351,24 @@ impl Dashboard { self.cfg.auto_dispatch_limit_per_session )); + if let Some(last_dispatch_at) = self.daemon_activity.last_dispatch_at.as_ref() { + lines.push(format!( + "Last daemon dispatch {} handoff(s) across {} lead(s) @ {}", + self.daemon_activity.last_dispatch_routed, + self.daemon_activity.last_dispatch_leads, + self.short_timestamp(&last_dispatch_at.to_rfc3339()) + )); + } + + if let Some(last_rebalance_at) = self.daemon_activity.last_rebalance_at.as_ref() { + lines.push(format!( + "Last daemon rebalance {} handoff(s) across {} lead(s) @ {}", + self.daemon_activity.last_rebalance_rerouted, + self.daemon_activity.last_rebalance_leads, + self.short_timestamp(&last_rebalance_at.to_rfc3339()) + )); + } + if let Some(route_preview) = self.selected_route_preview.as_ref() { lines.push(format!("Next route {route_preview}")); } @@ -1932,6 +1968,33 @@ mod tests { assert!(text.contains("Next route reuse idle worker-1")); } + #[test] + fn selected_session_metrics_text_includes_daemon_activity() { + let mut dashboard = test_dashboard( + vec![sample_session( + "focus-12345678", + "planner", + SessionState::Running, + Some("ecc/focus"), + 512, + 42, + )], + 0, + ); + dashboard.daemon_activity = DaemonActivity { + last_dispatch_at: Some(Utc::now()), + last_dispatch_routed: 4, + last_dispatch_leads: 2, + last_rebalance_at: Some(Utc::now()), + last_rebalance_rerouted: 1, + last_rebalance_leads: 1, + }; + + let text = dashboard.selected_session_metrics_text(); + assert!(text.contains("Last daemon dispatch 4 handoff(s) across 2 lead(s)")); + assert!(text.contains("Last daemon rebalance 1 handoff(s) across 1 lead(s)")); + } + #[test] fn aggregate_cost_summary_mentions_total_cost() { let db = StateStore::open(Path::new(":memory:")).unwrap(); @@ -2373,6 +2436,7 @@ mod tests { unread_message_counts: HashMap::new(), global_handoff_backlog_leads: 0, global_handoff_backlog_messages: 0, + daemon_activity: DaemonActivity::default(), selected_messages: Vec::new(), selected_parent_session: None, selected_child_sessions: Vec::new(),