feat: route ecc2 delegates by graph context

This commit is contained in:
Affaan Mustafa
2026-04-10 04:41:00 -07:00
parent 4b1ff48219
commit 0b68af123c

View File

@@ -1156,7 +1156,7 @@ async fn assign_session_in_dir_with_runner_program(
.unwrap_or(0) .unwrap_or(0)
== 0 == 0
}) })
.min_by_key(|session| session.updated_at) .max_by_key(|session| delegate_selection_key(db, session, task))
{ {
send_task_handoff(db, &lead, &idle_delegate.id, task, "reused idle delegate")?; send_task_handoff(db, &lead, &idle_delegate.id, task, "reused idle delegate")?;
return Ok(AssignmentOutcome { return Ok(AssignmentOutcome {
@@ -1208,13 +1208,14 @@ async fn assign_session_in_dir_with_runner_program(
if let Some(active_delegate) = delegates if let Some(active_delegate) = delegates
.iter() .iter()
.filter(|session| matches!(session.state, SessionState::Running | SessionState::Pending)) .filter(|session| matches!(session.state, SessionState::Running | SessionState::Pending))
.min_by_key(|session| { .max_by_key(|session| {
( (
delegate_handoff_backlog graph_context_match_score(db, &session.id, task),
-(delegate_handoff_backlog
.get(&session.id) .get(&session.id)
.copied() .copied()
.unwrap_or(0), .unwrap_or(0) as i64),
session.updated_at, -session.updated_at.timestamp_millis(),
) )
}) })
{ {
@@ -2358,6 +2359,61 @@ fn direct_delegate_sessions(
Ok(sessions) Ok(sessions)
} }
fn delegate_selection_key(db: &StateStore, session: &Session, task: &str) -> (usize, i64) {
(
graph_context_match_score(db, &session.id, task),
-session.updated_at.timestamp_millis(),
)
}
fn graph_context_match_score(db: &StateStore, session_id: &str, task: &str) -> usize {
let terms = graph_match_terms(task);
if terms.is_empty() {
return 0;
}
let entities = match db.list_context_entities(Some(session_id), None, 48) {
Ok(entities) => entities,
Err(_) => return 0,
};
let mut matched = HashSet::new();
for entity in entities {
let mut haystacks = vec![entity.name.to_lowercase(), entity.summary.to_lowercase()];
if let Some(path) = entity.path.as_ref() {
haystacks.push(path.to_lowercase());
}
for (key, value) in entity.metadata {
haystacks.push(key.to_lowercase());
haystacks.push(value.to_lowercase());
}
for term in &terms {
if haystacks.iter().any(|haystack| haystack.contains(term)) {
matched.insert(term.clone());
}
}
}
matched.len()
}
fn graph_match_terms(task: &str) -> Vec<String> {
let mut terms = Vec::new();
let mut seen = HashSet::new();
for token in task
.split(|ch: char| !(ch.is_ascii_alphanumeric() || matches!(ch, '_' | '.' | '-')))
.map(str::trim)
.filter(|token| token.len() >= 3)
{
let lowered = token.to_ascii_lowercase();
if seen.insert(lowered.clone()) {
terms.push(lowered);
}
}
terms
}
fn summarize_backlog_pressure( fn summarize_backlog_pressure(
db: &StateStore, db: &StateStore,
cfg: &Config, cfg: &Config,
@@ -4740,6 +4796,112 @@ mod tests {
Ok(()) Ok(())
} }
#[tokio::test(flavor = "current_thread")]
async fn assign_session_prefers_idle_delegate_with_graph_context_match() -> Result<()> {
let tempdir = TestDir::new("manager-assign-graph-context-idle")?;
let repo_root = tempdir.path().join("repo");
init_git_repo(&repo_root)?;
let cfg = build_config(tempdir.path());
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
pid: Some(42),
worktree: None,
created_at: now - Duration::minutes(4),
updated_at: now - Duration::minutes(4),
last_heartbeat_at: now - Duration::minutes(4),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
id: "older-worker".to_string(),
task: "legacy delegated task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Idle,
pid: Some(100),
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
last_heartbeat_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
id: "auth-worker".to_string(),
task: "auth delegated task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Idle,
pid: Some(101),
worktree: None,
created_at: now - Duration::minutes(2),
updated_at: now - Duration::minutes(2),
last_heartbeat_at: now - Duration::minutes(2),
metrics: SessionMetrics::default(),
})?;
db.send_message(
"lead",
"older-worker",
"{\"task\":\"legacy delegated task\",\"context\":\"Delegated from lead\"}",
"task_handoff",
)?;
db.send_message(
"lead",
"auth-worker",
"{\"task\":\"auth delegated task\",\"context\":\"Delegated from lead\"}",
"task_handoff",
)?;
db.mark_messages_read("older-worker")?;
db.mark_messages_read("auth-worker")?;
db.upsert_context_entity(
Some("auth-worker"),
"file",
"auth-callback.ts",
Some("src/auth/callback.ts"),
"Auth callback recovery edge cases",
&BTreeMap::new(),
)?;
let (fake_runner, _) = write_fake_claude(tempdir.path())?;
let outcome = assign_session_in_dir_with_runner_program(
&db,
&cfg,
"lead",
"Investigate auth callback recovery",
"claude",
true,
&repo_root,
&fake_runner,
None,
SessionGrouping::default(),
)
.await?;
assert_eq!(outcome.action, AssignmentAction::ReusedIdle);
assert_eq!(outcome.session_id, "auth-worker");
let auth_messages = db.list_messages_for_session("auth-worker", 10)?;
assert!(auth_messages.iter().any(|message| {
message.msg_type == "task_handoff"
&& message.content.contains("Investigate auth callback recovery")
}));
Ok(())
}
#[tokio::test(flavor = "current_thread")] #[tokio::test(flavor = "current_thread")]
async fn assign_session_spawns_instead_of_reusing_backed_up_idle_delegate() -> Result<()> { async fn assign_session_spawns_instead_of_reusing_backed_up_idle_delegate() -> Result<()> {
let tempdir = TestDir::new("manager-assign-spawn-backed-up-idle")?; let tempdir = TestDir::new("manager-assign-spawn-backed-up-idle")?;