From 08e9d0e28b3ba23267fd1f8938e3e6c5123ef9cf Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 03:14:20 -0700 Subject: [PATCH] feat: surface ecc2 daemon recovery pressure --- ecc2/src/session/daemon.rs | 64 +++++++++++++++++++++++--- ecc2/src/session/store.rs | 94 ++++++++++++++++++++++++++++++++++---- ecc2/src/tui/dashboard.rs | 21 ++++++++- 3 files changed, 162 insertions(+), 17 deletions(-) diff --git a/ecc2/src/session/daemon.rs b/ecc2/src/session/daemon.rs index b58c3932..089d40c9 100644 --- a/ecc2/src/session/daemon.rs +++ b/ecc2/src/session/daemon.rs @@ -105,7 +105,7 @@ async fn maybe_auto_dispatch(db: &StateStore, cfg: &Config) -> Result { true, cfg.max_parallel_sessions, ) - }, |routed, leads| db.record_daemon_dispatch_pass(routed, leads)) + }, |routed, deferred, leads| db.record_daemon_dispatch_pass(routed, deferred, leads)) .await?; Ok(summary.routed) } @@ -122,7 +122,7 @@ async fn coordinate_backlog_cycle(db: &StateStore, cfg: &Config) -> Result<()> { true, cfg.max_parallel_sessions, ) - }, |routed, leads| db.record_daemon_dispatch_pass(routed, leads)) + }, |routed, deferred, leads| db.record_daemon_dispatch_pass(routed, deferred, leads)) }, || { maybe_auto_rebalance_with_recorder(cfg, || { @@ -135,27 +135,31 @@ async fn coordinate_backlog_cycle(db: &StateStore, cfg: &Config) -> Result<()> { ) }, |rerouted, leads| db.record_daemon_rebalance_pass(rerouted, leads)) }, + |routed, leads| db.record_daemon_recovery_dispatch_pass(routed, leads), ) .await?; Ok(()) } -async fn coordinate_backlog_cycle_with( +async fn coordinate_backlog_cycle_with( _cfg: &Config, dispatch: DF, rebalance: RF, + mut record_recovery: Rec, ) -> Result<(DispatchPassSummary, usize, DispatchPassSummary)> where DF: Fn() -> DFut, DFut: Future>, RF: Fn() -> RFut, RFut: Future>, + Rec: FnMut(usize, usize) -> Result<()>, { let first_dispatch = dispatch().await?; let rebalanced = rebalance().await?; let recovery_dispatch = if first_dispatch.deferred > 0 && rebalanced > 0 { let recovery = dispatch().await?; if recovery.routed > 0 { + record_recovery(recovery.routed, recovery.leads)?; tracing::info!( "Recovered {} deferred task handoff(s) after rebalancing", recovery.routed @@ -174,7 +178,7 @@ where F: Fn() -> Fut, Fut: Future>>, { - Ok(maybe_auto_dispatch_with_recorder(cfg, dispatch, |_, _| Ok(())).await?.routed) + Ok(maybe_auto_dispatch_with_recorder(cfg, dispatch, |_, _, _| Ok(())).await?.routed) } async fn maybe_auto_dispatch_with_recorder( @@ -185,7 +189,7 @@ async fn maybe_auto_dispatch_with_recorder( where F: Fn() -> Fut, Fut: Future>>, - R: FnMut(usize, usize) -> Result<()>, + R: FnMut(usize, usize, usize) -> Result<()>, { if !cfg.auto_dispatch_unread_handoffs { return Ok(DispatchPassSummary::default()); @@ -213,7 +217,7 @@ where }) .sum(); let leads = outcomes.len(); - record(routed, leads)?; + record(routed, deferred, leads)?; if routed > 0 { tracing::info!( @@ -480,7 +484,7 @@ mod tests { ], }]) }, - move |count, leads| { + move |count, _deferred, leads| { *recorded_clone.lock().unwrap() = Some((count, leads)); Ok(()) }, @@ -524,6 +528,7 @@ mod tests { } }, || async move { Ok(1) }, + |_, _| Ok(()), ) .await?; @@ -557,6 +562,7 @@ mod tests { } }, || async move { Ok(0) }, + |_, _| Ok(()), ) .await?; @@ -567,6 +573,50 @@ mod tests { Ok(()) } + #[tokio::test] + async fn coordinate_backlog_cycle_records_recovery_dispatch_when_it_routes_work() -> Result<()> { + let cfg = Config { + auto_dispatch_unread_handoffs: true, + ..Config::default() + }; + let recorded = std::sync::Arc::new(std::sync::Mutex::new(None)); + let recorded_clone = recorded.clone(); + let calls = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let calls_clone = calls.clone(); + + let (_first, _rebalanced, recovery) = coordinate_backlog_cycle_with( + &cfg, + move || { + let calls_clone = calls_clone.clone(); + async move { + let call = calls_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(match call { + 0 => DispatchPassSummary { + routed: 0, + deferred: 1, + leads: 1, + }, + _ => DispatchPassSummary { + routed: 2, + deferred: 0, + leads: 1, + }, + }) + } + }, + || async move { Ok(1) }, + move |routed, leads| { + *recorded_clone.lock().unwrap() = Some((routed, leads)); + Ok(()) + }, + ) + .await?; + + assert_eq!(recovery.routed, 2); + assert_eq!(*recorded.lock().unwrap(), Some((2, 1))); + Ok(()) + } + #[tokio::test] async fn maybe_auto_rebalance_noops_when_disabled() -> Result<()> { let path = temp_db_path(); diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index 8f80e976..57b182e5 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -17,7 +17,11 @@ pub struct StateStore { pub struct DaemonActivity { pub last_dispatch_at: Option>, pub last_dispatch_routed: usize, + pub last_dispatch_deferred: usize, pub last_dispatch_leads: usize, + pub last_recovery_dispatch_at: Option>, + pub last_recovery_dispatch_routed: usize, + pub last_recovery_dispatch_leads: usize, pub last_rebalance_at: Option>, pub last_rebalance_rerouted: usize, pub last_rebalance_leads: usize, @@ -88,7 +92,11 @@ impl StateStore { id INTEGER PRIMARY KEY CHECK(id = 1), last_dispatch_at TEXT, last_dispatch_routed INTEGER NOT NULL DEFAULT 0, + last_dispatch_deferred INTEGER NOT NULL DEFAULT 0, last_dispatch_leads INTEGER NOT NULL DEFAULT 0, + last_recovery_dispatch_at TEXT, + last_recovery_dispatch_routed INTEGER NOT NULL DEFAULT 0, + last_recovery_dispatch_leads INTEGER NOT NULL DEFAULT 0, last_rebalance_at TEXT, last_rebalance_rerouted INTEGER NOT NULL DEFAULT 0, last_rebalance_leads INTEGER NOT NULL DEFAULT 0 @@ -123,6 +131,42 @@ impl StateStore { .context("Failed to add pid column to sessions table")?; } + if !self.has_column("daemon_activity", "last_dispatch_deferred")? { + self.conn + .execute( + "ALTER TABLE daemon_activity ADD COLUMN last_dispatch_deferred INTEGER NOT NULL DEFAULT 0", + [], + ) + .context("Failed to add last_dispatch_deferred column to daemon_activity table")?; + } + + if !self.has_column("daemon_activity", "last_recovery_dispatch_at")? { + self.conn + .execute( + "ALTER TABLE daemon_activity ADD COLUMN last_recovery_dispatch_at TEXT", + [], + ) + .context("Failed to add last_recovery_dispatch_at column to daemon_activity table")?; + } + + if !self.has_column("daemon_activity", "last_recovery_dispatch_routed")? { + self.conn + .execute( + "ALTER TABLE daemon_activity ADD COLUMN last_recovery_dispatch_routed INTEGER NOT NULL DEFAULT 0", + [], + ) + .context("Failed to add last_recovery_dispatch_routed column to daemon_activity table")?; + } + + if !self.has_column("daemon_activity", "last_recovery_dispatch_leads")? { + self.conn + .execute( + "ALTER TABLE daemon_activity ADD COLUMN last_recovery_dispatch_leads INTEGER NOT NULL DEFAULT 0", + [], + ) + .context("Failed to add last_recovery_dispatch_leads column to daemon_activity table")?; + } + Ok(()) } @@ -513,7 +557,8 @@ impl StateStore { pub fn daemon_activity(&self) -> Result { self.conn .query_row( - "SELECT last_dispatch_at, last_dispatch_routed, last_dispatch_leads, + "SELECT last_dispatch_at, last_dispatch_routed, last_dispatch_deferred, last_dispatch_leads, + last_recovery_dispatch_at, last_recovery_dispatch_routed, last_recovery_dispatch_leads, last_rebalance_at, last_rebalance_rerouted, last_rebalance_leads FROM daemon_activity WHERE id = 1", @@ -539,22 +584,50 @@ impl StateStore { Ok(DaemonActivity { last_dispatch_at: parse_ts(row.get(0)?)?, last_dispatch_routed: row.get::<_, i64>(1)? as usize, - last_dispatch_leads: row.get::<_, i64>(2)? as usize, - last_rebalance_at: parse_ts(row.get(3)?)?, - last_rebalance_rerouted: row.get::<_, i64>(4)? as usize, - last_rebalance_leads: row.get::<_, i64>(5)? as usize, + last_dispatch_deferred: row.get::<_, i64>(2)? as usize, + last_dispatch_leads: row.get::<_, i64>(3)? as usize, + last_recovery_dispatch_at: parse_ts(row.get(4)?)?, + last_recovery_dispatch_routed: row.get::<_, i64>(5)? as usize, + last_recovery_dispatch_leads: row.get::<_, i64>(6)? as usize, + last_rebalance_at: parse_ts(row.get(7)?)?, + last_rebalance_rerouted: row.get::<_, i64>(8)? as usize, + last_rebalance_leads: row.get::<_, i64>(9)? as usize, }) }, ) .map_err(Into::into) } - pub fn record_daemon_dispatch_pass(&self, routed: usize, leads: usize) -> Result<()> { + pub fn record_daemon_dispatch_pass( + &self, + routed: usize, + deferred: usize, + leads: usize, + ) -> Result<()> { self.conn.execute( "UPDATE daemon_activity SET last_dispatch_at = ?1, last_dispatch_routed = ?2, - last_dispatch_leads = ?3 + last_dispatch_deferred = ?3, + last_dispatch_leads = ?4 + WHERE id = 1", + rusqlite::params![ + chrono::Utc::now().to_rfc3339(), + routed as i64, + deferred as i64, + leads as i64 + ], + )?; + + Ok(()) + } + + pub fn record_daemon_recovery_dispatch_pass(&self, routed: usize, leads: usize) -> Result<()> { + self.conn.execute( + "UPDATE daemon_activity + SET last_recovery_dispatch_at = ?1, + last_recovery_dispatch_routed = ?2, + last_recovery_dispatch_leads = ?3 WHERE id = 1", rusqlite::params![chrono::Utc::now().to_rfc3339(), routed as i64, leads as i64], )?; @@ -948,15 +1021,20 @@ mod tests { let tempdir = TestDir::new("store-daemon-activity")?; let db = StateStore::open(&tempdir.path().join("state.db"))?; - db.record_daemon_dispatch_pass(4, 2)?; + db.record_daemon_dispatch_pass(4, 1, 2)?; + db.record_daemon_recovery_dispatch_pass(2, 1)?; db.record_daemon_rebalance_pass(3, 1)?; let activity = db.daemon_activity()?; assert_eq!(activity.last_dispatch_routed, 4); + assert_eq!(activity.last_dispatch_deferred, 1); assert_eq!(activity.last_dispatch_leads, 2); + assert_eq!(activity.last_recovery_dispatch_routed, 2); + assert_eq!(activity.last_recovery_dispatch_leads, 1); assert_eq!(activity.last_rebalance_rerouted, 3); assert_eq!(activity.last_rebalance_leads, 1); assert!(activity.last_dispatch_at.is_some()); + assert!(activity.last_recovery_dispatch_at.is_some()); assert!(activity.last_rebalance_at.is_some()); Ok(()) diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index 5cf95725..e7b9d327 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -1489,13 +1489,25 @@ impl Dashboard { if let Some(last_dispatch_at) = self.daemon_activity.last_dispatch_at.as_ref() { lines.push(format!( - "Last daemon dispatch {} handoff(s) across {} lead(s) @ {}", + "Last daemon dispatch {} routed / {} deferred across {} lead(s) @ {}", self.daemon_activity.last_dispatch_routed, + self.daemon_activity.last_dispatch_deferred, self.daemon_activity.last_dispatch_leads, self.short_timestamp(&last_dispatch_at.to_rfc3339()) )); } + if let Some(last_recovery_dispatch_at) = + self.daemon_activity.last_recovery_dispatch_at.as_ref() + { + lines.push(format!( + "Last daemon recovery dispatch {} handoff(s) across {} lead(s) @ {}", + self.daemon_activity.last_recovery_dispatch_routed, + self.daemon_activity.last_recovery_dispatch_leads, + self.short_timestamp(&last_recovery_dispatch_at.to_rfc3339()) + )); + } + if let Some(last_rebalance_at) = self.daemon_activity.last_rebalance_at.as_ref() { lines.push(format!( "Last daemon rebalance {} handoff(s) across {} lead(s) @ {}", @@ -2121,14 +2133,19 @@ mod tests { dashboard.daemon_activity = DaemonActivity { last_dispatch_at: Some(Utc::now()), last_dispatch_routed: 4, + last_dispatch_deferred: 2, last_dispatch_leads: 2, + last_recovery_dispatch_at: Some(Utc::now()), + last_recovery_dispatch_routed: 1, + last_recovery_dispatch_leads: 1, last_rebalance_at: Some(Utc::now()), last_rebalance_rerouted: 1, last_rebalance_leads: 1, }; let text = dashboard.selected_session_metrics_text(); - assert!(text.contains("Last daemon dispatch 4 handoff(s) across 2 lead(s)")); + assert!(text.contains("Last daemon dispatch 4 routed / 2 deferred across 2 lead(s)")); + assert!(text.contains("Last daemon recovery dispatch 1 handoff(s) across 1 lead(s)")); assert!(text.contains("Last daemon rebalance 1 handoff(s) across 1 lead(s)")); }