feat: make ecc2 routing inbox-aware

This commit is contained in:
Affaan Mustafa
2026-04-07 12:46:25 -07:00
parent 7afc6892b1
commit 8ff5e736cd

View File

@@ -176,10 +176,14 @@ async fn assign_session_in_dir_with_runner_program(
) -> Result<AssignmentOutcome> {
let lead = resolve_session(db, lead_id)?;
let delegates = direct_delegate_sessions(db, &lead.id, agent_type)?;
let unread_counts = db.unread_message_counts()?;
if let Some(idle_delegate) = delegates
.iter()
.filter(|session| session.state == SessionState::Idle)
.filter(|session| {
session.state == SessionState::Idle
&& unread_counts.get(&session.id).copied().unwrap_or(0) == 0
})
.min_by_key(|session| session.updated_at)
{
send_task_handoff(db, &lead, &idle_delegate.id, task, "reused idle delegate")?;
@@ -207,10 +211,38 @@ async fn assign_session_in_dir_with_runner_program(
});
}
if let Some(idle_delegate) = delegates
.iter()
.filter(|session| session.state == SessionState::Idle)
.min_by_key(|session| {
(
unread_counts.get(&session.id).copied().unwrap_or(0),
session.updated_at,
)
})
{
send_task_handoff(
db,
&lead,
&idle_delegate.id,
task,
"reused idle delegate with existing inbox backlog",
)?;
return Ok(AssignmentOutcome {
session_id: idle_delegate.id.clone(),
action: AssignmentAction::ReusedIdle,
});
}
if let Some(active_delegate) = delegates
.iter()
.filter(|session| matches!(session.state, SessionState::Running | SessionState::Pending))
.min_by_key(|session| session.updated_at)
.min_by_key(|session| {
(
unread_counts.get(&session.id).copied().unwrap_or(0),
session.updated_at,
)
})
{
send_task_handoff(
db,
@@ -1325,6 +1357,7 @@ mod tests {
"{\"task\":\"old worker task\",\"context\":\"Delegated from lead\"}",
"task_handoff",
)?;
db.mark_messages_read("idle-worker")?;
let (fake_runner, _) = write_fake_claude(tempdir.path())?;
let outcome = assign_session_in_dir_with_runner_program(
@@ -1351,6 +1384,82 @@ mod tests {
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn assign_session_spawns_instead_of_reusing_backed_up_idle_delegate() -> Result<()> {
let tempdir = TestDir::new("manager-assign-spawn-backed-up-idle")?;
let repo_root = tempdir.path().join("repo");
init_git_repo(&repo_root)?;
let cfg = build_config(tempdir.path());
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
pid: Some(42),
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
id: "idle-worker".to_string(),
task: "old worker task".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Idle,
pid: Some(99),
worktree: None,
created_at: now - Duration::minutes(2),
updated_at: now - Duration::minutes(2),
metrics: SessionMetrics::default(),
})?;
db.send_message(
"lead",
"idle-worker",
"{\"task\":\"old worker task\",\"context\":\"Delegated from lead\"}",
"task_handoff",
)?;
let (fake_runner, _) = write_fake_claude(tempdir.path())?;
let outcome = assign_session_in_dir_with_runner_program(
&db,
&cfg,
"lead",
"Fresh delegated task",
"claude",
true,
&repo_root,
&fake_runner,
)
.await?;
assert_eq!(outcome.action, AssignmentAction::Spawned);
assert_ne!(outcome.session_id, "idle-worker");
let idle_messages = db.list_messages_for_session("idle-worker", 10)?;
let fresh_assignments = idle_messages
.iter()
.filter(|message| {
message.msg_type == "task_handoff"
&& message.content.contains("Fresh delegated task")
})
.count();
assert_eq!(fresh_assignments, 0);
let spawned_messages = db.list_messages_for_session(&outcome.session_id, 10)?;
assert!(spawned_messages.iter().any(|message| {
message.msg_type == "task_handoff"
&& message.content.contains("Fresh delegated task")
}));
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn assign_session_spawns_when_team_has_capacity() -> Result<()> {
let tempdir = TestDir::new("manager-assign-spawn")?;