From a6f798e505c643f33e2a1cb843aa8507251f8336 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 03:20:47 -0700 Subject: [PATCH] feat: show ecc2 chronic saturation mode --- ecc2/src/session/daemon.rs | 45 +------------------------------------ ecc2/src/session/store.rs | 46 ++++++++++++++++++++++++++++++++++++++ ecc2/src/tui/dashboard.rs | 41 +++++++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+), 44 deletions(-) diff --git a/ecc2/src/session/daemon.rs b/ecc2/src/session/daemon.rs index e166c974..388c6162 100644 --- a/ecc2/src/session/daemon.rs +++ b/ecc2/src/session/daemon.rs @@ -157,7 +157,7 @@ where RFut: Future>, Rec: FnMut(usize, usize) -> Result<()>, { - if should_rebalance_first(prior_activity) { + if prior_activity.prefers_rebalance_first() { let rebalanced = rebalance().await?; let first_dispatch = dispatch().await?; return Ok((first_dispatch, rebalanced, DispatchPassSummary::default())); @@ -182,21 +182,6 @@ where Ok((first_dispatch, rebalanced, recovery_dispatch)) } -fn should_rebalance_first(activity: &super::store::DaemonActivity) -> bool { - if activity.last_dispatch_deferred == 0 { - return false; - } - - match ( - activity.last_dispatch_at.as_ref(), - activity.last_recovery_dispatch_at.as_ref(), - ) { - (Some(dispatch_at), Some(recovery_at)) => recovery_at < dispatch_at, - (Some(_), None) => true, - _ => false, - } -} - async fn maybe_auto_dispatch_with(cfg: &Config, dispatch: F) -> Result where F: Fn() -> Fut, @@ -648,34 +633,6 @@ mod tests { Ok(()) } - #[test] - fn should_rebalance_first_only_after_unrecovered_deferred_pressure() { - let now = chrono::Utc::now(); - - assert!(!should_rebalance_first(&DaemonActivity::default())); - - let unresolved = DaemonActivity { - last_dispatch_at: Some(now), - last_dispatch_routed: 0, - last_dispatch_deferred: 2, - last_dispatch_leads: 1, - last_recovery_dispatch_at: None, - last_recovery_dispatch_routed: 0, - last_recovery_dispatch_leads: 0, - last_rebalance_at: None, - last_rebalance_rerouted: 0, - last_rebalance_leads: 0, - }; - assert!(should_rebalance_first(&unresolved)); - - let recovered = DaemonActivity { - last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)), - last_recovery_dispatch_routed: 1, - ..unresolved.clone() - }; - assert!(!should_rebalance_first(&recovered)); - } - #[tokio::test] async fn coordinate_backlog_cycle_rebalances_first_after_unrecovered_deferred_pressure() -> Result<()> { let cfg = Config { diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index 57b182e5..c6a1018a 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -27,6 +27,23 @@ pub struct DaemonActivity { pub last_rebalance_leads: usize, } +impl DaemonActivity { + pub fn prefers_rebalance_first(&self) -> bool { + if self.last_dispatch_deferred == 0 { + return false; + } + + match ( + self.last_dispatch_at.as_ref(), + self.last_recovery_dispatch_at.as_ref(), + ) { + (Some(dispatch_at), Some(recovery_at)) => recovery_at < dispatch_at, + (Some(_), None) => true, + _ => false, + } + } +} + impl StateStore { pub fn open(path: &Path) -> Result { let conn = Connection::open(path)?; @@ -1039,4 +1056,33 @@ mod tests { Ok(()) } + + #[test] + fn daemon_activity_detects_rebalance_first_mode() { + let now = chrono::Utc::now(); + + let clear = DaemonActivity::default(); + assert!(!clear.prefers_rebalance_first()); + + let unresolved = DaemonActivity { + last_dispatch_at: Some(now), + last_dispatch_routed: 0, + last_dispatch_deferred: 2, + last_dispatch_leads: 1, + last_recovery_dispatch_at: None, + last_recovery_dispatch_routed: 0, + last_recovery_dispatch_leads: 0, + last_rebalance_at: None, + last_rebalance_rerouted: 0, + last_rebalance_leads: 0, + }; + assert!(unresolved.prefers_rebalance_first()); + + let recovered = DaemonActivity { + last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)), + last_recovery_dispatch_routed: 1, + ..unresolved + }; + assert!(!recovered.prefers_rebalance_first()); + } } diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index e7b9d327..d5c90f3f 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -1487,6 +1487,15 @@ impl Dashboard { self.cfg.auto_dispatch_limit_per_session )); + lines.push(format!( + "Coordination mode {}", + if self.daemon_activity.prefers_rebalance_first() { + "rebalance-first (chronic saturation)" + } else { + "dispatch-first" + } + )); + if let Some(last_dispatch_at) = self.daemon_activity.last_dispatch_at.as_ref() { lines.push(format!( "Last daemon dispatch {} routed / {} deferred across {} lead(s) @ {}", @@ -2114,6 +2123,7 @@ mod tests { let text = dashboard.selected_session_metrics_text(); assert!(text.contains("Team 3/8 | idle 1 | running 1 | pending 1 | failed 0 | stopped 0")); assert!(text.contains("Global handoff backlog 2 lead(s) / 5 handoff(s) | Auto-dispatch off @ 5/lead")); + assert!(text.contains("Coordination mode dispatch-first")); assert!(text.contains("Next route reuse idle worker-1")); } @@ -2144,11 +2154,42 @@ mod tests { }; let text = dashboard.selected_session_metrics_text(); + assert!(text.contains("Coordination mode dispatch-first")); assert!(text.contains("Last daemon dispatch 4 routed / 2 deferred across 2 lead(s)")); assert!(text.contains("Last daemon recovery dispatch 1 handoff(s) across 1 lead(s)")); assert!(text.contains("Last daemon rebalance 1 handoff(s) across 1 lead(s)")); } + #[test] + fn selected_session_metrics_text_shows_rebalance_first_mode_when_saturation_is_unrecovered() { + let mut dashboard = test_dashboard( + vec![sample_session( + "focus-12345678", + "planner", + SessionState::Running, + Some("ecc/focus"), + 512, + 42, + )], + 0, + ); + dashboard.daemon_activity = DaemonActivity { + last_dispatch_at: Some(Utc::now()), + last_dispatch_routed: 0, + last_dispatch_deferred: 3, + last_dispatch_leads: 1, + last_recovery_dispatch_at: None, + last_recovery_dispatch_routed: 0, + last_recovery_dispatch_leads: 0, + last_rebalance_at: Some(Utc::now()), + last_rebalance_rerouted: 1, + last_rebalance_leads: 1, + }; + + let text = dashboard.selected_session_metrics_text(); + assert!(text.contains("Coordination mode rebalance-first (chronic saturation)")); + } + #[test] fn aggregate_cost_summary_mentions_total_cost() { let db = StateStore::open(Path::new(":memory:")).unwrap();