From a7bfe82af9ca6a02c90bbed9805c74771267e3a2 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 02:27:28 -0700 Subject: [PATCH] feat: auto-rebalance ecc2 delegate teams --- ecc2/src/session/daemon.rs | 126 +++++++++++++++++++++++++++++++++++- ecc2/src/session/manager.rs | 41 ++++++++++++ 2 files changed, 166 insertions(+), 1 deletion(-) diff --git a/ecc2/src/session/daemon.rs b/ecc2/src/session/daemon.rs index 842d23d1..099245da 100644 --- a/ecc2/src/session/daemon.rs +++ b/ecc2/src/session/daemon.rs @@ -26,6 +26,10 @@ pub async fn run(db: StateStore, cfg: Config) -> Result<()> { tracing::error!("Auto-dispatch pass failed: {e}"); } + if let Err(e) = maybe_auto_rebalance(&db, &cfg).await { + tracing::error!("Auto-rebalance pass failed: {e}"); + } + time::sleep(heartbeat_interval).await; } } @@ -136,6 +140,53 @@ where Ok(routed) } +async fn maybe_auto_rebalance(db: &StateStore, cfg: &Config) -> Result { + if !cfg.auto_dispatch_unread_handoffs { + return Ok(0); + } + + let outcomes = manager::rebalance_all_teams( + db, + cfg, + &cfg.default_agent, + true, + cfg.max_parallel_sessions, + ) + .await?; + let rerouted: usize = outcomes.iter().map(|outcome| outcome.rerouted.len()).sum(); + + if rerouted > 0 { + tracing::info!( + "Auto-rebalanced {rerouted} task handoff(s) across {} lead session(s)", + outcomes.len() + ); + } + + Ok(rerouted) +} + +async fn maybe_auto_rebalance_with(cfg: &Config, rebalance: F) -> Result +where + F: Fn() -> Fut, + Fut: Future>>, +{ + if !cfg.auto_dispatch_unread_handoffs { + return Ok(0); + } + + let outcomes = rebalance().await?; + let rerouted: usize = outcomes.iter().map(|outcome| outcome.rerouted.len()).sum(); + + if rerouted > 0 { + tracing::info!( + "Auto-rebalanced {rerouted} task handoff(s) across {} lead session(s)", + outcomes.len() + ); + } + + Ok(rerouted) +} + #[cfg(unix)] fn pid_is_alive(pid: u32) -> bool { if pid == 0 { @@ -162,7 +213,10 @@ fn pid_is_alive(_pid: u32) -> bool { #[cfg(test)] mod tests { use super::*; - use crate::session::manager::{AssignmentAction, InboxDrainOutcome, LeadDispatchOutcome}; + use crate::session::manager::{ + AssignmentAction, InboxDrainOutcome, LeadDispatchOutcome, LeadRebalanceOutcome, + RebalanceOutcome, + }; use crate::session::{Session, SessionMetrics, SessionState}; use std::path::PathBuf; @@ -298,4 +352,74 @@ mod tests { let _ = std::fs::remove_file(path); Ok(()) } + + #[tokio::test] + async fn maybe_auto_rebalance_noops_when_disabled() -> Result<()> { + let path = temp_db_path(); + let _store = StateStore::open(&path)?; + let cfg = Config::default(); + let invoked = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + let invoked_flag = invoked.clone(); + + let rerouted = maybe_auto_rebalance_with(&cfg, move || { + let invoked_flag = invoked_flag.clone(); + async move { + invoked_flag.store(true, std::sync::atomic::Ordering::SeqCst); + Ok(Vec::new()) + } + }) + .await?; + + assert_eq!(rerouted, 0); + assert!(!invoked.load(std::sync::atomic::Ordering::SeqCst)); + let _ = std::fs::remove_file(path); + Ok(()) + } + + #[tokio::test] + async fn maybe_auto_rebalance_reports_total_rerouted_work() -> Result<()> { + let path = temp_db_path(); + let _store = StateStore::open(&path)?; + let mut cfg = Config::default(); + cfg.auto_dispatch_unread_handoffs = true; + + let rerouted = maybe_auto_rebalance_with(&cfg, || async move { + Ok(vec![ + LeadRebalanceOutcome { + lead_session_id: "lead-a".to_string(), + rerouted: vec![ + RebalanceOutcome { + from_session_id: "worker-a".to_string(), + message_id: 1, + task: "Task A".to_string(), + session_id: "worker-b".to_string(), + action: AssignmentAction::ReusedIdle, + }, + RebalanceOutcome { + from_session_id: "worker-a".to_string(), + message_id: 2, + task: "Task B".to_string(), + session_id: "worker-c".to_string(), + action: AssignmentAction::Spawned, + }, + ], + }, + LeadRebalanceOutcome { + lead_session_id: "lead-b".to_string(), + rerouted: vec![RebalanceOutcome { + from_session_id: "worker-d".to_string(), + message_id: 3, + task: "Task C".to_string(), + session_id: "worker-e".to_string(), + action: AssignmentAction::ReusedActive, + }], + }, + ]) + }) + .await?; + + assert_eq!(rerouted, 3); + let _ = std::fs::remove_file(path); + Ok(()) + } } diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index 329d0683..af25a027 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -166,6 +166,42 @@ pub async fn auto_dispatch_backlog( Ok(outcomes) } +pub async fn rebalance_all_teams( + db: &StateStore, + cfg: &Config, + agent_type: &str, + use_worktree: bool, + lead_limit: usize, +) -> Result> { + let sessions = db.list_sessions()?; + let mut outcomes = Vec::new(); + + for session in sessions + .into_iter() + .filter(|session| matches!(session.state, SessionState::Running | SessionState::Pending | SessionState::Idle)) + .take(lead_limit) + { + let rerouted = rebalance_team_backlog( + db, + cfg, + &session.id, + agent_type, + use_worktree, + cfg.auto_dispatch_limit_per_session, + ) + .await?; + + if !rerouted.is_empty() { + outcomes.push(LeadRebalanceOutcome { + lead_session_id: session.id, + rerouted, + }); + } + } + + Ok(outcomes) +} + pub async fn rebalance_team_backlog( db: &StateStore, cfg: &Config, @@ -965,6 +1001,11 @@ pub struct RebalanceOutcome { pub action: AssignmentAction, } +pub struct LeadRebalanceOutcome { + pub lead_session_id: String, + pub rerouted: Vec, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum AssignmentAction { Spawned,