From a3f600e25fcc47833b26c7ad7b3379fd6806d119 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 02:57:26 -0700 Subject: [PATCH] feat: classify ecc2 remaining coordination pressure --- ecc2/src/main.rs | 6 +- ecc2/src/session/manager.rs | 109 ++++++++++++++++++++++++++++++++++++ ecc2/src/tui/dashboard.rs | 6 +- 3 files changed, 117 insertions(+), 4 deletions(-) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index d75344a3..2032ab67 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -392,13 +392,15 @@ async fn main() -> Result<()> { println!("Backlog already clear"); } else { println!( - "Coordinated backlog: dispatched {} handoff(s) across {} lead(s); rebalanced {} handoff(s) across {} lead(s); remaining {} handoff(s) across {} session(s)", + "Coordinated backlog: dispatched {} handoff(s) across {} lead(s); rebalanced {} handoff(s) across {} lead(s); remaining {} handoff(s) across {} session(s) [{} absorbable, {} saturated]", total_routed, outcome.dispatched.len(), total_rerouted, outcome.rebalanced.len(), outcome.remaining_backlog_messages, - outcome.remaining_backlog_sessions + outcome.remaining_backlog_sessions, + outcome.remaining_absorbable_sessions, + outcome.remaining_saturated_sessions ); } } diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index 5f0537a1..889e3a87 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -212,6 +212,7 @@ pub async fn coordinate_backlog( let dispatched = auto_dispatch_backlog(db, cfg, agent_type, use_worktree, lead_limit).await?; let rebalanced = rebalance_all_teams(db, cfg, agent_type, use_worktree, lead_limit).await?; let remaining_targets = db.unread_task_handoff_targets(db.list_sessions()?.len().max(1))?; + let pressure = summarize_backlog_pressure(db, cfg, agent_type, &remaining_targets)?; let remaining_backlog_sessions = remaining_targets.len(); let remaining_backlog_messages = remaining_targets .iter() @@ -223,6 +224,8 @@ pub async fn coordinate_backlog( rebalanced, remaining_backlog_sessions, remaining_backlog_messages, + remaining_absorbable_sessions: pressure.absorbable_sessions, + remaining_saturated_sessions: pressure.saturated_sessions, }) } @@ -811,6 +814,33 @@ fn direct_delegate_sessions(db: &StateStore, lead_id: &str, agent_type: &str) -> Ok(sessions) } +fn summarize_backlog_pressure( + db: &StateStore, + cfg: &Config, + agent_type: &str, + targets: &[(String, usize)], +) -> Result { + let unread_counts = db.unread_message_counts()?; + let mut summary = BacklogPressureSummary::default(); + + for (session_id, _) in targets { + let delegates = direct_delegate_sessions(db, session_id, agent_type)?; + let has_clear_idle_delegate = delegates.iter().any(|delegate| { + delegate.state == SessionState::Idle + && unread_counts.get(&delegate.id).copied().unwrap_or(0) == 0 + }); + let has_capacity = delegates.len() < cfg.max_parallel_sessions; + + if has_clear_idle_delegate || has_capacity { + summary.absorbable_sessions += 1; + } else { + summary.saturated_sessions += 1; + } + } + + Ok(summary) +} + fn send_task_handoff( db: &StateStore, from_session: &Session, @@ -1035,6 +1065,8 @@ pub struct CoordinateBacklogOutcome { pub rebalanced: Vec, pub remaining_backlog_sessions: usize, pub remaining_backlog_messages: usize, + pub remaining_absorbable_sessions: usize, + pub remaining_saturated_sessions: usize, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -1044,6 +1076,12 @@ pub enum AssignmentAction { ReusedActive, } +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +struct BacklogPressureSummary { + absorbable_sessions: usize, + saturated_sessions: usize, +} + struct DelegatedSessionSummary { depth: usize, unread_messages: usize, @@ -1975,6 +2013,77 @@ mod tests { assert_eq!(outcome.rebalanced.len(), 0); assert_eq!(outcome.remaining_backlog_sessions, 2); assert_eq!(outcome.remaining_backlog_messages, 2); + assert_eq!(outcome.remaining_absorbable_sessions, 2); + assert_eq!(outcome.remaining_saturated_sessions, 0); + + Ok(()) + } + + #[tokio::test(flavor = "current_thread")] + async fn coordinate_backlog_classifies_remaining_saturated_pressure() -> Result<()> { + let tempdir = TestDir::new("manager-coordinate-saturated")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let mut cfg = build_config(tempdir.path()); + cfg.max_parallel_sessions = 1; + cfg.auto_dispatch_limit_per_session = 1; + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "worker".to_string(), + task: "worker task".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Running, + pid: Some(42), + worktree: None, + created_at: now - Duration::minutes(3), + updated_at: now - Duration::minutes(3), + metrics: SessionMetrics::default(), + })?; + + db.insert_session(&Session { + id: "worker-child".to_string(), + task: "delegate task".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Running, + pid: Some(43), + worktree: None, + created_at: now - Duration::minutes(2), + updated_at: now - Duration::minutes(2), + metrics: SessionMetrics::default(), + })?; + + db.send_message( + "worker", + "worker-child", + "{\"task\":\"seed delegate\",\"context\":\"Delegated from worker\"}", + "task_handoff", + )?; + let _ = db.mark_messages_read("worker-child")?; + + db.send_message( + "planner", + "worker", + "{\"task\":\"task-a\",\"context\":\"Inbound\"}", + "task_handoff", + )?; + db.send_message( + "planner", + "worker", + "{\"task\":\"task-b\",\"context\":\"Inbound\"}", + "task_handoff", + )?; + + let outcome = coordinate_backlog(&db, &cfg, "claude", true, 10).await?; + + assert_eq!(outcome.remaining_backlog_sessions, 1); + assert_eq!(outcome.remaining_backlog_messages, 2); + assert_eq!(outcome.remaining_absorbable_sessions, 0); + assert_eq!(outcome.remaining_saturated_sessions, 1); Ok(()) } diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index cdf85cfa..c4bdea08 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -936,13 +936,15 @@ impl Dashboard { self.set_operator_note("backlog already clear".to_string()); } else { self.set_operator_note(format!( - "coordinated backlog: dispatched {} across {} lead(s), rebalanced {} across {} lead(s), remaining {} across {} session(s)", + "coordinated backlog: dispatched {} across {} lead(s), rebalanced {} across {} lead(s), remaining {} across {} session(s) [{} absorbable, {} saturated]", total_routed, outcome.dispatched.len(), total_rerouted, outcome.rebalanced.len(), outcome.remaining_backlog_messages, - outcome.remaining_backlog_sessions + outcome.remaining_backlog_sessions, + outcome.remaining_absorbable_sessions, + outcome.remaining_saturated_sessions )); } }