From 051d47eb5f9250ca3a2377e411db88cea406a1fc Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 03:40:26 -0700 Subject: [PATCH] feat: relax ecc2 stabilized cycles --- ecc2/src/session/daemon.rs | 56 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/ecc2/src/session/daemon.rs b/ecc2/src/session/daemon.rs index 5ddded80..c63195e3 100644 --- a/ecc2/src/session/daemon.rs +++ b/ecc2/src/session/daemon.rs @@ -177,6 +177,12 @@ where } let first_dispatch = dispatch().await?; + if prior_activity.stabilized_after_recovery_at().is_some() && first_dispatch.deferred == 0 { + tracing::info!( + "Skipping rebalance because stabilized dispatch cycle has no deferred handoffs" + ); + return Ok((first_dispatch, 0, DispatchPassSummary::default())); + } let rebalanced = rebalance().await?; let recovery_dispatch = if first_dispatch.deferred > 0 && rebalanced > 0 { let recovery = dispatch().await?; @@ -796,6 +802,56 @@ mod tests { Ok(()) } + #[tokio::test] + async fn coordinate_backlog_cycle_skips_rebalance_when_stabilized_and_dispatch_is_healthy() -> Result<()> { + let cfg = Config { + auto_dispatch_unread_handoffs: true, + ..Config::default() + }; + let now = chrono::Utc::now(); + let activity = DaemonActivity { + last_dispatch_at: Some(now + chrono::Duration::seconds(2)), + last_dispatch_routed: 2, + last_dispatch_deferred: 0, + last_dispatch_leads: 1, + last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)), + last_recovery_dispatch_routed: 1, + last_recovery_dispatch_leads: 1, + last_rebalance_at: Some(now), + last_rebalance_rerouted: 1, + last_rebalance_leads: 1, + }; + let rebalance_calls = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let rebalance_calls_clone = rebalance_calls.clone(); + + let (first, rebalanced, recovery) = coordinate_backlog_cycle_with( + &cfg, + &activity, + || async move { + Ok(DispatchPassSummary { + routed: 1, + deferred: 0, + leads: 1, + }) + }, + move || { + let rebalance_calls_clone = rebalance_calls_clone.clone(); + async move { + rebalance_calls_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(1) + } + }, + |_, _| Ok(()), + ) + .await?; + + assert_eq!(first.routed, 1); + assert_eq!(rebalanced, 0); + assert_eq!(recovery, DispatchPassSummary::default()); + assert_eq!(rebalance_calls.load(std::sync::atomic::Ordering::SeqCst), 0); + Ok(()) + } + #[tokio::test] async fn maybe_auto_rebalance_noops_when_disabled() -> Result<()> { let path = temp_db_path();