From d738089e3ea1d7770b6ef707025af1dc9c37a6e0 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 13:22:02 -0700 Subject: [PATCH] feat: add ecc2 looping backlog coordination --- ecc2/src/main.rs | 187 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 139 insertions(+), 48 deletions(-) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index aef8bd09..f568abf7 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -101,6 +101,12 @@ enum Commands { /// Maximum lead sessions to sweep in one pass #[arg(long, default_value_t = 10)] lead_limit: usize, + /// Keep coordinating until the backlog is healthy, saturated, or max passes is reached + #[arg(long)] + until_healthy: bool, + /// Maximum coordination passes when using --until-healthy + #[arg(long, default_value_t = 5)] + max_passes: usize, }, /// Show global coordination, backlog, and daemon policy status CoordinationStatus { @@ -414,57 +420,57 @@ async fn main() -> Result<()> { agent, worktree: use_worktree, lead_limit, + until_healthy, + max_passes, }) => { - let outcome = session::manager::coordinate_backlog( - &db, - &cfg, - &agent, - use_worktree, - lead_limit, - ) - .await?; - let total_processed: usize = outcome - .dispatched - .iter() - .map(|dispatch| dispatch.routed.len()) - .sum(); - let total_routed: usize = outcome - .dispatched - .iter() - .map(|dispatch| { - dispatch - .routed - .iter() - .filter(|item| session::manager::assignment_action_routes_work(item.action)) - .count() - }) - .sum(); - let total_deferred = total_processed.saturating_sub(total_routed); - let total_rerouted: usize = outcome - .rebalanced - .iter() - .map(|rebalance| rebalance.rerouted.len()) - .sum(); - - if total_routed == 0 - && total_rerouted == 0 - && outcome.remaining_backlog_sessions == 0 - { - println!("Backlog already clear"); + let pass_budget = if until_healthy { + max_passes.max(1) } else { - println!( - "Coordinated backlog: processed {} handoff(s) across {} lead(s) ({} routed, {} deferred); rebalanced {} handoff(s) across {} lead(s); remaining {} handoff(s) across {} session(s) [{} absorbable, {} saturated]", - total_processed, - outcome.dispatched.len(), - total_routed, - total_deferred, - total_rerouted, - outcome.rebalanced.len(), - outcome.remaining_backlog_messages, - outcome.remaining_backlog_sessions, - outcome.remaining_absorbable_sessions, - outcome.remaining_saturated_sessions + 1 + }; + let mut final_status = None; + + for pass in 1..=pass_budget { + let outcome = session::manager::coordinate_backlog( + &db, + &cfg, + &agent, + use_worktree, + lead_limit, + ) + .await?; + let summary = summarize_coordinate_backlog(&outcome); + + if pass_budget > 1 { + println!("Pass {pass}/{pass_budget}: {summary}"); + } else { + println!("{summary}"); + } + + let status = session::manager::get_coordination_status(&db, &cfg)?; + let should_stop = matches!( + status.health, + session::manager::CoordinationHealth::Healthy + | session::manager::CoordinationHealth::Saturated + | session::manager::CoordinationHealth::EscalationRequired ); + final_status = Some(status); + + if should_stop { + break; + } + } + + if pass_budget > 1 { + if let Some(status) = final_status { + println!( + "Final coordination health: {:?} | mode {:?} | backlog {} handoff(s) across {} lead(s)", + status.health, + status.mode, + status.backlog_messages, + status.backlog_leads + ); + } } } Some(Commands::CoordinationStatus { json, check }) => { @@ -691,6 +697,49 @@ fn format_coordination_status( Ok(status.to_string()) } +fn summarize_coordinate_backlog(outcome: &session::manager::CoordinateBacklogOutcome) -> String { + let total_processed: usize = outcome + .dispatched + .iter() + .map(|dispatch| dispatch.routed.len()) + .sum(); + let total_routed: usize = outcome + .dispatched + .iter() + .map(|dispatch| { + dispatch + .routed + .iter() + .filter(|item| session::manager::assignment_action_routes_work(item.action)) + .count() + }) + .sum(); + let total_deferred = total_processed.saturating_sub(total_routed); + let total_rerouted: usize = outcome + .rebalanced + .iter() + .map(|rebalance| rebalance.rerouted.len()) + .sum(); + + if total_routed == 0 && total_rerouted == 0 && outcome.remaining_backlog_sessions == 0 { + "Backlog already clear".to_string() + } else { + format!( + "Coordinated backlog: processed {} handoff(s) across {} lead(s) ({} routed, {} deferred); rebalanced {} handoff(s) across {} lead(s); remaining {} handoff(s) across {} session(s) [{} absorbable, {} saturated]", + total_processed, + outcome.dispatched.len(), + total_routed, + total_deferred, + total_rerouted, + outcome.rebalanced.len(), + outcome.remaining_backlog_messages, + outcome.remaining_backlog_sessions, + outcome.remaining_absorbable_sessions, + outcome.remaining_saturated_sessions + ) + } +} + fn coordination_status_exit_code(status: &session::manager::CoordinationStatus) -> i32 { match status.health { session::manager::CoordinationHealth::Healthy => 0, @@ -955,10 +1004,38 @@ mod tests { Some(Commands::CoordinateBacklog { agent, lead_limit, + until_healthy, + max_passes, .. }) => { assert_eq!(agent, "claude"); assert_eq!(lead_limit, 7); + assert!(!until_healthy); + assert_eq!(max_passes, 5); + } + _ => panic!("expected coordinate-backlog subcommand"), + } + } + + #[test] + fn cli_parses_coordinate_backlog_until_healthy_flags() { + let cli = Cli::try_parse_from([ + "ecc", + "coordinate-backlog", + "--until-healthy", + "--max-passes", + "3", + ]) + .expect("coordinate-backlog looping flags should parse"); + + match cli.command { + Some(Commands::CoordinateBacklog { + until_healthy, + max_passes, + .. + }) => { + assert!(until_healthy); + assert_eq!(max_passes, 3); } _ => panic!("expected coordinate-backlog subcommand"), } @@ -1093,6 +1170,20 @@ mod tests { assert_eq!(coordination_status_exit_code(&saturated), 2); } + #[test] + fn summarize_coordinate_backlog_reports_clear_state() { + let summary = summarize_coordinate_backlog(&session::manager::CoordinateBacklogOutcome { + dispatched: Vec::new(), + rebalanced: Vec::new(), + remaining_backlog_sessions: 0, + remaining_backlog_messages: 0, + remaining_absorbable_sessions: 0, + remaining_saturated_sessions: 0, + }); + + assert_eq!(summary, "Backlog already clear"); + } + #[test] fn cli_parses_rebalance_team_command() { let cli = Cli::try_parse_from([