From 3199120abe59697d3e0a200544c703c5f3ed61f3 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 03:47:11 -0700 Subject: [PATCH] feat: route ecc2 by handoff backlog --- ecc2/src/session/manager.rs | 99 +++++++++++++++++++++++++++++++++++-- ecc2/src/session/store.rs | 13 +++++ ecc2/src/tui/dashboard.rs | 64 +++++++++++++++++++++++- 3 files changed, 170 insertions(+), 6 deletions(-) diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index 20e7db1a..a2c1a741 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -428,13 +428,23 @@ async fn assign_session_in_dir_with_runner_program( ) -> Result { let lead = resolve_session(db, lead_id)?; let delegates = direct_delegate_sessions(db, &lead.id, agent_type)?; - let unread_counts = db.unread_message_counts()?; + let delegate_handoff_backlog = delegates + .iter() + .map(|session| { + db.unread_task_handoff_count(&session.id) + .map(|count| (session.id.clone(), count)) + }) + .collect::>>()?; if let Some(idle_delegate) = delegates .iter() .filter(|session| { session.state == SessionState::Idle - && unread_counts.get(&session.id).copied().unwrap_or(0) == 0 + && delegate_handoff_backlog + .get(&session.id) + .copied() + .unwrap_or(0) + == 0 }) .min_by_key(|session| session.updated_at) { @@ -468,7 +478,10 @@ async fn assign_session_in_dir_with_runner_program( .filter(|session| session.state == SessionState::Idle) .min_by_key(|session| { ( - unread_counts.get(&session.id).copied().unwrap_or(0), + delegate_handoff_backlog + .get(&session.id) + .copied() + .unwrap_or(0), session.updated_at, ) }) @@ -484,12 +497,20 @@ async fn assign_session_in_dir_with_runner_program( .filter(|session| matches!(session.state, SessionState::Running | SessionState::Pending)) .min_by_key(|session| { ( - unread_counts.get(&session.id).copied().unwrap_or(0), + delegate_handoff_backlog + .get(&session.id) + .copied() + .unwrap_or(0), session.updated_at, ) }) { - if unread_counts.get(&active_delegate.id).copied().unwrap_or(0) > 0 { + if delegate_handoff_backlog + .get(&active_delegate.id) + .copied() + .unwrap_or(0) + > 0 + { return Ok(AssignmentOutcome { session_id: lead.id.clone(), action: AssignmentAction::DeferredSaturated, @@ -1798,6 +1819,74 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "current_thread")] + async fn assign_session_reuses_idle_delegate_when_only_non_handoff_messages_are_unread() -> Result<()> { + let tempdir = TestDir::new("manager-assign-reuse-idle-info-inbox")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let cfg = build_config(tempdir.path()); + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "lead".to_string(), + task: "lead task".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Running, + pid: Some(42), + worktree: None, + created_at: now - Duration::minutes(3), + updated_at: now - Duration::minutes(3), + metrics: SessionMetrics::default(), + })?; + db.insert_session(&Session { + id: "idle-worker".to_string(), + task: "old worker task".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Idle, + pid: Some(99), + worktree: None, + created_at: now - Duration::minutes(2), + updated_at: now - Duration::minutes(2), + metrics: SessionMetrics::default(), + })?; + db.send_message( + "lead", + "idle-worker", + "{\"task\":\"old worker task\",\"context\":\"Delegated from lead\"}", + "task_handoff", + )?; + db.mark_messages_read("idle-worker")?; + db.send_message("lead", "idle-worker", "FYI status update", "info")?; + + let (fake_runner, _) = write_fake_claude(tempdir.path())?; + let outcome = assign_session_in_dir_with_runner_program( + &db, + &cfg, + "lead", + "Fresh delegated task", + "claude", + true, + &repo_root, + &fake_runner, + ) + .await?; + + assert_eq!(outcome.action, AssignmentAction::ReusedIdle); + assert_eq!(outcome.session_id, "idle-worker"); + + let idle_messages = db.list_messages_for_session("idle-worker", 10)?; + assert!(idle_messages.iter().any(|message| { + message.msg_type == "task_handoff" + && message.content.contains("Fresh delegated task") + })); + + Ok(()) + } + #[tokio::test(flavor = "current_thread")] async fn assign_session_spawns_when_team_has_capacity() -> Result<()> { let tempdir = TestDir::new("manager-assign-spawn")?; diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index 15eef24e..793fa6d3 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -555,6 +555,19 @@ impl StateStore { .map_err(Into::into) } + pub fn unread_task_handoff_count(&self, session_id: &str) -> Result { + self.conn + .query_row( + "SELECT COUNT(*) + FROM messages + WHERE to_session = ?1 AND msg_type = 'task_handoff' AND read = 0", + rusqlite::params![session_id], + |row| row.get::<_, i64>(0), + ) + .map(|count| count as usize) + .map_err(Into::into) + } + pub fn unread_task_handoff_targets(&self, limit: usize) -> Result> { let mut stmt = self.conn.prepare( "SELECT to_session, COUNT(*) as unread_count diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index faa69262..63d5526a 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -1285,6 +1285,16 @@ impl Dashboard { .get(&child_id) .copied() .unwrap_or(0); + let handoff_backlog = match self.db.unread_task_handoff_count(&child_id) { + Ok(count) => count, + Err(error) => { + tracing::warn!( + "Failed to load delegated child handoff backlog {}: {error}", + child_id + ); + 0 + } + }; let state = session.state.clone(); match state { SessionState::Idle => team.idle += 1, @@ -1296,7 +1306,7 @@ impl Dashboard { } route_candidates.push(DelegatedChildSummary { - unread_messages, + unread_messages: handoff_backlog, state: state.clone(), session_id: child_id.clone(), }); @@ -2345,6 +2355,58 @@ mod tests { assert!(!text.contains("Inbox focus-12")); } + #[test] + fn route_preview_ignores_non_handoff_inbox_noise() { + let lead = sample_session( + "lead-12345678", + "planner", + SessionState::Running, + Some("ecc/lead"), + 512, + 42, + ); + let idle_worker = sample_session( + "idle-worker", + "planner", + SessionState::Idle, + Some("ecc/idle"), + 128, + 12, + ); + + let mut dashboard = test_dashboard(vec![lead.clone(), idle_worker.clone()], 0); + dashboard.db.insert_session(&lead).unwrap(); + dashboard.db.insert_session(&idle_worker).unwrap(); + dashboard + .db + .send_message("lead-12345678", "idle-worker", "FYI status update", "info") + .unwrap(); + dashboard + .db + .send_message( + "lead-12345678", + "idle-worker", + "{\"task\":\"Delegated work\",\"context\":\"Delegated from lead\"}", + "task_handoff", + ) + .unwrap(); + dashboard.db.mark_messages_read("idle-worker").unwrap(); + dashboard + .db + .send_message("lead-12345678", "idle-worker", "FYI status update", "info") + .unwrap(); + + dashboard.unread_message_counts = dashboard.db.unread_message_counts().unwrap(); + dashboard.sync_selected_lineage(); + + assert_eq!( + dashboard.selected_route_preview.as_deref(), + Some("reuse idle idle-wor") + ); + assert_eq!(dashboard.selected_child_sessions.len(), 1); + assert_eq!(dashboard.selected_child_sessions[0].unread_messages, 1); + } + #[test] fn aggregate_cost_summary_mentions_total_cost() { let db = StateStore::open(Path::new(":memory:")).unwrap();