feat: add ecc2 chronic saturation cooloff

This commit is contained in:
Affaan Mustafa
2026-04-08 03:28:21 -07:00
parent a6f798e505
commit d4cdeca946
3 changed files with 95 additions and 2 deletions

View File

@@ -159,6 +159,12 @@ where
{
if prior_activity.prefers_rebalance_first() {
let rebalanced = rebalance().await?;
if prior_activity.dispatch_cooloff_active() && rebalanced == 0 {
tracing::warn!(
"Skipping immediate dispatch retry because chronic saturation cooloff is active"
);
return Ok((DispatchPassSummary::default(), rebalanced, DispatchPassSummary::default()));
}
let first_dispatch = dispatch().await?;
return Ok((first_dispatch, rebalanced, DispatchPassSummary::default()));
}
@@ -688,6 +694,54 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn coordinate_backlog_cycle_skips_dispatch_during_chronic_cooloff_when_rebalance_does_not_help() -> Result<()> {
let cfg = Config {
auto_dispatch_unread_handoffs: true,
..Config::default()
};
let now = chrono::Utc::now();
let activity = DaemonActivity {
last_dispatch_at: Some(now),
last_dispatch_routed: 0,
last_dispatch_deferred: 3,
last_dispatch_leads: 1,
last_recovery_dispatch_at: None,
last_recovery_dispatch_routed: 0,
last_recovery_dispatch_leads: 0,
last_rebalance_at: Some(now - chrono::Duration::seconds(1)),
last_rebalance_rerouted: 0,
last_rebalance_leads: 1,
};
let calls = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let calls_clone = calls.clone();
let (first, rebalanced, recovery) = coordinate_backlog_cycle_with(
&cfg,
&activity,
move || {
let calls_clone = calls_clone.clone();
async move {
calls_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(DispatchPassSummary {
routed: 1,
deferred: 0,
leads: 1,
})
}
},
|| async move { Ok(0) },
|_, _| Ok(()),
)
.await?;
assert_eq!(first, DispatchPassSummary::default());
assert_eq!(rebalanced, 0);
assert_eq!(recovery, DispatchPassSummary::default());
assert_eq!(calls.load(std::sync::atomic::Ordering::SeqCst), 0);
Ok(())
}
#[tokio::test]
async fn maybe_auto_rebalance_noops_when_disabled() -> Result<()> {
let path = temp_db_path();

View File

@@ -42,6 +42,10 @@ impl DaemonActivity {
_ => false,
}
}
pub fn dispatch_cooloff_active(&self) -> bool {
self.prefers_rebalance_first() && self.last_dispatch_deferred >= 2
}
}
impl StateStore {
@@ -1063,6 +1067,7 @@ mod tests {
let clear = DaemonActivity::default();
assert!(!clear.prefers_rebalance_first());
assert!(!clear.dispatch_cooloff_active());
let unresolved = DaemonActivity {
last_dispatch_at: Some(now),
@@ -1077,6 +1082,7 @@ mod tests {
last_rebalance_leads: 0,
};
assert!(unresolved.prefers_rebalance_first());
assert!(unresolved.dispatch_cooloff_active());
let recovered = DaemonActivity {
last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)),
@@ -1084,5 +1090,6 @@ mod tests {
..unresolved
};
assert!(!recovered.prefers_rebalance_first());
assert!(!recovered.dispatch_cooloff_active());
}
}

View File

@@ -1489,7 +1489,9 @@ impl Dashboard {
lines.push(format!(
"Coordination mode {}",
if self.daemon_activity.prefers_rebalance_first() {
if self.daemon_activity.dispatch_cooloff_active() {
"rebalance-cooloff (chronic saturation)"
} else if self.daemon_activity.prefers_rebalance_first() {
"rebalance-first (chronic saturation)"
} else {
"dispatch-first"
@@ -2176,7 +2178,7 @@ mod tests {
dashboard.daemon_activity = DaemonActivity {
last_dispatch_at: Some(Utc::now()),
last_dispatch_routed: 0,
last_dispatch_deferred: 3,
last_dispatch_deferred: 1,
last_dispatch_leads: 1,
last_recovery_dispatch_at: None,
last_recovery_dispatch_routed: 0,
@@ -2190,6 +2192,36 @@ mod tests {
assert!(text.contains("Coordination mode rebalance-first (chronic saturation)"));
}
#[test]
fn selected_session_metrics_text_shows_rebalance_cooloff_mode_when_saturation_is_chronic() {
let mut dashboard = test_dashboard(
vec![sample_session(
"focus-12345678",
"planner",
SessionState::Running,
Some("ecc/focus"),
512,
42,
)],
0,
);
dashboard.daemon_activity = DaemonActivity {
last_dispatch_at: Some(Utc::now()),
last_dispatch_routed: 0,
last_dispatch_deferred: 3,
last_dispatch_leads: 1,
last_recovery_dispatch_at: None,
last_recovery_dispatch_routed: 0,
last_recovery_dispatch_leads: 0,
last_rebalance_at: Some(Utc::now()),
last_rebalance_rerouted: 1,
last_rebalance_leads: 1,
};
let text = dashboard.selected_session_metrics_text();
assert!(text.contains("Coordination mode rebalance-cooloff (chronic saturation)"));
}
#[test]
fn aggregate_cost_summary_mentions_total_cost() {
let db = StateStore::open(Path::new(":memory:")).unwrap();