From d4cdeca946bc76acc4bcb8642b9d1d5b8614adc7 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 03:28:21 -0700 Subject: [PATCH] feat: add ecc2 chronic saturation cooloff --- ecc2/src/session/daemon.rs | 54 ++++++++++++++++++++++++++++++++++++++ ecc2/src/session/store.rs | 7 +++++ ecc2/src/tui/dashboard.rs | 36 +++++++++++++++++++++++-- 3 files changed, 95 insertions(+), 2 deletions(-) diff --git a/ecc2/src/session/daemon.rs b/ecc2/src/session/daemon.rs index 388c6162..612945c3 100644 --- a/ecc2/src/session/daemon.rs +++ b/ecc2/src/session/daemon.rs @@ -159,6 +159,12 @@ where { if prior_activity.prefers_rebalance_first() { let rebalanced = rebalance().await?; + if prior_activity.dispatch_cooloff_active() && rebalanced == 0 { + tracing::warn!( + "Skipping immediate dispatch retry because chronic saturation cooloff is active" + ); + return Ok((DispatchPassSummary::default(), rebalanced, DispatchPassSummary::default())); + } let first_dispatch = dispatch().await?; return Ok((first_dispatch, rebalanced, DispatchPassSummary::default())); } @@ -688,6 +694,54 @@ mod tests { Ok(()) } + #[tokio::test] + async fn coordinate_backlog_cycle_skips_dispatch_during_chronic_cooloff_when_rebalance_does_not_help() -> Result<()> { + let cfg = Config { + auto_dispatch_unread_handoffs: true, + ..Config::default() + }; + let now = chrono::Utc::now(); + let activity = DaemonActivity { + last_dispatch_at: Some(now), + last_dispatch_routed: 0, + last_dispatch_deferred: 3, + last_dispatch_leads: 1, + last_recovery_dispatch_at: None, + last_recovery_dispatch_routed: 0, + last_recovery_dispatch_leads: 0, + last_rebalance_at: Some(now - chrono::Duration::seconds(1)), + last_rebalance_rerouted: 0, + last_rebalance_leads: 1, + }; + let calls = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let calls_clone = calls.clone(); + + let (first, rebalanced, recovery) = coordinate_backlog_cycle_with( + &cfg, + &activity, + move || { + let calls_clone = calls_clone.clone(); + async move { + calls_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(DispatchPassSummary { + routed: 1, + deferred: 0, + leads: 1, + }) + } + }, + || async move { Ok(0) }, + |_, _| Ok(()), + ) + .await?; + + assert_eq!(first, DispatchPassSummary::default()); + assert_eq!(rebalanced, 0); + assert_eq!(recovery, DispatchPassSummary::default()); + assert_eq!(calls.load(std::sync::atomic::Ordering::SeqCst), 0); + Ok(()) + } + #[tokio::test] async fn maybe_auto_rebalance_noops_when_disabled() -> Result<()> { let path = temp_db_path(); diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index c6a1018a..4249e98b 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -42,6 +42,10 @@ impl DaemonActivity { _ => false, } } + + pub fn dispatch_cooloff_active(&self) -> bool { + self.prefers_rebalance_first() && self.last_dispatch_deferred >= 2 + } } impl StateStore { @@ -1063,6 +1067,7 @@ mod tests { let clear = DaemonActivity::default(); assert!(!clear.prefers_rebalance_first()); + assert!(!clear.dispatch_cooloff_active()); let unresolved = DaemonActivity { last_dispatch_at: Some(now), @@ -1077,6 +1082,7 @@ mod tests { last_rebalance_leads: 0, }; assert!(unresolved.prefers_rebalance_first()); + assert!(unresolved.dispatch_cooloff_active()); let recovered = DaemonActivity { last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)), @@ -1084,5 +1090,6 @@ mod tests { ..unresolved }; assert!(!recovered.prefers_rebalance_first()); + assert!(!recovered.dispatch_cooloff_active()); } } diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index d5c90f3f..402a69e9 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -1489,7 +1489,9 @@ impl Dashboard { lines.push(format!( "Coordination mode {}", - if self.daemon_activity.prefers_rebalance_first() { + if self.daemon_activity.dispatch_cooloff_active() { + "rebalance-cooloff (chronic saturation)" + } else if self.daemon_activity.prefers_rebalance_first() { "rebalance-first (chronic saturation)" } else { "dispatch-first" @@ -2176,7 +2178,7 @@ mod tests { dashboard.daemon_activity = DaemonActivity { last_dispatch_at: Some(Utc::now()), last_dispatch_routed: 0, - last_dispatch_deferred: 3, + last_dispatch_deferred: 1, last_dispatch_leads: 1, last_recovery_dispatch_at: None, last_recovery_dispatch_routed: 0, @@ -2190,6 +2192,36 @@ mod tests { assert!(text.contains("Coordination mode rebalance-first (chronic saturation)")); } + #[test] + fn selected_session_metrics_text_shows_rebalance_cooloff_mode_when_saturation_is_chronic() { + let mut dashboard = test_dashboard( + vec![sample_session( + "focus-12345678", + "planner", + SessionState::Running, + Some("ecc/focus"), + 512, + 42, + )], + 0, + ); + dashboard.daemon_activity = DaemonActivity { + last_dispatch_at: Some(Utc::now()), + last_dispatch_routed: 0, + last_dispatch_deferred: 3, + last_dispatch_leads: 1, + last_recovery_dispatch_at: None, + last_recovery_dispatch_routed: 0, + last_recovery_dispatch_leads: 0, + last_rebalance_at: Some(Utc::now()), + last_rebalance_rerouted: 1, + last_rebalance_leads: 1, + }; + + let text = dashboard.selected_session_metrics_text(); + assert!(text.contains("Coordination mode rebalance-cooloff (chronic saturation)")); + } + #[test] fn aggregate_cost_summary_mentions_total_cost() { let db = StateStore::open(Path::new(":memory:")).unwrap();