From f498dc09714046a663be901ed7e133a2b1fd22df Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 03:17:44 -0700 Subject: [PATCH] feat: prefer ecc2 rebalance after chronic saturation --- ecc2/src/session/daemon.rs | 114 +++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/ecc2/src/session/daemon.rs b/ecc2/src/session/daemon.rs index 089d40c9..e166c974 100644 --- a/ecc2/src/session/daemon.rs +++ b/ecc2/src/session/daemon.rs @@ -111,8 +111,10 @@ async fn maybe_auto_dispatch(db: &StateStore, cfg: &Config) -> Result { } async fn coordinate_backlog_cycle(db: &StateStore, cfg: &Config) -> Result<()> { + let activity = db.daemon_activity()?; coordinate_backlog_cycle_with( cfg, + &activity, || { maybe_auto_dispatch_with_recorder(cfg, || { manager::auto_dispatch_backlog( @@ -143,6 +145,7 @@ async fn coordinate_backlog_cycle(db: &StateStore, cfg: &Config) -> Result<()> { async fn coordinate_backlog_cycle_with( _cfg: &Config, + prior_activity: &super::store::DaemonActivity, dispatch: DF, rebalance: RF, mut record_recovery: Rec, @@ -154,6 +157,12 @@ where RFut: Future>, Rec: FnMut(usize, usize) -> Result<()>, { + if should_rebalance_first(prior_activity) { + let rebalanced = rebalance().await?; + let first_dispatch = dispatch().await?; + return Ok((first_dispatch, rebalanced, DispatchPassSummary::default())); + } + let first_dispatch = dispatch().await?; let rebalanced = rebalance().await?; let recovery_dispatch = if first_dispatch.deferred > 0 && rebalanced > 0 { @@ -173,6 +182,21 @@ where Ok((first_dispatch, rebalanced, recovery_dispatch)) } +fn should_rebalance_first(activity: &super::store::DaemonActivity) -> bool { + if activity.last_dispatch_deferred == 0 { + return false; + } + + match ( + activity.last_dispatch_at.as_ref(), + activity.last_recovery_dispatch_at.as_ref(), + ) { + (Some(dispatch_at), Some(recovery_at)) => recovery_at < dispatch_at, + (Some(_), None) => true, + _ => false, + } +} + async fn maybe_auto_dispatch_with(cfg: &Config, dispatch: F) -> Result where F: Fn() -> Fut, @@ -317,6 +341,7 @@ mod tests { AssignmentAction, InboxDrainOutcome, LeadDispatchOutcome, LeadRebalanceOutcome, RebalanceOutcome, }; + use crate::session::store::DaemonActivity; use crate::session::{Session, SessionMetrics, SessionState}; use std::path::PathBuf; @@ -504,11 +529,13 @@ mod tests { auto_dispatch_unread_handoffs: true, ..Config::default() }; + let activity = DaemonActivity::default(); let calls = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); let calls_clone = calls.clone(); let (first, rebalanced, recovery) = coordinate_backlog_cycle_with( &cfg, + &activity, move || { let calls_clone = calls_clone.clone(); async move { @@ -545,11 +572,13 @@ mod tests { auto_dispatch_unread_handoffs: true, ..Config::default() }; + let activity = DaemonActivity::default(); let calls = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); let calls_clone = calls.clone(); let (first, rebalanced, recovery) = coordinate_backlog_cycle_with( &cfg, + &activity, move || { let calls_clone = calls_clone.clone(); async move { @@ -579,6 +608,7 @@ mod tests { auto_dispatch_unread_handoffs: true, ..Config::default() }; + let activity = DaemonActivity::default(); let recorded = std::sync::Arc::new(std::sync::Mutex::new(None)); let recorded_clone = recorded.clone(); let calls = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); @@ -586,6 +616,7 @@ mod tests { let (_first, _rebalanced, recovery) = coordinate_backlog_cycle_with( &cfg, + &activity, move || { let calls_clone = calls_clone.clone(); async move { @@ -617,6 +648,89 @@ mod tests { Ok(()) } + #[test] + fn should_rebalance_first_only_after_unrecovered_deferred_pressure() { + let now = chrono::Utc::now(); + + assert!(!should_rebalance_first(&DaemonActivity::default())); + + let unresolved = DaemonActivity { + last_dispatch_at: Some(now), + last_dispatch_routed: 0, + last_dispatch_deferred: 2, + last_dispatch_leads: 1, + last_recovery_dispatch_at: None, + last_recovery_dispatch_routed: 0, + last_recovery_dispatch_leads: 0, + last_rebalance_at: None, + last_rebalance_rerouted: 0, + last_rebalance_leads: 0, + }; + assert!(should_rebalance_first(&unresolved)); + + let recovered = DaemonActivity { + last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)), + last_recovery_dispatch_routed: 1, + ..unresolved.clone() + }; + assert!(!should_rebalance_first(&recovered)); + } + + #[tokio::test] + async fn coordinate_backlog_cycle_rebalances_first_after_unrecovered_deferred_pressure() -> Result<()> { + let cfg = Config { + auto_dispatch_unread_handoffs: true, + ..Config::default() + }; + let now = chrono::Utc::now(); + let activity = DaemonActivity { + last_dispatch_at: Some(now), + last_dispatch_routed: 0, + last_dispatch_deferred: 2, + last_dispatch_leads: 1, + last_recovery_dispatch_at: None, + last_recovery_dispatch_routed: 0, + last_recovery_dispatch_leads: 0, + last_rebalance_at: None, + last_rebalance_rerouted: 0, + last_rebalance_leads: 0, + }; + let order = std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); + let dispatch_order = order.clone(); + let rebalance_order = order.clone(); + + let (first, rebalanced, recovery) = coordinate_backlog_cycle_with( + &cfg, + &activity, + move || { + let dispatch_order = dispatch_order.clone(); + async move { + dispatch_order.lock().unwrap().push("dispatch"); + Ok(DispatchPassSummary { + routed: 1, + deferred: 0, + leads: 1, + }) + } + }, + move || { + let rebalance_order = rebalance_order.clone(); + async move { + rebalance_order.lock().unwrap().push("rebalance"); + Ok(1) + } + }, + |_, _| Ok(()), + ) + .await?; + + assert_eq!(*order.lock().unwrap(), vec!["rebalance", "dispatch"]); + assert_eq!(first.routed, 1); + assert_eq!(rebalanced, 1); + assert_eq!(recovery, DispatchPassSummary::default()); + Ok(()) + } + #[tokio::test] async fn maybe_auto_rebalance_noops_when_disabled() -> Result<()> { let path = temp_db_path();