From cd948783749be3847c6235c9069522ea32dafcc0 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 13:13:46 -0700 Subject: [PATCH] feat: add ecc2 coordination status command --- ecc2/src/main.rs | 17 +++ ecc2/src/session/manager.rs | 225 ++++++++++++++++++++++++++++++++++++ 2 files changed, 242 insertions(+) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index b931e27b..70f469b4 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -102,6 +102,8 @@ enum Commands { #[arg(long, default_value_t = 10)] lead_limit: usize, }, + /// Show global coordination, backlog, and daemon policy status + CoordinationStatus, /// Rebalance unread handoffs across lead teams with backed-up delegates RebalanceAll { /// Agent type for routed delegates @@ -458,6 +460,10 @@ async fn main() -> Result<()> { ); } } + Some(Commands::CoordinationStatus) => { + let status = session::manager::get_coordination_status(&db, &cfg)?; + println!("{status}"); + } Some(Commands::RebalanceAll { agent, worktree: use_worktree, @@ -953,6 +959,17 @@ mod tests { } } + #[test] + fn cli_parses_coordination_status_command() { + let cli = Cli::try_parse_from(["ecc", "coordination-status"]) + .expect("coordination-status should parse"); + + match cli.command { + Some(Commands::CoordinationStatus) => {} + _ => panic!("expected coordination-status subcommand"), + } + } + #[test] fn cli_parses_rebalance_team_command() { let cli = Cli::try_parse_from([ diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index ee210c41..2d214141 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -1094,6 +1094,16 @@ pub struct CoordinateBacklogOutcome { pub remaining_saturated_sessions: usize, } +pub struct CoordinationStatus { + pub backlog_leads: usize, + pub backlog_messages: usize, + pub absorbable_sessions: usize, + pub saturated_sessions: usize, + pub auto_dispatch_enabled: bool, + pub auto_dispatch_limit_per_session: usize, + pub daemon_activity: super::store::DaemonActivity, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum AssignmentAction { Spawned, @@ -1106,6 +1116,25 @@ pub fn assignment_action_routes_work(action: AssignmentAction) -> bool { !matches!(action, AssignmentAction::DeferredSaturated) } +pub fn get_coordination_status(db: &StateStore, cfg: &Config) -> Result { + let targets = db.unread_task_handoff_targets(db.list_sessions()?.len().max(1))?; + let pressure = summarize_backlog_pressure(db, cfg, &cfg.default_agent, &targets)?; + let backlog_messages = targets + .iter() + .map(|(_, unread_count)| *unread_count) + .sum::(); + + Ok(CoordinationStatus { + backlog_leads: targets.len(), + backlog_messages, + absorbable_sessions: pressure.absorbable_sessions, + saturated_sessions: pressure.saturated_sessions, + auto_dispatch_enabled: cfg.auto_dispatch_unread_handoffs, + auto_dispatch_limit_per_session: cfg.auto_dispatch_limit_per_session, + daemon_activity: db.daemon_activity()?, + }) +} + #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] struct BacklogPressureSummary { absorbable_sessions: usize, @@ -1201,6 +1230,105 @@ impl fmt::Display for TeamStatus { } } +impl fmt::Display for CoordinationStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let stabilized = self.daemon_activity.stabilized_after_recovery_at(); + let mode = if self.daemon_activity.dispatch_cooloff_active() { + "rebalance-cooloff (chronic saturation)" + } else if self.daemon_activity.prefers_rebalance_first() { + "rebalance-first (chronic saturation)" + } else if stabilized.is_some() { + "dispatch-first (stabilized)" + } else { + "dispatch-first" + }; + + writeln!( + f, + "Global handoff backlog: {} lead(s) / {} handoff(s) [{} absorbable, {} saturated]", + self.backlog_leads, + self.backlog_messages, + self.absorbable_sessions, + self.saturated_sessions + )?; + writeln!( + f, + "Auto-dispatch: {} @ {}/lead", + if self.auto_dispatch_enabled { + "on" + } else { + "off" + }, + self.auto_dispatch_limit_per_session + )?; + writeln!(f, "Coordination mode: {mode}")?; + + if self.daemon_activity.chronic_saturation_streak > 0 { + writeln!( + f, + "Chronic saturation streak: {} cycle(s)", + self.daemon_activity.chronic_saturation_streak + )?; + } + + if self.daemon_activity.operator_escalation_required() { + writeln!( + f, + "Operator escalation: chronic saturation is not clearing" + )?; + } + + if let Some(cleared_at) = self.daemon_activity.chronic_saturation_cleared_at() { + writeln!( + f, + "Chronic saturation cleared: {}", + cleared_at.to_rfc3339() + )?; + } + + if let Some(stabilized_at) = stabilized { + writeln!(f, "Recovery stabilized: {}", stabilized_at.to_rfc3339())?; + } + + if let Some(last_dispatch_at) = self.daemon_activity.last_dispatch_at.as_ref() { + writeln!( + f, + "Last daemon dispatch: {} routed / {} deferred across {} lead(s) @ {}", + self.daemon_activity.last_dispatch_routed, + self.daemon_activity.last_dispatch_deferred, + self.daemon_activity.last_dispatch_leads, + last_dispatch_at.to_rfc3339() + )?; + } + + if stabilized.is_none() { + if let Some(last_recovery_dispatch_at) = + self.daemon_activity.last_recovery_dispatch_at.as_ref() + { + writeln!( + f, + "Last daemon recovery dispatch: {} handoff(s) across {} lead(s) @ {}", + self.daemon_activity.last_recovery_dispatch_routed, + self.daemon_activity.last_recovery_dispatch_leads, + last_recovery_dispatch_at.to_rfc3339() + )?; + } + + if let Some(last_rebalance_at) = self.daemon_activity.last_rebalance_at.as_ref() { + writeln!( + f, + "Last daemon rebalance: {} handoff(s) across {} lead(s) @ {}", + self.daemon_activity.last_rebalance_rerouted, + self.daemon_activity.last_rebalance_leads, + last_rebalance_at.to_rfc3339() + )?; + } + } + + Ok(()) + } +} + fn session_state_label(state: &SessionState) -> &'static str { match state { SessionState::Pending => "Pending", @@ -1283,6 +1411,23 @@ mod tests { } } + fn build_daemon_activity() -> super::super::store::DaemonActivity { + let now = Utc::now(); + super::super::store::DaemonActivity { + last_dispatch_at: Some(now), + last_dispatch_routed: 3, + last_dispatch_deferred: 1, + last_dispatch_leads: 2, + chronic_saturation_streak: 2, + last_recovery_dispatch_at: Some(now - Duration::seconds(5)), + last_recovery_dispatch_routed: 2, + last_recovery_dispatch_leads: 1, + last_rebalance_at: Some(now - Duration::seconds(2)), + last_rebalance_rerouted: 0, + last_rebalance_leads: 1, + } + } + fn init_git_repo(path: &Path) -> Result<()> { fs::create_dir_all(path)?; run_git(path, ["init", "-q"])?; @@ -2461,4 +2606,84 @@ mod tests { Ok(()) } + + #[test] + fn coordination_status_display_surfaces_mode_and_activity() { + let status = CoordinationStatus { + backlog_leads: 2, + backlog_messages: 5, + absorbable_sessions: 1, + saturated_sessions: 1, + auto_dispatch_enabled: true, + auto_dispatch_limit_per_session: 4, + daemon_activity: build_daemon_activity(), + }; + + let rendered = status.to_string(); + assert!( + rendered.contains( + "Global handoff backlog: 2 lead(s) / 5 handoff(s) [1 absorbable, 1 saturated]" + ) + ); + assert!(rendered.contains("Auto-dispatch: on @ 4/lead")); + assert!(rendered.contains("Coordination mode: rebalance-first (chronic saturation)")); + assert!(rendered.contains("Chronic saturation streak: 2 cycle(s)")); + assert!(rendered.contains("Last daemon dispatch: 3 routed / 1 deferred across 2 lead(s)")); + assert!(rendered.contains("Last daemon recovery dispatch: 2 handoff(s) across 1 lead(s)")); + assert!(rendered.contains("Last daemon rebalance: 0 handoff(s) across 1 lead(s)")); + } + + #[test] + fn coordination_status_summarizes_real_handoff_backlog() -> Result<()> { + let tempdir = TestDir::new("manager-coordination-status")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let cfg = Config { + max_parallel_sessions: 1, + ..build_config(tempdir.path()) + }; + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + db.insert_session(&build_session("source", SessionState::Running, now))?; + db.insert_session(&build_session("lead-a", SessionState::Running, now))?; + db.insert_session(&build_session("lead-b", SessionState::Running, now))?; + db.insert_session(&build_session( + "delegate-b", + SessionState::Idle, + now - Duration::seconds(1), + ))?; + + db.send_message( + "source", + "lead-a", + "{\"task\":\"clear docs\",\"context\":\"incoming\"}", + "task_handoff", + )?; + db.send_message( + "source", + "lead-b", + "{\"task\":\"review queue\",\"context\":\"incoming\"}", + "task_handoff", + )?; + db.send_message( + "lead-b", + "delegate-b", + "{\"task\":\"delegate queue\",\"context\":\"routed\"}", + "task_handoff", + )?; + + db.record_daemon_dispatch_pass(1, 1, 2)?; + + let status = get_coordination_status(&db, &cfg)?; + assert_eq!(status.backlog_leads, 3); + assert_eq!(status.backlog_messages, 3); + assert_eq!(status.absorbable_sessions, 2); + assert_eq!(status.saturated_sessions, 1); + assert_eq!(status.daemon_activity.last_dispatch_routed, 1); + assert_eq!(status.daemon_activity.last_dispatch_deferred, 1); + + Ok(()) + } }