From 8ff5e736cd7acc617790e07110081a43be8c8cd6 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Tue, 7 Apr 2026 12:46:25 -0700 Subject: [PATCH] feat: make ecc2 routing inbox-aware --- ecc2/src/session/manager.rs | 113 +++++++++++++++++++++++++++++++++++- 1 file changed, 111 insertions(+), 2 deletions(-) diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index 41e887ec..ae0f2360 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -176,10 +176,14 @@ async fn assign_session_in_dir_with_runner_program( ) -> Result { let lead = resolve_session(db, lead_id)?; let delegates = direct_delegate_sessions(db, &lead.id, agent_type)?; + let unread_counts = db.unread_message_counts()?; if let Some(idle_delegate) = delegates .iter() - .filter(|session| session.state == SessionState::Idle) + .filter(|session| { + session.state == SessionState::Idle + && unread_counts.get(&session.id).copied().unwrap_or(0) == 0 + }) .min_by_key(|session| session.updated_at) { send_task_handoff(db, &lead, &idle_delegate.id, task, "reused idle delegate")?; @@ -207,10 +211,38 @@ async fn assign_session_in_dir_with_runner_program( }); } + if let Some(idle_delegate) = delegates + .iter() + .filter(|session| session.state == SessionState::Idle) + .min_by_key(|session| { + ( + unread_counts.get(&session.id).copied().unwrap_or(0), + session.updated_at, + ) + }) + { + send_task_handoff( + db, + &lead, + &idle_delegate.id, + task, + "reused idle delegate with existing inbox backlog", + )?; + return Ok(AssignmentOutcome { + session_id: idle_delegate.id.clone(), + action: AssignmentAction::ReusedIdle, + }); + } + if let Some(active_delegate) = delegates .iter() .filter(|session| matches!(session.state, SessionState::Running | SessionState::Pending)) - .min_by_key(|session| session.updated_at) + .min_by_key(|session| { + ( + unread_counts.get(&session.id).copied().unwrap_or(0), + session.updated_at, + ) + }) { send_task_handoff( db, @@ -1325,6 +1357,7 @@ mod tests { "{\"task\":\"old worker task\",\"context\":\"Delegated from lead\"}", "task_handoff", )?; + db.mark_messages_read("idle-worker")?; let (fake_runner, _) = write_fake_claude(tempdir.path())?; let outcome = assign_session_in_dir_with_runner_program( @@ -1351,6 +1384,82 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "current_thread")] + async fn assign_session_spawns_instead_of_reusing_backed_up_idle_delegate() -> Result<()> { + let tempdir = TestDir::new("manager-assign-spawn-backed-up-idle")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let cfg = build_config(tempdir.path()); + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "lead".to_string(), + task: "lead task".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Running, + pid: Some(42), + worktree: None, + created_at: now - Duration::minutes(3), + updated_at: now - Duration::minutes(3), + metrics: SessionMetrics::default(), + })?; + db.insert_session(&Session { + id: "idle-worker".to_string(), + task: "old worker task".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Idle, + pid: Some(99), + worktree: None, + created_at: now - Duration::minutes(2), + updated_at: now - Duration::minutes(2), + metrics: SessionMetrics::default(), + })?; + db.send_message( + "lead", + "idle-worker", + "{\"task\":\"old worker task\",\"context\":\"Delegated from lead\"}", + "task_handoff", + )?; + + let (fake_runner, _) = write_fake_claude(tempdir.path())?; + let outcome = assign_session_in_dir_with_runner_program( + &db, + &cfg, + "lead", + "Fresh delegated task", + "claude", + true, + &repo_root, + &fake_runner, + ) + .await?; + + assert_eq!(outcome.action, AssignmentAction::Spawned); + assert_ne!(outcome.session_id, "idle-worker"); + + let idle_messages = db.list_messages_for_session("idle-worker", 10)?; + let fresh_assignments = idle_messages + .iter() + .filter(|message| { + message.msg_type == "task_handoff" + && message.content.contains("Fresh delegated task") + }) + .count(); + assert_eq!(fresh_assignments, 0); + + let spawned_messages = db.list_messages_for_session(&outcome.session_id, 10)?; + assert!(spawned_messages.iter().any(|message| { + message.msg_type == "task_handoff" + && message.content.contains("Fresh delegated task") + })); + + Ok(()) + } + #[tokio::test(flavor = "current_thread")] async fn assign_session_spawns_when_team_has_capacity() -> Result<()> { let tempdir = TestDir::new("manager-assign-spawn")?;