feat: add ecc2 coordinate backlog json output

This commit is contained in:
Affaan Mustafa
2026-04-08 13:24:32 -07:00
parent d738089e3e
commit 2b7b717664

View File

@@ -7,6 +7,7 @@ mod worktree;
use anyhow::Result; use anyhow::Result;
use clap::Parser; use clap::Parser;
use serde::Serialize;
use std::path::PathBuf; use std::path::PathBuf;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
@@ -101,6 +102,9 @@ enum Commands {
/// Maximum lead sessions to sweep in one pass /// Maximum lead sessions to sweep in one pass
#[arg(long, default_value_t = 10)] #[arg(long, default_value_t = 10)]
lead_limit: usize, lead_limit: usize,
/// Emit machine-readable JSON instead of the human summary
#[arg(long)]
json: bool,
/// Keep coordinating until the backlog is healthy, saturated, or max passes is reached /// Keep coordinating until the backlog is healthy, saturated, or max passes is reached
#[arg(long)] #[arg(long)]
until_healthy: bool, until_healthy: bool,
@@ -420,6 +424,7 @@ async fn main() -> Result<()> {
agent, agent,
worktree: use_worktree, worktree: use_worktree,
lead_limit, lead_limit,
json,
until_healthy, until_healthy,
max_passes, max_passes,
}) => { }) => {
@@ -429,6 +434,7 @@ async fn main() -> Result<()> {
1 1
}; };
let mut final_status = None; let mut final_status = None;
let mut pass_summaries = Vec::new();
for pass in 1..=pass_budget { for pass in 1..=pass_budget {
let outcome = session::manager::coordinate_backlog( let outcome = session::manager::coordinate_backlog(
@@ -439,12 +445,16 @@ async fn main() -> Result<()> {
lead_limit, lead_limit,
) )
.await?; .await?;
let summary = summarize_coordinate_backlog(&outcome); let mut summary = summarize_coordinate_backlog(&outcome);
summary.pass = pass;
pass_summaries.push(summary.clone());
if pass_budget > 1 { if !json {
println!("Pass {pass}/{pass_budget}: {summary}"); if pass_budget > 1 {
} else { println!("Pass {pass}/{pass_budget}: {}", summary.message);
println!("{summary}"); } else {
println!("{}", summary.message);
}
} }
let status = session::manager::get_coordination_status(&db, &cfg)?; let status = session::manager::get_coordination_status(&db, &cfg)?;
@@ -461,7 +471,14 @@ async fn main() -> Result<()> {
} }
} }
if pass_budget > 1 { if json {
let payload = CoordinateBacklogRun {
pass_budget,
passes: pass_summaries,
final_status,
};
println!("{}", serde_json::to_string_pretty(&payload)?);
} else if pass_budget > 1 {
if let Some(status) = final_status { if let Some(status) = final_status {
println!( println!(
"Final coordination health: {:?} | mode {:?} | backlog {} handoff(s) across {} lead(s)", "Final coordination health: {:?} | mode {:?} | backlog {} handoff(s) across {} lead(s)",
@@ -697,7 +714,32 @@ fn format_coordination_status(
Ok(status.to_string()) Ok(status.to_string())
} }
fn summarize_coordinate_backlog(outcome: &session::manager::CoordinateBacklogOutcome) -> String { #[derive(Debug, Clone, Serialize)]
struct CoordinateBacklogPassSummary {
pass: usize,
processed: usize,
routed: usize,
deferred: usize,
rerouted: usize,
dispatched_leads: usize,
rebalanced_leads: usize,
remaining_backlog_sessions: usize,
remaining_backlog_messages: usize,
remaining_absorbable_sessions: usize,
remaining_saturated_sessions: usize,
message: String,
}
#[derive(Debug, Clone, Serialize)]
struct CoordinateBacklogRun {
pass_budget: usize,
passes: Vec<CoordinateBacklogPassSummary>,
final_status: Option<session::manager::CoordinationStatus>,
}
fn summarize_coordinate_backlog(
outcome: &session::manager::CoordinateBacklogOutcome,
) -> CoordinateBacklogPassSummary {
let total_processed: usize = outcome let total_processed: usize = outcome
.dispatched .dispatched
.iter() .iter()
@@ -721,7 +763,7 @@ fn summarize_coordinate_backlog(outcome: &session::manager::CoordinateBacklogOut
.map(|rebalance| rebalance.rerouted.len()) .map(|rebalance| rebalance.rerouted.len())
.sum(); .sum();
if total_routed == 0 && total_rerouted == 0 && outcome.remaining_backlog_sessions == 0 { let message = if total_routed == 0 && total_rerouted == 0 && outcome.remaining_backlog_sessions == 0 {
"Backlog already clear".to_string() "Backlog already clear".to_string()
} else { } else {
format!( format!(
@@ -737,6 +779,21 @@ fn summarize_coordinate_backlog(outcome: &session::manager::CoordinateBacklogOut
outcome.remaining_absorbable_sessions, outcome.remaining_absorbable_sessions,
outcome.remaining_saturated_sessions outcome.remaining_saturated_sessions
) )
};
CoordinateBacklogPassSummary {
pass: 0,
processed: total_processed,
routed: total_routed,
deferred: total_deferred,
rerouted: total_rerouted,
dispatched_leads: outcome.dispatched.len(),
rebalanced_leads: outcome.rebalanced.len(),
remaining_backlog_sessions: outcome.remaining_backlog_sessions,
remaining_backlog_messages: outcome.remaining_backlog_messages,
remaining_absorbable_sessions: outcome.remaining_absorbable_sessions,
remaining_saturated_sessions: outcome.remaining_saturated_sessions,
message,
} }
} }
@@ -1030,10 +1087,12 @@ mod tests {
match cli.command { match cli.command {
Some(Commands::CoordinateBacklog { Some(Commands::CoordinateBacklog {
json,
until_healthy, until_healthy,
max_passes, max_passes,
.. ..
}) => { }) => {
assert!(!json);
assert!(until_healthy); assert!(until_healthy);
assert_eq!(max_passes, 3); assert_eq!(max_passes, 3);
} }
@@ -1041,6 +1100,26 @@ mod tests {
} }
} }
#[test]
fn cli_parses_coordinate_backlog_json_flag() {
let cli = Cli::try_parse_from(["ecc", "coordinate-backlog", "--json"])
.expect("coordinate-backlog --json should parse");
match cli.command {
Some(Commands::CoordinateBacklog {
json,
until_healthy,
max_passes,
..
}) => {
assert!(json);
assert!(!until_healthy);
assert_eq!(max_passes, 5);
}
_ => panic!("expected coordinate-backlog subcommand"),
}
}
#[test] #[test]
fn cli_parses_rebalance_all_command() { fn cli_parses_rebalance_all_command() {
let cli = Cli::try_parse_from([ let cli = Cli::try_parse_from([
@@ -1181,7 +1260,55 @@ mod tests {
remaining_saturated_sessions: 0, remaining_saturated_sessions: 0,
}); });
assert_eq!(summary, "Backlog already clear"); assert_eq!(summary.message, "Backlog already clear");
assert_eq!(summary.processed, 0);
assert_eq!(summary.rerouted, 0);
}
#[test]
fn summarize_coordinate_backlog_structures_counts() {
let summary = summarize_coordinate_backlog(&session::manager::CoordinateBacklogOutcome {
dispatched: vec![session::manager::LeadDispatchOutcome {
lead_session_id: "lead".into(),
unread_count: 2,
routed: vec![
session::manager::InboxDrainOutcome {
message_id: 1,
task: "one".into(),
session_id: "a".into(),
action: session::manager::AssignmentAction::Spawned,
},
session::manager::InboxDrainOutcome {
message_id: 2,
task: "two".into(),
session_id: "lead".into(),
action: session::manager::AssignmentAction::DeferredSaturated,
},
],
}],
rebalanced: vec![session::manager::LeadRebalanceOutcome {
lead_session_id: "lead".into(),
rerouted: vec![session::manager::RebalanceOutcome {
from_session_id: "a".into(),
message_id: 3,
task: "three".into(),
session_id: "b".into(),
action: session::manager::AssignmentAction::ReusedIdle,
}],
}],
remaining_backlog_sessions: 1,
remaining_backlog_messages: 2,
remaining_absorbable_sessions: 1,
remaining_saturated_sessions: 0,
});
assert_eq!(summary.processed, 2);
assert_eq!(summary.routed, 1);
assert_eq!(summary.deferred, 1);
assert_eq!(summary.rerouted, 1);
assert_eq!(summary.dispatched_leads, 1);
assert_eq!(summary.rebalanced_leads, 1);
assert_eq!(summary.remaining_backlog_messages, 2);
} }
#[test] #[test]