From afb97961e3552bb48db70fa9f98eb718490cb8cf Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 13:31:11 -0700 Subject: [PATCH] feat: add ecc2 maintain coordination command --- ecc2/src/main.rs | 266 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 212 insertions(+), 54 deletions(-) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 400f42f8..71150622 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -124,6 +124,27 @@ enum Commands { #[arg(long)] check: bool, }, + /// Coordinate only when backlog pressure actually needs work + MaintainCoordination { + /// Agent type for routed delegates + #[arg(short, long, default_value = "claude")] + agent: String, + /// Create a dedicated worktree if new delegates must be spawned + #[arg(short, long, default_value_t = true)] + worktree: bool, + /// Maximum lead sessions to sweep in one pass + #[arg(long, default_value_t = 10)] + lead_limit: usize, + /// Emit machine-readable JSON instead of the human summary + #[arg(long)] + json: bool, + /// Return a non-zero exit code from the final coordination health + #[arg(long)] + check: bool, + /// Maximum coordination passes when maintenance is needed + #[arg(long, default_value_t = 5)] + max_passes: usize, + }, /// Rebalance unread handoffs across lead teams with backed-up delegates RebalanceAll { /// Agent type for routed delegates @@ -437,65 +458,24 @@ async fn main() -> Result<()> { } else { 1 }; - let mut final_status = None; - let mut pass_summaries = Vec::new(); - - for pass in 1..=pass_budget { - let outcome = session::manager::coordinate_backlog( - &db, - &cfg, - &agent, - use_worktree, - lead_limit, - ) - .await?; - let mut summary = summarize_coordinate_backlog(&outcome); - summary.pass = pass; - pass_summaries.push(summary.clone()); - - if !json { - if pass_budget > 1 { - println!("Pass {pass}/{pass_budget}: {}", summary.message); - } else { - println!("{}", summary.message); - } - } - - let status = session::manager::get_coordination_status(&db, &cfg)?; - let should_stop = matches!( - status.health, - session::manager::CoordinationHealth::Healthy - | session::manager::CoordinationHealth::Saturated - | session::manager::CoordinationHealth::EscalationRequired - ); - final_status = Some(status); - - if should_stop { - break; - } - } + let run = run_coordination_loop( + &db, + &cfg, + &agent, + use_worktree, + lead_limit, + pass_budget, + !json, + ) + .await?; if json { - let payload = CoordinateBacklogRun { - pass_budget, - passes: pass_summaries, - final_status: final_status.clone(), - }; - println!("{}", serde_json::to_string_pretty(&payload)?); - } else if pass_budget > 1 { - if let Some(status) = final_status.as_ref() { - println!( - "Final coordination health: {:?} | mode {:?} | backlog {} handoff(s) across {} lead(s)", - status.health, - status.mode, - status.backlog_messages, - status.backlog_leads - ); - } + println!("{}", serde_json::to_string_pretty(&run)?); } if check { - let exit_code = final_status + let exit_code = run + .final_status .as_ref() .map(coordination_status_exit_code) .unwrap_or(0); @@ -509,6 +489,55 @@ async fn main() -> Result<()> { std::process::exit(coordination_status_exit_code(&status)); } } + Some(Commands::MaintainCoordination { + agent, + worktree: use_worktree, + lead_limit, + json, + check, + max_passes, + }) => { + let initial_status = session::manager::get_coordination_status(&db, &cfg)?; + let run = if matches!( + initial_status.health, + session::manager::CoordinationHealth::Healthy + ) { + None + } else { + Some( + run_coordination_loop( + &db, + &cfg, + &agent, + use_worktree, + lead_limit, + max_passes.max(1), + !json, + ) + .await?, + ) + }; + let final_status = run + .as_ref() + .and_then(|run| run.final_status.clone()) + .unwrap_or_else(|| initial_status.clone()); + + if json { + let payload = MaintainCoordinationRun { + skipped: run.is_none(), + initial_status, + run, + final_status: final_status.clone(), + }; + println!("{}", serde_json::to_string_pretty(&payload)?); + } else if run.is_none() { + println!("Coordination already healthy"); + } + + if check { + std::process::exit(coordination_status_exit_code(&final_status)); + } + } Some(Commands::RebalanceAll { agent, worktree: use_worktree, @@ -726,6 +755,65 @@ fn format_coordination_status( Ok(status.to_string()) } +async fn run_coordination_loop( + db: &session::store::StateStore, + cfg: &config::Config, + agent: &str, + use_worktree: bool, + lead_limit: usize, + pass_budget: usize, + emit_progress: bool, +) -> Result { + let mut final_status = None; + let mut pass_summaries = Vec::new(); + + for pass in 1..=pass_budget.max(1) { + let outcome = + session::manager::coordinate_backlog(db, cfg, agent, use_worktree, lead_limit).await?; + let mut summary = summarize_coordinate_backlog(&outcome); + summary.pass = pass; + pass_summaries.push(summary.clone()); + + if emit_progress { + if pass_budget > 1 { + println!("Pass {pass}/{pass_budget}: {}", summary.message); + } else { + println!("{}", summary.message); + } + } + + let status = session::manager::get_coordination_status(db, cfg)?; + let should_stop = matches!( + status.health, + session::manager::CoordinationHealth::Healthy + | session::manager::CoordinationHealth::Saturated + | session::manager::CoordinationHealth::EscalationRequired + ); + final_status = Some(status); + + if should_stop { + break; + } + } + + let run = CoordinateBacklogRun { + pass_budget, + passes: pass_summaries, + final_status, + }; + + if emit_progress && pass_budget > 1 { + if let Some(status) = run.final_status.as_ref() { + println!( + "Final coordination health: {:?} | mode {:?} | backlog {} handoff(s) across {} lead(s)", + status.health, status.mode, status.backlog_messages, status.backlog_leads + ); + } + } + + Ok(run) +} + #[derive(Debug, Clone, Serialize)] struct CoordinateBacklogPassSummary { pass: usize, @@ -749,6 +837,14 @@ struct CoordinateBacklogRun { final_status: Option, } +#[derive(Debug, Clone, Serialize)] +struct MaintainCoordinationRun { + skipped: bool, + initial_status: session::manager::CoordinationStatus, + run: Option, + final_status: session::manager::CoordinationStatus, +} + fn summarize_coordinate_backlog( outcome: &session::manager::CoordinateBacklogOutcome, ) -> CoordinateBacklogPassSummary { @@ -1225,6 +1321,68 @@ mod tests { } } + #[test] + fn cli_parses_maintain_coordination_command() { + let cli = Cli::try_parse_from(["ecc", "maintain-coordination"]) + .expect("maintain-coordination should parse"); + + match cli.command { + Some(Commands::MaintainCoordination { + agent, + json, + check, + max_passes, + .. + }) => { + assert_eq!(agent, "claude"); + assert!(!json); + assert!(!check); + assert_eq!(max_passes, 5); + } + _ => panic!("expected maintain-coordination subcommand"), + } + } + + #[test] + fn cli_parses_maintain_coordination_json_flag() { + let cli = Cli::try_parse_from(["ecc", "maintain-coordination", "--json"]) + .expect("maintain-coordination --json should parse"); + + match cli.command { + Some(Commands::MaintainCoordination { + json, + check, + max_passes, + .. + }) => { + assert!(json); + assert!(!check); + assert_eq!(max_passes, 5); + } + _ => panic!("expected maintain-coordination subcommand"), + } + } + + #[test] + fn cli_parses_maintain_coordination_check_flag() { + let cli = Cli::try_parse_from(["ecc", "maintain-coordination", "--check"]) + .expect("maintain-coordination --check should parse"); + + match cli.command { + Some(Commands::MaintainCoordination { + json, + check, + max_passes, + .. + }) => { + assert!(!json); + assert!(check); + assert_eq!(max_passes, 5); + } + _ => panic!("expected maintain-coordination subcommand"), + } + } + #[test] fn format_coordination_status_emits_json() { let status = session::manager::CoordinationStatus {