From 91e145338fefd0a2403c4da57422fee5a28f7e02 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 03:06:19 -0700 Subject: [PATCH] feat: defer ecc2 handoffs on saturated teams --- ecc2/src/main.rs | 95 ++++++++++++++++----- ecc2/src/session/manager.rs | 163 +++++++++++++++++++++++++++++++++--- ecc2/src/tui/dashboard.rs | 44 ++++++++-- 3 files changed, 264 insertions(+), 38 deletions(-) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 2032ab67..b931e27b 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -279,16 +279,25 @@ async fn main() -> Result<()> { use_worktree, ) .await?; - println!( - "Assignment routed: {} -> {} ({})", - short_session(&lead_id), - short_session(&outcome.session_id), - match outcome.action { - session::manager::AssignmentAction::Spawned => "spawned", - session::manager::AssignmentAction::ReusedIdle => "reused-idle", - session::manager::AssignmentAction::ReusedActive => "reused-active", - } - ); + if session::manager::assignment_action_routes_work(outcome.action) { + println!( + "Assignment routed: {} -> {} ({})", + short_session(&lead_id), + short_session(&outcome.session_id), + match outcome.action { + session::manager::AssignmentAction::Spawned => "spawned", + session::manager::AssignmentAction::ReusedIdle => "reused-idle", + session::manager::AssignmentAction::ReusedActive => "reused-active", + session::manager::AssignmentAction::DeferredSaturated => unreachable!(), + } + ); + } else { + println!( + "Assignment deferred: {} is saturated; task stayed in {} inbox", + short_session(&lead_id), + short_session(&lead_id), + ); + } } Some(Commands::DrainInbox { session_id, @@ -309,10 +318,18 @@ async fn main() -> Result<()> { if outcomes.is_empty() { println!("No unread task handoffs for {}", short_session(&lead_id)); } else { + let routed_count = outcomes + .iter() + .filter(|outcome| session::manager::assignment_action_routes_work(outcome.action)) + .count(); + let deferred_count = outcomes.len().saturating_sub(routed_count); println!( - "Routed {} inbox task handoff(s) from {}", + "Processed {} inbox task handoff(s) from {} ({} routed, {} deferred)", outcomes.len(), short_session(&lead_id) + , + routed_count, + deferred_count ); for outcome in outcomes { println!( @@ -323,6 +340,9 @@ async fn main() -> Result<()> { session::manager::AssignmentAction::Spawned => "spawned", session::manager::AssignmentAction::ReusedIdle => "reused-idle", session::manager::AssignmentAction::ReusedActive => "reused-active", + session::manager::AssignmentAction::DeferredSaturated => { + "deferred-saturated" + } }, outcome.task ); @@ -345,18 +365,38 @@ async fn main() -> Result<()> { if outcomes.is_empty() { println!("No unread task handoff backlog found"); } else { - let total_routed: usize = outcomes.iter().map(|outcome| outcome.routed.len()).sum(); + let total_processed: usize = outcomes.iter().map(|outcome| outcome.routed.len()).sum(); + let total_routed: usize = outcomes + .iter() + .map(|outcome| { + outcome + .routed + .iter() + .filter(|item| session::manager::assignment_action_routes_work(item.action)) + .count() + }) + .sum(); + let total_deferred = total_processed.saturating_sub(total_routed); println!( - "Auto-dispatched {} task handoff(s) across {} lead session(s)", + "Auto-dispatch processed {} task handoff(s) across {} lead session(s) ({} routed, {} deferred)", + total_processed, + outcomes.len(), total_routed, - outcomes.len() + total_deferred ); for outcome in outcomes { + let routed = outcome + .routed + .iter() + .filter(|item| session::manager::assignment_action_routes_work(item.action)) + .count(); + let deferred = outcome.routed.len().saturating_sub(routed); println!( - "- {} | unread {} | routed {}", + "- {} | unread {} | routed {} | deferred {}", short_session(&outcome.lead_session_id), outcome.unread_count, - outcome.routed.len() + routed, + deferred ); } } @@ -374,11 +414,23 @@ async fn main() -> Result<()> { lead_limit, ) .await?; - let total_routed: usize = outcome + let total_processed: usize = outcome .dispatched .iter() .map(|dispatch| dispatch.routed.len()) .sum(); + let total_routed: usize = outcome + .dispatched + .iter() + .map(|dispatch| { + dispatch + .routed + .iter() + .filter(|item| session::manager::assignment_action_routes_work(item.action)) + .count() + }) + .sum(); + let total_deferred = total_processed.saturating_sub(total_routed); let total_rerouted: usize = outcome .rebalanced .iter() @@ -392,9 +444,11 @@ async fn main() -> Result<()> { println!("Backlog already clear"); } else { println!( - "Coordinated backlog: dispatched {} handoff(s) across {} lead(s); rebalanced {} handoff(s) across {} lead(s); remaining {} handoff(s) across {} session(s) [{} absorbable, {} saturated]", - total_routed, + "Coordinated backlog: processed {} handoff(s) across {} lead(s) ({} routed, {} deferred); rebalanced {} handoff(s) across {} lead(s); remaining {} handoff(s) across {} session(s) [{} absorbable, {} saturated]", + total_processed, outcome.dispatched.len(), + total_routed, + total_deferred, total_rerouted, outcome.rebalanced.len(), outcome.remaining_backlog_messages, @@ -470,6 +524,9 @@ async fn main() -> Result<()> { session::manager::AssignmentAction::Spawned => "spawned", session::manager::AssignmentAction::ReusedIdle => "reused-idle", session::manager::AssignmentAction::ReusedActive => "reused-active", + session::manager::AssignmentAction::DeferredSaturated => { + "deferred-saturated" + } }, outcome.task ); diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index 889e3a87..20e7db1a 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -121,7 +121,9 @@ pub async fn drain_inbox( ) .await?; - let _ = db.mark_message_read(message.id)?; + if assignment_action_routes_work(outcome.action) { + let _ = db.mark_message_read(message.id)?; + } outcomes.push(InboxDrainOutcome { message_id: message.id, task, @@ -461,7 +463,7 @@ async fn assign_session_in_dir_with_runner_program( }); } - if let Some(idle_delegate) = delegates + if let Some(_idle_delegate) = delegates .iter() .filter(|session| session.state == SessionState::Idle) .min_by_key(|session| { @@ -471,16 +473,9 @@ async fn assign_session_in_dir_with_runner_program( ) }) { - send_task_handoff( - db, - &lead, - &idle_delegate.id, - task, - "reused idle delegate with existing inbox backlog", - )?; return Ok(AssignmentOutcome { - session_id: idle_delegate.id.clone(), - action: AssignmentAction::ReusedIdle, + session_id: lead.id.clone(), + action: AssignmentAction::DeferredSaturated, }); } @@ -494,6 +489,13 @@ async fn assign_session_in_dir_with_runner_program( ) }) { + if unread_counts.get(&active_delegate.id).copied().unwrap_or(0) > 0 { + return Ok(AssignmentOutcome { + session_id: lead.id.clone(), + action: AssignmentAction::DeferredSaturated, + }); + } + send_task_handoff( db, &lead, @@ -1074,6 +1076,11 @@ pub enum AssignmentAction { Spawned, ReusedIdle, ReusedActive, + DeferredSaturated, +} + +pub fn assignment_action_routes_work(action: AssignmentAction) -> bool { + !matches!(action, AssignmentAction::DeferredSaturated) } #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] @@ -1862,6 +1869,73 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "current_thread")] + async fn assign_session_defers_when_team_is_saturated() -> Result<()> { + let tempdir = TestDir::new("manager-assign-defer-saturated")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let mut cfg = build_config(tempdir.path()); + cfg.max_parallel_sessions = 1; + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "lead".to_string(), + task: "lead task".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Running, + pid: Some(42), + worktree: None, + created_at: now - Duration::minutes(3), + updated_at: now - Duration::minutes(3), + metrics: SessionMetrics::default(), + })?; + db.insert_session(&Session { + id: "busy-worker".to_string(), + task: "existing work".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Running, + pid: Some(55), + worktree: None, + created_at: now - Duration::minutes(2), + updated_at: now - Duration::minutes(2), + metrics: SessionMetrics::default(), + })?; + db.send_message( + "lead", + "busy-worker", + "{\"task\":\"existing work\",\"context\":\"Delegated from lead\"}", + "task_handoff", + )?; + + let (fake_runner, _) = write_fake_claude(tempdir.path())?; + let outcome = assign_session_in_dir_with_runner_program( + &db, + &cfg, + "lead", + "New delegated task", + "claude", + true, + &repo_root, + &fake_runner, + ) + .await?; + + assert_eq!(outcome.action, AssignmentAction::DeferredSaturated); + assert_eq!(outcome.session_id, "lead"); + + let busy_messages = db.list_messages_for_session("busy-worker", 10)?; + assert!(!busy_messages.iter().any(|message| { + message.msg_type == "task_handoff" + && message.content.contains("New delegated task") + })); + + Ok(()) + } + #[tokio::test(flavor = "current_thread")] async fn drain_inbox_routes_unread_task_handoffs_and_marks_them_read() -> Result<()> { let tempdir = TestDir::new("manager-drain-inbox")?; @@ -1909,6 +1983,73 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "current_thread")] + async fn drain_inbox_leaves_saturated_handoffs_unread() -> Result<()> { + let tempdir = TestDir::new("manager-drain-inbox-defer")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let mut cfg = build_config(tempdir.path()); + cfg.max_parallel_sessions = 1; + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "lead".to_string(), + task: "lead task".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Running, + pid: Some(42), + worktree: None, + created_at: now - Duration::minutes(3), + updated_at: now - Duration::minutes(3), + metrics: SessionMetrics::default(), + })?; + db.insert_session(&Session { + id: "busy-worker".to_string(), + task: "existing work".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Running, + pid: Some(55), + worktree: None, + created_at: now - Duration::minutes(2), + updated_at: now - Duration::minutes(2), + metrics: SessionMetrics::default(), + })?; + db.send_message( + "lead", + "busy-worker", + "{\"task\":\"existing work\",\"context\":\"Delegated from lead\"}", + "task_handoff", + )?; + db.send_message( + "planner", + "lead", + "{\"task\":\"Review auth changes\",\"context\":\"Inbound request\"}", + "task_handoff", + )?; + + let outcomes = drain_inbox(&db, &cfg, "lead", "claude", true, 5).await?; + assert_eq!(outcomes.len(), 1); + assert_eq!(outcomes[0].task, "Review auth changes"); + assert_eq!(outcomes[0].action, AssignmentAction::DeferredSaturated); + assert_eq!(outcomes[0].session_id, "lead"); + + let unread = db.unread_message_counts()?; + assert_eq!(unread.get("lead"), Some(&1)); + assert_eq!(unread.get("busy-worker"), Some(&1)); + + let messages = db.list_messages_for_session("busy-worker", 10)?; + assert!(!messages.iter().any(|message| { + message.msg_type == "task_handoff" + && message.content.contains("Review auth changes") + })); + + Ok(()) + } + #[tokio::test(flavor = "current_thread")] async fn auto_dispatch_backlog_routes_multiple_lead_inboxes() -> Result<()> { let tempdir = TestDir::new("manager-auto-dispatch")?; diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index c4bdea08..5cf95725 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -817,7 +817,18 @@ impl Dashboard { } }; - let total_routed: usize = outcomes.iter().map(|outcome| outcome.routed.len()).sum(); + let total_processed: usize = outcomes.iter().map(|outcome| outcome.routed.len()).sum(); + let total_routed: usize = outcomes + .iter() + .map(|outcome| { + outcome + .routed + .iter() + .filter(|item| manager::assignment_action_routes_work(item.action)) + .count() + }) + .sum(); + let total_deferred = total_processed.saturating_sub(total_routed); let selected_session_id = self .sessions .get(self.selected_session) @@ -831,13 +842,15 @@ impl Dashboard { self.sync_selected_lineage(); self.refresh_logs(); - if total_routed == 0 { + if total_processed == 0 { self.set_operator_note("no unread handoff backlog found".to_string()); } else { self.set_operator_note(format!( - "auto-dispatched {} handoff(s) across {} lead session(s)", + "auto-dispatch processed {} handoff(s) across {} lead session(s) ({} routed, {} deferred)", + total_processed, + outcomes.len(), total_routed, - outcomes.len() + total_deferred )); } } @@ -908,11 +921,23 @@ impl Dashboard { return; } }; - let total_routed: usize = outcome + let total_processed: usize = outcome .dispatched .iter() .map(|dispatch| dispatch.routed.len()) .sum(); + let total_routed: usize = outcome + .dispatched + .iter() + .map(|dispatch| { + dispatch + .routed + .iter() + .filter(|item| manager::assignment_action_routes_work(item.action)) + .count() + }) + .sum(); + let total_deferred = total_processed.saturating_sub(total_routed); let total_rerouted: usize = outcome .rebalanced .iter() @@ -932,13 +957,15 @@ impl Dashboard { self.sync_selected_lineage(); self.refresh_logs(); - if total_routed == 0 && total_rerouted == 0 && outcome.remaining_backlog_sessions == 0 { + if total_processed == 0 && total_rerouted == 0 && outcome.remaining_backlog_sessions == 0 { self.set_operator_note("backlog already clear".to_string()); } else { self.set_operator_note(format!( - "coordinated backlog: dispatched {} across {} lead(s), rebalanced {} across {} lead(s), remaining {} across {} session(s) [{} absorbable, {} saturated]", - total_routed, + "coordinated backlog: processed {} across {} lead(s) ({} routed, {} deferred), rebalanced {} across {} lead(s), remaining {} across {} session(s) [{} absorbable, {} saturated]", + total_processed, outcome.dispatched.len(), + total_routed, + total_deferred, total_rerouted, outcome.rebalanced.len(), outcome.remaining_backlog_messages, @@ -1940,6 +1967,7 @@ fn assignment_action_label(action: manager::AssignmentAction) -> &'static str { manager::AssignmentAction::Spawned => "spawned", manager::AssignmentAction::ReusedIdle => "reused idle", manager::AssignmentAction::ReusedActive => "reused active", + manager::AssignmentAction::DeferredSaturated => "deferred saturated", } }