From 19ad704216fabae015ffa5ac646a366f601ad5ff Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 03:09:29 -0700 Subject: [PATCH] feat: retry deferred ecc2 dispatch after rebalance --- ecc2/src/session/daemon.rs | 202 ++++++++++++++++++++++++++++++++++--- 1 file changed, 186 insertions(+), 16 deletions(-) diff --git a/ecc2/src/session/daemon.rs b/ecc2/src/session/daemon.rs index 508bc2d9..b58c3932 100644 --- a/ecc2/src/session/daemon.rs +++ b/ecc2/src/session/daemon.rs @@ -8,6 +8,13 @@ use super::store::StateStore; use super::SessionState; use crate::config::Config; +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +struct DispatchPassSummary { + routed: usize, + deferred: usize, + leads: usize, +} + /// Background daemon that monitors sessions, handles heartbeats, /// and cleans up stale resources. pub async fn run(db: StateStore, cfg: Config) -> Result<()> { @@ -22,12 +29,8 @@ pub async fn run(db: StateStore, cfg: Config) -> Result<()> { tracing::error!("Session check failed: {e}"); } - if let Err(e) = maybe_auto_dispatch(&db, &cfg).await { - tracing::error!("Auto-dispatch pass failed: {e}"); - } - - if let Err(e) = maybe_auto_rebalance(&db, &cfg).await { - tracing::error!("Auto-rebalance pass failed: {e}"); + if let Err(e) = coordinate_backlog_cycle(&db, &cfg).await { + tracing::error!("Backlog coordination pass failed: {e}"); } time::sleep(heartbeat_interval).await; @@ -94,7 +97,7 @@ fn check_sessions(db: &StateStore, timeout: Duration) -> Result<()> { } async fn maybe_auto_dispatch(db: &StateStore, cfg: &Config) -> Result { - maybe_auto_dispatch_with_recorder(cfg, || { + let summary = maybe_auto_dispatch_with_recorder(cfg, || { manager::auto_dispatch_backlog( db, cfg, @@ -103,7 +106,67 @@ async fn maybe_auto_dispatch(db: &StateStore, cfg: &Config) -> Result { cfg.max_parallel_sessions, ) }, |routed, leads| db.record_daemon_dispatch_pass(routed, leads)) - .await + .await?; + Ok(summary.routed) +} + +async fn coordinate_backlog_cycle(db: &StateStore, cfg: &Config) -> Result<()> { + coordinate_backlog_cycle_with( + cfg, + || { + maybe_auto_dispatch_with_recorder(cfg, || { + manager::auto_dispatch_backlog( + db, + cfg, + &cfg.default_agent, + true, + cfg.max_parallel_sessions, + ) + }, |routed, leads| db.record_daemon_dispatch_pass(routed, leads)) + }, + || { + maybe_auto_rebalance_with_recorder(cfg, || { + manager::rebalance_all_teams( + db, + cfg, + &cfg.default_agent, + true, + cfg.max_parallel_sessions, + ) + }, |rerouted, leads| db.record_daemon_rebalance_pass(rerouted, leads)) + }, + ) + .await?; + Ok(()) +} + +async fn coordinate_backlog_cycle_with( + _cfg: &Config, + dispatch: DF, + rebalance: RF, +) -> Result<(DispatchPassSummary, usize, DispatchPassSummary)> +where + DF: Fn() -> DFut, + DFut: Future>, + RF: Fn() -> RFut, + RFut: Future>, +{ + let first_dispatch = dispatch().await?; + let rebalanced = rebalance().await?; + let recovery_dispatch = if first_dispatch.deferred > 0 && rebalanced > 0 { + let recovery = dispatch().await?; + if recovery.routed > 0 { + tracing::info!( + "Recovered {} deferred task handoff(s) after rebalancing", + recovery.routed + ); + } + recovery + } else { + DispatchPassSummary::default() + }; + + Ok((first_dispatch, rebalanced, recovery_dispatch)) } async fn maybe_auto_dispatch_with(cfg: &Config, dispatch: F) -> Result @@ -111,31 +174,64 @@ where F: Fn() -> Fut, Fut: Future>>, { - maybe_auto_dispatch_with_recorder(cfg, dispatch, |_, _| Ok(())).await + Ok(maybe_auto_dispatch_with_recorder(cfg, dispatch, |_, _| Ok(())).await?.routed) } -async fn maybe_auto_dispatch_with_recorder(cfg: &Config, dispatch: F, mut record: R) -> Result +async fn maybe_auto_dispatch_with_recorder( + cfg: &Config, + dispatch: F, + mut record: R, +) -> Result where F: Fn() -> Fut, Fut: Future>>, R: FnMut(usize, usize) -> Result<()>, { if !cfg.auto_dispatch_unread_handoffs { - return Ok(0); + return Ok(DispatchPassSummary::default()); } let outcomes = dispatch().await?; - let routed: usize = outcomes.iter().map(|outcome| outcome.routed.len()).sum(); - record(routed, outcomes.len())?; + let routed: usize = outcomes + .iter() + .map(|outcome| { + outcome + .routed + .iter() + .filter(|item| manager::assignment_action_routes_work(item.action)) + .count() + }) + .sum(); + let deferred: usize = outcomes + .iter() + .map(|outcome| { + outcome + .routed + .iter() + .filter(|item| !manager::assignment_action_routes_work(item.action)) + .count() + }) + .sum(); + let leads = outcomes.len(); + record(routed, leads)?; if routed > 0 { tracing::info!( "Auto-dispatched {routed} task handoff(s) across {} lead session(s)", - outcomes.len() + leads + ); + } + if deferred > 0 { + tracing::warn!( + "Deferred {deferred} task handoff(s) because delegate teams were saturated" ); } - Ok(routed) + Ok(DispatchPassSummary { + routed, + deferred, + leads, + }) } async fn maybe_auto_rebalance(db: &StateStore, cfg: &Config) -> Result { @@ -391,12 +487,86 @@ mod tests { ) .await?; - assert_eq!(routed, 2); + assert_eq!(routed.routed, 2); + assert_eq!(routed.deferred, 0); assert_eq!(*recorded.lock().unwrap(), Some((2, 1))); let _ = std::fs::remove_file(path); Ok(()) } + #[tokio::test] + async fn coordinate_backlog_cycle_retries_after_rebalance_when_dispatch_deferred() -> Result<()> { + let cfg = Config { + auto_dispatch_unread_handoffs: true, + ..Config::default() + }; + let calls = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let calls_clone = calls.clone(); + + let (first, rebalanced, recovery) = coordinate_backlog_cycle_with( + &cfg, + move || { + let calls_clone = calls_clone.clone(); + async move { + let call = calls_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(match call { + 0 => DispatchPassSummary { + routed: 0, + deferred: 2, + leads: 1, + }, + _ => DispatchPassSummary { + routed: 2, + deferred: 0, + leads: 1, + }, + }) + } + }, + || async move { Ok(1) }, + ) + .await?; + + assert_eq!(first.deferred, 2); + assert_eq!(rebalanced, 1); + assert_eq!(recovery.routed, 2); + assert_eq!(calls.load(std::sync::atomic::Ordering::SeqCst), 2); + Ok(()) + } + + #[tokio::test] + async fn coordinate_backlog_cycle_skips_retry_without_rebalance() -> Result<()> { + let cfg = Config { + auto_dispatch_unread_handoffs: true, + ..Config::default() + }; + let calls = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let calls_clone = calls.clone(); + + let (first, rebalanced, recovery) = coordinate_backlog_cycle_with( + &cfg, + move || { + let calls_clone = calls_clone.clone(); + async move { + calls_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(DispatchPassSummary { + routed: 0, + deferred: 2, + leads: 1, + }) + } + }, + || async move { Ok(0) }, + ) + .await?; + + assert_eq!(first.deferred, 2); + assert_eq!(rebalanced, 0); + assert_eq!(recovery, DispatchPassSummary::default()); + assert_eq!(calls.load(std::sync::atomic::Ordering::SeqCst), 1); + Ok(()) + } + #[tokio::test] async fn maybe_auto_rebalance_noops_when_disabled() -> Result<()> { let path = temp_db_path();