From 349d3a08cb1ec5718bb23d3fc01933537688f040 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Tue, 7 Apr 2026 13:15:03 -0700 Subject: [PATCH] feat: rebalance ecc2 delegate backlog --- ecc2/src/main.rs | 82 +++++++++++++++ ecc2/src/session/manager.rs | 202 ++++++++++++++++++++++++++++++++++++ 2 files changed, 284 insertions(+) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index f645c7d5..23e4a50b 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -90,6 +90,20 @@ enum Commands { #[arg(long, default_value_t = 10)] lead_limit: usize, }, + /// Rebalance unread handoffs off backed-up delegates onto clearer team capacity + RebalanceTeam { + /// Lead session ID or alias + session_id: String, + /// Agent type for routed delegates + #[arg(short, long, default_value = "claude")] + agent: String, + /// Create a dedicated worktree if new delegates must be spawned + #[arg(short, long, default_value_t = true)] + worktree: bool, + /// Maximum handoffs to reroute in one pass + #[arg(long, default_value_t = 5)] + limit: usize, + }, /// List active sessions Sessions, /// Show session details @@ -323,6 +337,46 @@ async fn main() -> Result<()> { } } } + Some(Commands::RebalanceTeam { + session_id, + agent, + worktree: use_worktree, + limit, + }) => { + let lead_id = resolve_session_id(&db, &session_id)?; + let outcomes = session::manager::rebalance_team_backlog( + &db, + &cfg, + &lead_id, + &agent, + use_worktree, + limit, + ) + .await?; + if outcomes.is_empty() { + println!("No delegate backlog needed rebalancing for {}", short_session(&lead_id)); + } else { + println!( + "Rebalanced {} task handoff(s) for {}", + outcomes.len(), + short_session(&lead_id) + ); + for outcome in outcomes { + println!( + "- {} | {} -> {} ({}) | {}", + outcome.message_id, + short_session(&outcome.from_session_id), + short_session(&outcome.session_id), + match outcome.action { + session::manager::AssignmentAction::Spawned => "spawned", + session::manager::AssignmentAction::ReusedIdle => "reused-idle", + session::manager::AssignmentAction::ReusedActive => "reused-active", + }, + outcome.task + ); + } + } + } Some(Commands::Sessions) => { let sessions = session::manager::list_sessions(&db)?; for s in sessions { @@ -692,4 +746,32 @@ mod tests { _ => panic!("expected auto-dispatch subcommand"), } } + + #[test] + fn cli_parses_rebalance_team_command() { + let cli = Cli::try_parse_from([ + "ecc", + "rebalance-team", + "lead", + "--agent", + "claude", + "--limit", + "2", + ]) + .expect("rebalance-team should parse"); + + match cli.command { + Some(Commands::RebalanceTeam { + session_id, + agent, + limit, + .. + }) => { + assert_eq!(session_id, "lead"); + assert_eq!(agent, "claude"); + assert_eq!(limit, 2); + } + _ => panic!("expected rebalance-team subcommand"), + } + } } diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index ab2dfd91..329d0683 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -166,6 +166,113 @@ pub async fn auto_dispatch_backlog( Ok(outcomes) } +pub async fn rebalance_team_backlog( + db: &StateStore, + cfg: &Config, + lead_id: &str, + agent_type: &str, + use_worktree: bool, + limit: usize, +) -> Result> { + let repo_root = + std::env::current_dir().context("Failed to resolve current working directory")?; + let runner_program = std::env::current_exe().context("Failed to resolve ECC executable path")?; + let lead = resolve_session(db, lead_id)?; + let mut outcomes = Vec::new(); + + if limit == 0 { + return Ok(outcomes); + } + + let delegates = direct_delegate_sessions(db, &lead.id, agent_type)?; + let unread_counts = db.unread_message_counts()?; + let team_has_capacity = delegates.len() < cfg.max_parallel_sessions; + + for delegate in &delegates { + if outcomes.len() >= limit { + break; + } + + let unread_count = unread_counts.get(&delegate.id).copied().unwrap_or(0); + if unread_count <= 1 { + continue; + } + + let has_clear_idle_elsewhere = delegates.iter().any(|candidate| { + candidate.id != delegate.id + && candidate.state == SessionState::Idle + && unread_counts.get(&candidate.id).copied().unwrap_or(0) == 0 + }); + + if !has_clear_idle_elsewhere && !team_has_capacity { + continue; + } + + let message_budget = limit.saturating_sub(outcomes.len()); + let messages = db.unread_task_handoffs_for_session(&delegate.id, message_budget)?; + + for message in messages { + if outcomes.len() >= limit { + break; + } + + let current_delegates = direct_delegate_sessions(db, &lead.id, agent_type)?; + let current_unread_counts = db.unread_message_counts()?; + let current_team_has_capacity = current_delegates.len() < cfg.max_parallel_sessions; + let current_has_clear_idle_elsewhere = current_delegates.iter().any(|candidate| { + candidate.id != delegate.id + && candidate.state == SessionState::Idle + && current_unread_counts + .get(&candidate.id) + .copied() + .unwrap_or(0) + == 0 + }); + + if !current_has_clear_idle_elsewhere && !current_team_has_capacity { + break; + } + + if message.from_session != lead.id { + continue; + } + + let task = match comms::parse(&message.content) { + Some(MessageType::TaskHandoff { task, .. }) => task, + _ => extract_legacy_handoff_task(&message.content) + .unwrap_or_else(|| message.content.clone()), + }; + + let outcome = assign_session_in_dir_with_runner_program( + db, + cfg, + &lead.id, + &task, + agent_type, + use_worktree, + &repo_root, + &runner_program, + ) + .await?; + + if outcome.session_id == delegate.id { + continue; + } + + let _ = db.mark_message_read(message.id)?; + outcomes.push(RebalanceOutcome { + from_session_id: delegate.id.clone(), + message_id: message.id, + task, + session_id: outcome.session_id, + action: outcome.action, + }); + } + } + + Ok(outcomes) +} + pub async fn stop_session(db: &StateStore, id: &str) -> Result<()> { stop_session_with_options(db, id, true).await } @@ -850,6 +957,14 @@ pub struct LeadDispatchOutcome { pub routed: Vec, } +pub struct RebalanceOutcome { + pub from_session_id: String, + pub message_id: i64, + pub task: String, + pub session_id: String, + pub action: AssignmentAction, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum AssignmentAction { Spawned, @@ -1739,4 +1854,91 @@ mod tests { Ok(()) } + + #[tokio::test(flavor = "current_thread")] + async fn rebalance_team_backlog_moves_work_off_backed_up_delegate() -> Result<()> { + let tempdir = TestDir::new("manager-rebalance-team")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let mut cfg = build_config(tempdir.path()); + cfg.max_parallel_sessions = 2; + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "lead".to_string(), + task: "lead task".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Running, + pid: Some(42), + worktree: None, + created_at: now - Duration::minutes(4), + updated_at: now - Duration::minutes(4), + metrics: SessionMetrics::default(), + })?; + db.insert_session(&Session { + id: "worker-a".to_string(), + task: "auth lane".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Idle, + pid: None, + worktree: None, + created_at: now - Duration::minutes(3), + updated_at: now - Duration::minutes(3), + metrics: SessionMetrics::default(), + })?; + db.insert_session(&Session { + id: "worker-b".to_string(), + task: "billing lane".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Idle, + pid: None, + worktree: None, + created_at: now - Duration::minutes(2), + updated_at: now - Duration::minutes(2), + metrics: SessionMetrics::default(), + })?; + + db.send_message( + "lead", + "worker-a", + "{\"task\":\"Review auth flow\",\"context\":\"Delegated from lead\"}", + "task_handoff", + )?; + db.send_message( + "lead", + "worker-a", + "{\"task\":\"Check billing integration\",\"context\":\"Delegated from lead\"}", + "task_handoff", + )?; + db.send_message( + "lead", + "worker-b", + "{\"task\":\"Existing clear lane\",\"context\":\"Delegated from lead\"}", + "task_handoff", + )?; + let _ = db.mark_messages_read("worker-b")?; + + let outcomes = rebalance_team_backlog(&db, &cfg, "lead", "claude", true, 5).await?; + assert_eq!(outcomes.len(), 1); + assert_eq!(outcomes[0].from_session_id, "worker-a"); + assert_eq!(outcomes[0].session_id, "worker-b"); + assert_eq!(outcomes[0].action, AssignmentAction::ReusedIdle); + + let unread = db.unread_message_counts()?; + assert_eq!(unread.get("worker-a"), Some(&1)); + assert_eq!(unread.get("worker-b"), Some(&1)); + + let worker_b_messages = db.list_messages_for_session("worker-b", 10)?; + assert!(worker_b_messages.iter().any(|message| { + message.msg_type == "task_handoff" + && message.content.contains("Review auth flow") + })); + + Ok(()) + } }