From 868763dfa900141e52fcf5d97d479ec7805b1556 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 02:50:53 -0700 Subject: [PATCH] feat: report ecc2 remaining coordination backlog --- ecc2/src/main.rs | 36 ++++++++--------- ecc2/src/session/manager.rs | 80 +++++++++++++++++++++++++++++++++++++ ecc2/src/tui/dashboard.rs | 44 ++++++++------------ 3 files changed, 115 insertions(+), 45 deletions(-) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 4a6dfdff..d75344a3 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -366,7 +366,7 @@ async fn main() -> Result<()> { worktree: use_worktree, lead_limit, }) => { - let dispatch_outcomes = session::manager::auto_dispatch_backlog( + let outcome = session::manager::coordinate_backlog( &db, &cfg, &agent, @@ -374,31 +374,31 @@ async fn main() -> Result<()> { lead_limit, ) .await?; - let total_routed: usize = - dispatch_outcomes.iter().map(|outcome| outcome.routed.len()).sum(); - - let rebalance_outcomes = session::manager::rebalance_all_teams( - &db, - &cfg, - &agent, - use_worktree, - lead_limit, - ) - .await?; - let total_rerouted: usize = rebalance_outcomes + let total_routed: usize = outcome + .dispatched .iter() - .map(|outcome| outcome.rerouted.len()) + .map(|dispatch| dispatch.routed.len()) + .sum(); + let total_rerouted: usize = outcome + .rebalanced + .iter() + .map(|rebalance| rebalance.rerouted.len()) .sum(); - if total_routed == 0 && total_rerouted == 0 { + if total_routed == 0 + && total_rerouted == 0 + && outcome.remaining_backlog_sessions == 0 + { println!("Backlog already clear"); } else { println!( - "Coordinated backlog: dispatched {} handoff(s) across {} lead(s); rebalanced {} handoff(s) across {} lead(s)", + "Coordinated backlog: dispatched {} handoff(s) across {} lead(s); rebalanced {} handoff(s) across {} lead(s); remaining {} handoff(s) across {} session(s)", total_routed, - dispatch_outcomes.len(), + outcome.dispatched.len(), total_rerouted, - rebalance_outcomes.len() + outcome.rebalanced.len(), + outcome.remaining_backlog_messages, + outcome.remaining_backlog_sessions ); } } diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index bde00d61..5f0537a1 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -202,6 +202,30 @@ pub async fn rebalance_all_teams( Ok(outcomes) } +pub async fn coordinate_backlog( + db: &StateStore, + cfg: &Config, + agent_type: &str, + use_worktree: bool, + lead_limit: usize, +) -> Result { + let dispatched = auto_dispatch_backlog(db, cfg, agent_type, use_worktree, lead_limit).await?; + let rebalanced = rebalance_all_teams(db, cfg, agent_type, use_worktree, lead_limit).await?; + let remaining_targets = db.unread_task_handoff_targets(db.list_sessions()?.len().max(1))?; + let remaining_backlog_sessions = remaining_targets.len(); + let remaining_backlog_messages = remaining_targets + .iter() + .map(|(_, unread_count)| *unread_count) + .sum(); + + Ok(CoordinateBacklogOutcome { + dispatched, + rebalanced, + remaining_backlog_sessions, + remaining_backlog_messages, + }) +} + pub async fn rebalance_team_backlog( db: &StateStore, cfg: &Config, @@ -1006,6 +1030,13 @@ pub struct LeadRebalanceOutcome { pub rerouted: Vec, } +pub struct CoordinateBacklogOutcome { + pub dispatched: Vec, + pub rebalanced: Vec, + pub remaining_backlog_sessions: usize, + pub remaining_backlog_messages: usize, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum AssignmentAction { Spawned, @@ -1899,6 +1930,55 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "current_thread")] + async fn coordinate_backlog_reports_remaining_backlog_after_limited_pass() -> Result<()> { + let tempdir = TestDir::new("manager-coordinate-backlog")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let mut cfg = build_config(tempdir.path()); + cfg.auto_dispatch_limit_per_session = 5; + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + for lead_id in ["lead-a", "lead-b"] { + db.insert_session(&Session { + id: lead_id.to_string(), + task: format!("{lead_id} task"), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Running, + pid: Some(42), + worktree: None, + created_at: now - Duration::minutes(3), + updated_at: now - Duration::minutes(3), + metrics: SessionMetrics::default(), + })?; + } + + db.send_message( + "planner", + "lead-a", + "{\"task\":\"Review auth\",\"context\":\"Inbound\"}", + "task_handoff", + )?; + db.send_message( + "planner", + "lead-b", + "{\"task\":\"Review billing\",\"context\":\"Inbound\"}", + "task_handoff", + )?; + + let outcome = coordinate_backlog(&db, &cfg, "claude", true, 1).await?; + + assert_eq!(outcome.dispatched.len(), 1); + assert_eq!(outcome.rebalanced.len(), 0); + assert_eq!(outcome.remaining_backlog_sessions, 2); + assert_eq!(outcome.remaining_backlog_messages, 2); + + Ok(()) + } + #[tokio::test(flavor = "current_thread")] async fn rebalance_team_backlog_moves_work_off_backed_up_delegate() -> Result<()> { let tempdir = TestDir::new("manager-rebalance-team")?; diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index 621121e4..cdf85cfa 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -892,7 +892,7 @@ impl Dashboard { let agent = self.cfg.default_agent.clone(); let lead_limit = self.sessions.len().max(1); - let dispatch_outcomes = match manager::auto_dispatch_backlog( + let outcome = match manager::coordinate_backlog( &self.db, &self.cfg, &agent, @@ -903,32 +903,20 @@ impl Dashboard { { Ok(outcomes) => outcomes, Err(error) => { - tracing::warn!("Failed to coordinate backlog dispatch from dashboard: {error}"); - self.set_operator_note(format!("global coordinate failed during dispatch: {error}")); + tracing::warn!("Failed to coordinate backlog from dashboard: {error}"); + self.set_operator_note(format!("global coordinate failed: {error}")); return; } }; - let total_routed: usize = dispatch_outcomes.iter().map(|outcome| outcome.routed.len()).sum(); - - let rebalance_outcomes = match manager::rebalance_all_teams( - &self.db, - &self.cfg, - &agent, - true, - lead_limit, - ) - .await - { - Ok(outcomes) => outcomes, - Err(error) => { - tracing::warn!("Failed to coordinate backlog rebalance from dashboard: {error}"); - self.set_operator_note(format!("global coordinate failed during rebalance: {error}")); - return; - } - }; - let total_rerouted: usize = rebalance_outcomes + let total_routed: usize = outcome + .dispatched .iter() - .map(|outcome| outcome.rerouted.len()) + .map(|dispatch| dispatch.routed.len()) + .sum(); + let total_rerouted: usize = outcome + .rebalanced + .iter() + .map(|rebalance| rebalance.rerouted.len()) .sum(); let selected_session_id = self @@ -944,15 +932,17 @@ impl Dashboard { self.sync_selected_lineage(); self.refresh_logs(); - if total_routed == 0 && total_rerouted == 0 { + if total_routed == 0 && total_rerouted == 0 && outcome.remaining_backlog_sessions == 0 { self.set_operator_note("backlog already clear".to_string()); } else { self.set_operator_note(format!( - "coordinated backlog: dispatched {} handoff(s) across {} lead(s), rebalanced {} handoff(s) across {} lead(s)", + "coordinated backlog: dispatched {} across {} lead(s), rebalanced {} across {} lead(s), remaining {} across {} session(s)", total_routed, - dispatch_outcomes.len(), + outcome.dispatched.len(), total_rerouted, - rebalance_outcomes.len() + outcome.rebalanced.len(), + outcome.remaining_backlog_messages, + outcome.remaining_backlog_sessions )); } }