From 2b7b71766474d5baf48ab2a50bd9ccaa33dc23da Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 13:24:32 -0700 Subject: [PATCH] feat: add ecc2 coordinate backlog json output --- ecc2/src/main.rs | 145 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 136 insertions(+), 9 deletions(-) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index f568abf7..eef5caca 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -7,6 +7,7 @@ mod worktree; use anyhow::Result; use clap::Parser; +use serde::Serialize; use std::path::PathBuf; use tracing_subscriber::EnvFilter; @@ -101,6 +102,9 @@ enum Commands { /// Maximum lead sessions to sweep in one pass #[arg(long, default_value_t = 10)] lead_limit: usize, + /// Emit machine-readable JSON instead of the human summary + #[arg(long)] + json: bool, /// Keep coordinating until the backlog is healthy, saturated, or max passes is reached #[arg(long)] until_healthy: bool, @@ -420,6 +424,7 @@ async fn main() -> Result<()> { agent, worktree: use_worktree, lead_limit, + json, until_healthy, max_passes, }) => { @@ -429,6 +434,7 @@ async fn main() -> Result<()> { 1 }; let mut final_status = None; + let mut pass_summaries = Vec::new(); for pass in 1..=pass_budget { let outcome = session::manager::coordinate_backlog( @@ -439,12 +445,16 @@ async fn main() -> Result<()> { lead_limit, ) .await?; - let summary = summarize_coordinate_backlog(&outcome); + let mut summary = summarize_coordinate_backlog(&outcome); + summary.pass = pass; + pass_summaries.push(summary.clone()); - if pass_budget > 1 { - println!("Pass {pass}/{pass_budget}: {summary}"); - } else { - println!("{summary}"); + if !json { + if pass_budget > 1 { + println!("Pass {pass}/{pass_budget}: {}", summary.message); + } else { + println!("{}", summary.message); + } } let status = session::manager::get_coordination_status(&db, &cfg)?; @@ -461,7 +471,14 @@ async fn main() -> Result<()> { } } - if pass_budget > 1 { + if json { + let payload = CoordinateBacklogRun { + pass_budget, + passes: pass_summaries, + final_status, + }; + println!("{}", serde_json::to_string_pretty(&payload)?); + } else if pass_budget > 1 { if let Some(status) = final_status { println!( "Final coordination health: {:?} | mode {:?} | backlog {} handoff(s) across {} lead(s)", @@ -697,7 +714,32 @@ fn format_coordination_status( Ok(status.to_string()) } -fn summarize_coordinate_backlog(outcome: &session::manager::CoordinateBacklogOutcome) -> String { +#[derive(Debug, Clone, Serialize)] +struct CoordinateBacklogPassSummary { + pass: usize, + processed: usize, + routed: usize, + deferred: usize, + rerouted: usize, + dispatched_leads: usize, + rebalanced_leads: usize, + remaining_backlog_sessions: usize, + remaining_backlog_messages: usize, + remaining_absorbable_sessions: usize, + remaining_saturated_sessions: usize, + message: String, +} + +#[derive(Debug, Clone, Serialize)] +struct CoordinateBacklogRun { + pass_budget: usize, + passes: Vec, + final_status: Option, +} + +fn summarize_coordinate_backlog( + outcome: &session::manager::CoordinateBacklogOutcome, +) -> CoordinateBacklogPassSummary { let total_processed: usize = outcome .dispatched .iter() @@ -721,7 +763,7 @@ fn summarize_coordinate_backlog(outcome: &session::manager::CoordinateBacklogOut .map(|rebalance| rebalance.rerouted.len()) .sum(); - if total_routed == 0 && total_rerouted == 0 && outcome.remaining_backlog_sessions == 0 { + let message = if total_routed == 0 && total_rerouted == 0 && outcome.remaining_backlog_sessions == 0 { "Backlog already clear".to_string() } else { format!( @@ -737,6 +779,21 @@ fn summarize_coordinate_backlog(outcome: &session::manager::CoordinateBacklogOut outcome.remaining_absorbable_sessions, outcome.remaining_saturated_sessions ) + }; + + CoordinateBacklogPassSummary { + pass: 0, + processed: total_processed, + routed: total_routed, + deferred: total_deferred, + rerouted: total_rerouted, + dispatched_leads: outcome.dispatched.len(), + rebalanced_leads: outcome.rebalanced.len(), + remaining_backlog_sessions: outcome.remaining_backlog_sessions, + remaining_backlog_messages: outcome.remaining_backlog_messages, + remaining_absorbable_sessions: outcome.remaining_absorbable_sessions, + remaining_saturated_sessions: outcome.remaining_saturated_sessions, + message, } } @@ -1030,10 +1087,12 @@ mod tests { match cli.command { Some(Commands::CoordinateBacklog { + json, until_healthy, max_passes, .. }) => { + assert!(!json); assert!(until_healthy); assert_eq!(max_passes, 3); } @@ -1041,6 +1100,26 @@ mod tests { } } + #[test] + fn cli_parses_coordinate_backlog_json_flag() { + let cli = Cli::try_parse_from(["ecc", "coordinate-backlog", "--json"]) + .expect("coordinate-backlog --json should parse"); + + match cli.command { + Some(Commands::CoordinateBacklog { + json, + until_healthy, + max_passes, + .. + }) => { + assert!(json); + assert!(!until_healthy); + assert_eq!(max_passes, 5); + } + _ => panic!("expected coordinate-backlog subcommand"), + } + } + #[test] fn cli_parses_rebalance_all_command() { let cli = Cli::try_parse_from([ @@ -1181,7 +1260,55 @@ mod tests { remaining_saturated_sessions: 0, }); - assert_eq!(summary, "Backlog already clear"); + assert_eq!(summary.message, "Backlog already clear"); + assert_eq!(summary.processed, 0); + assert_eq!(summary.rerouted, 0); + } + + #[test] + fn summarize_coordinate_backlog_structures_counts() { + let summary = summarize_coordinate_backlog(&session::manager::CoordinateBacklogOutcome { + dispatched: vec![session::manager::LeadDispatchOutcome { + lead_session_id: "lead".into(), + unread_count: 2, + routed: vec![ + session::manager::InboxDrainOutcome { + message_id: 1, + task: "one".into(), + session_id: "a".into(), + action: session::manager::AssignmentAction::Spawned, + }, + session::manager::InboxDrainOutcome { + message_id: 2, + task: "two".into(), + session_id: "lead".into(), + action: session::manager::AssignmentAction::DeferredSaturated, + }, + ], + }], + rebalanced: vec![session::manager::LeadRebalanceOutcome { + lead_session_id: "lead".into(), + rerouted: vec![session::manager::RebalanceOutcome { + from_session_id: "a".into(), + message_id: 3, + task: "three".into(), + session_id: "b".into(), + action: session::manager::AssignmentAction::ReusedIdle, + }], + }], + remaining_backlog_sessions: 1, + remaining_backlog_messages: 2, + remaining_absorbable_sessions: 1, + remaining_saturated_sessions: 0, + }); + + assert_eq!(summary.processed, 2); + assert_eq!(summary.routed, 1); + assert_eq!(summary.deferred, 1); + assert_eq!(summary.rerouted, 1); + assert_eq!(summary.dispatched_leads, 1); + assert_eq!(summary.rebalanced_leads, 1); + assert_eq!(summary.remaining_backlog_messages, 2); } #[test]