feat: add ecc2 looping backlog coordination

This commit is contained in:
Affaan Mustafa
2026-04-08 13:22:02 -07:00
parent bcf8d0617e
commit d738089e3e

View File

@@ -101,6 +101,12 @@ enum Commands {
/// Maximum lead sessions to sweep in one pass
#[arg(long, default_value_t = 10)]
lead_limit: usize,
/// Keep coordinating until the backlog is healthy, saturated, or max passes is reached
#[arg(long)]
until_healthy: bool,
/// Maximum coordination passes when using --until-healthy
#[arg(long, default_value_t = 5)]
max_passes: usize,
},
/// Show global coordination, backlog, and daemon policy status
CoordinationStatus {
@@ -414,57 +420,57 @@ async fn main() -> Result<()> {
agent,
worktree: use_worktree,
lead_limit,
until_healthy,
max_passes,
}) => {
let outcome = session::manager::coordinate_backlog(
&db,
&cfg,
&agent,
use_worktree,
lead_limit,
)
.await?;
let total_processed: usize = outcome
.dispatched
.iter()
.map(|dispatch| dispatch.routed.len())
.sum();
let total_routed: usize = outcome
.dispatched
.iter()
.map(|dispatch| {
dispatch
.routed
.iter()
.filter(|item| session::manager::assignment_action_routes_work(item.action))
.count()
})
.sum();
let total_deferred = total_processed.saturating_sub(total_routed);
let total_rerouted: usize = outcome
.rebalanced
.iter()
.map(|rebalance| rebalance.rerouted.len())
.sum();
if total_routed == 0
&& total_rerouted == 0
&& outcome.remaining_backlog_sessions == 0
{
println!("Backlog already clear");
let pass_budget = if until_healthy {
max_passes.max(1)
} else {
println!(
"Coordinated backlog: processed {} handoff(s) across {} lead(s) ({} routed, {} deferred); rebalanced {} handoff(s) across {} lead(s); remaining {} handoff(s) across {} session(s) [{} absorbable, {} saturated]",
total_processed,
outcome.dispatched.len(),
total_routed,
total_deferred,
total_rerouted,
outcome.rebalanced.len(),
outcome.remaining_backlog_messages,
outcome.remaining_backlog_sessions,
outcome.remaining_absorbable_sessions,
outcome.remaining_saturated_sessions
1
};
let mut final_status = None;
for pass in 1..=pass_budget {
let outcome = session::manager::coordinate_backlog(
&db,
&cfg,
&agent,
use_worktree,
lead_limit,
)
.await?;
let summary = summarize_coordinate_backlog(&outcome);
if pass_budget > 1 {
println!("Pass {pass}/{pass_budget}: {summary}");
} else {
println!("{summary}");
}
let status = session::manager::get_coordination_status(&db, &cfg)?;
let should_stop = matches!(
status.health,
session::manager::CoordinationHealth::Healthy
| session::manager::CoordinationHealth::Saturated
| session::manager::CoordinationHealth::EscalationRequired
);
final_status = Some(status);
if should_stop {
break;
}
}
if pass_budget > 1 {
if let Some(status) = final_status {
println!(
"Final coordination health: {:?} | mode {:?} | backlog {} handoff(s) across {} lead(s)",
status.health,
status.mode,
status.backlog_messages,
status.backlog_leads
);
}
}
}
Some(Commands::CoordinationStatus { json, check }) => {
@@ -691,6 +697,49 @@ fn format_coordination_status(
Ok(status.to_string())
}
fn summarize_coordinate_backlog(outcome: &session::manager::CoordinateBacklogOutcome) -> String {
let total_processed: usize = outcome
.dispatched
.iter()
.map(|dispatch| dispatch.routed.len())
.sum();
let total_routed: usize = outcome
.dispatched
.iter()
.map(|dispatch| {
dispatch
.routed
.iter()
.filter(|item| session::manager::assignment_action_routes_work(item.action))
.count()
})
.sum();
let total_deferred = total_processed.saturating_sub(total_routed);
let total_rerouted: usize = outcome
.rebalanced
.iter()
.map(|rebalance| rebalance.rerouted.len())
.sum();
if total_routed == 0 && total_rerouted == 0 && outcome.remaining_backlog_sessions == 0 {
"Backlog already clear".to_string()
} else {
format!(
"Coordinated backlog: processed {} handoff(s) across {} lead(s) ({} routed, {} deferred); rebalanced {} handoff(s) across {} lead(s); remaining {} handoff(s) across {} session(s) [{} absorbable, {} saturated]",
total_processed,
outcome.dispatched.len(),
total_routed,
total_deferred,
total_rerouted,
outcome.rebalanced.len(),
outcome.remaining_backlog_messages,
outcome.remaining_backlog_sessions,
outcome.remaining_absorbable_sessions,
outcome.remaining_saturated_sessions
)
}
}
fn coordination_status_exit_code(status: &session::manager::CoordinationStatus) -> i32 {
match status.health {
session::manager::CoordinationHealth::Healthy => 0,
@@ -955,10 +1004,38 @@ mod tests {
Some(Commands::CoordinateBacklog {
agent,
lead_limit,
until_healthy,
max_passes,
..
}) => {
assert_eq!(agent, "claude");
assert_eq!(lead_limit, 7);
assert!(!until_healthy);
assert_eq!(max_passes, 5);
}
_ => panic!("expected coordinate-backlog subcommand"),
}
}
#[test]
fn cli_parses_coordinate_backlog_until_healthy_flags() {
let cli = Cli::try_parse_from([
"ecc",
"coordinate-backlog",
"--until-healthy",
"--max-passes",
"3",
])
.expect("coordinate-backlog looping flags should parse");
match cli.command {
Some(Commands::CoordinateBacklog {
until_healthy,
max_passes,
..
}) => {
assert!(until_healthy);
assert_eq!(max_passes, 3);
}
_ => panic!("expected coordinate-backlog subcommand"),
}
@@ -1093,6 +1170,20 @@ mod tests {
assert_eq!(coordination_status_exit_code(&saturated), 2);
}
#[test]
fn summarize_coordinate_backlog_reports_clear_state() {
let summary = summarize_coordinate_backlog(&session::manager::CoordinateBacklogOutcome {
dispatched: Vec::new(),
rebalanced: Vec::new(),
remaining_backlog_sessions: 0,
remaining_backlog_messages: 0,
remaining_absorbable_sessions: 0,
remaining_saturated_sessions: 0,
});
assert_eq!(summary, "Backlog already clear");
}
#[test]
fn cli_parses_rebalance_team_command() {
let cli = Cli::try_parse_from([