feat: preview ecc2 graph-aware routing

This commit is contained in:
Affaan Mustafa
2026-04-10 04:49:14 -07:00
parent 0b68af123c
commit 23348a21a6
2 changed files with 351 additions and 25 deletions

View File

@@ -480,11 +480,8 @@ pub async fn drain_inbox(
let mut outcomes = Vec::new(); let mut outcomes = Vec::new();
for message in messages { for message in messages {
let task = match comms::parse(&message.content) { let task = parse_task_handoff_task(&message.content)
Some(MessageType::TaskHandoff { task, .. }) => task, .unwrap_or_else(|| message.content.clone());
_ => extract_legacy_handoff_task(&message.content)
.unwrap_or_else(|| message.content.clone()),
};
let outcome = assign_session_in_dir_with_runner_program( let outcome = assign_session_in_dir_with_runner_program(
db, db,
@@ -687,11 +684,8 @@ pub async fn rebalance_team_backlog(
continue; continue;
} }
let task = match comms::parse(&message.content) { let task = parse_task_handoff_task(&message.content)
Some(MessageType::TaskHandoff { task, .. }) => task, .unwrap_or_else(|| message.content.clone());
_ => extract_legacy_handoff_task(&message.content)
.unwrap_or_else(|| message.content.clone()),
};
let outcome = assign_session_in_dir_with_runner_program( let outcome = assign_session_in_dir_with_runner_program(
db, db,
@@ -2367,19 +2361,24 @@ fn delegate_selection_key(db: &StateStore, session: &Session, task: &str) -> (us
} }
fn graph_context_match_score(db: &StateStore, session_id: &str, task: &str) -> usize { fn graph_context_match_score(db: &StateStore, session_id: &str, task: &str) -> usize {
graph_context_matched_terms(db, session_id, task).len()
}
fn graph_context_matched_terms(db: &StateStore, session_id: &str, task: &str) -> Vec<String> {
let terms = graph_match_terms(task); let terms = graph_match_terms(task);
if terms.is_empty() { if terms.is_empty() {
return 0; return Vec::new();
} }
let entities = match db.list_context_entities(Some(session_id), None, 48) { let entities = match db.list_context_entities(Some(session_id), None, 48) {
Ok(entities) => entities, Ok(entities) => entities,
Err(_) => return 0, Err(_) => return Vec::new(),
}; };
let mut matched = HashSet::new(); let mut haystacks = Vec::new();
for entity in entities { for entity in entities {
let mut haystacks = vec![entity.name.to_lowercase(), entity.summary.to_lowercase()]; haystacks.push(entity.name.to_lowercase());
haystacks.push(entity.summary.to_lowercase());
if let Some(path) = entity.path.as_ref() { if let Some(path) = entity.path.as_ref() {
haystacks.push(path.to_lowercase()); haystacks.push(path.to_lowercase());
} }
@@ -2387,15 +2386,12 @@ fn graph_context_match_score(db: &StateStore, session_id: &str, task: &str) -> u
haystacks.push(key.to_lowercase()); haystacks.push(key.to_lowercase());
haystacks.push(value.to_lowercase()); haystacks.push(value.to_lowercase());
} }
for term in &terms {
if haystacks.iter().any(|haystack| haystack.contains(term)) {
matched.insert(term.clone());
}
}
} }
matched.len() terms
.into_iter()
.filter(|term| haystacks.iter().any(|haystack| haystack.contains(term)))
.collect()
} }
fn graph_match_terms(task: &str) -> Vec<String> { fn graph_match_terms(task: &str) -> Vec<String> {
@@ -2475,6 +2471,13 @@ fn send_task_handoff(
) )
} }
pub(crate) fn parse_task_handoff_task(content: &str) -> Option<String> {
match comms::parse(content) {
Some(MessageType::TaskHandoff { task, .. }) => Some(task),
_ => extract_legacy_handoff_task(content),
}
}
fn extract_legacy_handoff_task(content: &str) -> Option<String> { fn extract_legacy_handoff_task(content: &str) -> Option<String> {
let value: serde_json::Value = serde_json::from_str(content).ok()?; let value: serde_json::Value = serde_json::from_str(content).ok()?;
value value
@@ -2684,6 +2687,15 @@ pub struct AssignmentOutcome {
pub action: AssignmentAction, pub action: AssignmentAction,
} }
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AssignmentPreview {
pub session_id: Option<String>,
pub action: AssignmentAction,
pub delegate_state: Option<SessionState>,
pub handoff_backlog: usize,
pub graph_match_terms: Vec<String>,
}
pub struct InboxDrainOutcome { pub struct InboxDrainOutcome {
pub message_id: i64, pub message_id: i64,
pub task: String, pub task: String,
@@ -2759,6 +2771,120 @@ pub enum AssignmentAction {
DeferredSaturated, DeferredSaturated,
} }
pub fn preview_assignment_for_task(
db: &StateStore,
cfg: &Config,
lead_id: &str,
task: &str,
agent_type: &str,
) -> Result<AssignmentPreview> {
let lead = resolve_session(db, lead_id)?;
let delegates = direct_delegate_sessions(db, &lead.id, agent_type)?;
let delegate_handoff_backlog = delegates
.iter()
.map(|session| {
db.unread_task_handoff_count(&session.id)
.map(|count| (session.id.clone(), count))
})
.collect::<Result<HashMap<_, _>>>()?;
if let Some(idle_delegate) = delegates
.iter()
.filter(|session| {
session.state == SessionState::Idle
&& delegate_handoff_backlog
.get(&session.id)
.copied()
.unwrap_or(0)
== 0
})
.max_by_key(|session| delegate_selection_key(db, session, task))
{
return Ok(AssignmentPreview {
session_id: Some(idle_delegate.id.clone()),
action: AssignmentAction::ReusedIdle,
delegate_state: Some(idle_delegate.state.clone()),
handoff_backlog: 0,
graph_match_terms: graph_context_matched_terms(db, &idle_delegate.id, task),
});
}
if delegates.len() < cfg.max_parallel_sessions {
return Ok(AssignmentPreview {
session_id: None,
action: AssignmentAction::Spawned,
delegate_state: None,
handoff_backlog: 0,
graph_match_terms: Vec::new(),
});
}
if let Some(idle_delegate) = delegates
.iter()
.filter(|session| session.state == SessionState::Idle)
.min_by_key(|session| {
(
delegate_handoff_backlog
.get(&session.id)
.copied()
.unwrap_or(0),
session.updated_at,
)
})
{
let handoff_backlog = delegate_handoff_backlog
.get(&idle_delegate.id)
.copied()
.unwrap_or(0);
return Ok(AssignmentPreview {
session_id: Some(idle_delegate.id.clone()),
action: AssignmentAction::DeferredSaturated,
delegate_state: Some(idle_delegate.state.clone()),
handoff_backlog,
graph_match_terms: graph_context_matched_terms(db, &idle_delegate.id, task),
});
}
if let Some(active_delegate) = delegates
.iter()
.filter(|session| matches!(session.state, SessionState::Running | SessionState::Pending))
.max_by_key(|session| {
(
graph_context_match_score(db, &session.id, task),
-(delegate_handoff_backlog
.get(&session.id)
.copied()
.unwrap_or(0) as i64),
-session.updated_at.timestamp_millis(),
)
})
{
let handoff_backlog = delegate_handoff_backlog
.get(&active_delegate.id)
.copied()
.unwrap_or(0);
return Ok(AssignmentPreview {
session_id: Some(active_delegate.id.clone()),
action: if handoff_backlog > 0 {
AssignmentAction::DeferredSaturated
} else {
AssignmentAction::ReusedActive
},
delegate_state: Some(active_delegate.state.clone()),
handoff_backlog,
graph_match_terms: graph_context_matched_terms(db, &active_delegate.id, task),
});
}
Ok(AssignmentPreview {
session_id: None,
action: AssignmentAction::Spawned,
delegate_state: None,
handoff_backlog: 0,
graph_match_terms: Vec::new(),
})
}
pub fn assignment_action_routes_work(action: AssignmentAction) -> bool { pub fn assignment_action_routes_work(action: AssignmentAction) -> bool {
!matches!(action, AssignmentAction::DeferredSaturated) !matches!(action, AssignmentAction::DeferredSaturated)
} }
@@ -4875,6 +5001,24 @@ mod tests {
&BTreeMap::new(), &BTreeMap::new(),
)?; )?;
let preview = preview_assignment_for_task(
&db,
&cfg,
"lead",
"Investigate auth callback recovery",
"claude",
)?;
assert_eq!(preview.action, AssignmentAction::ReusedIdle);
assert_eq!(preview.session_id.as_deref(), Some("auth-worker"));
assert_eq!(
preview.graph_match_terms,
vec![
"auth".to_string(),
"callback".to_string(),
"recovery".to_string()
]
);
let (fake_runner, _) = write_fake_claude(tempdir.path())?; let (fake_runner, _) = write_fake_claude(tempdir.path())?;
let outcome = assign_session_in_dir_with_runner_program( let outcome = assign_session_in_dir_with_runner_program(
&db, &db,

View File

@@ -4910,8 +4910,12 @@ impl Dashboard {
} }
self.selected_team_summary = if team.total > 0 { Some(team) } else { None }; self.selected_team_summary = if team.total > 0 { Some(team) } else { None };
let selected_agent_type = self
.selected_agent_type()
.unwrap_or(self.cfg.default_agent.as_str())
.to_string();
self.selected_route_preview = self.selected_route_preview =
self.build_route_preview(team.total, &route_candidates); self.build_route_preview(&session_id, &selected_agent_type, team.total, &route_candidates);
delegated.sort_by_key(|delegate| { delegated.sort_by_key(|delegate| {
( (
delegate_attention_priority(delegate), delegate_attention_priority(delegate),
@@ -4934,9 +4938,23 @@ impl Dashboard {
fn build_route_preview( fn build_route_preview(
&self, &self,
lead_id: &str,
lead_agent_type: &str,
delegate_count: usize, delegate_count: usize,
delegates: &[DelegatedChildSummary], delegates: &[DelegatedChildSummary],
) -> Option<String> { ) -> Option<String> {
if let Some(task) = self.latest_route_task(lead_id) {
if let Ok(preview) = manager::preview_assignment_for_task(
&self.db,
&self.cfg,
lead_id,
&task,
lead_agent_type,
) {
return Some(self.format_assignment_preview(&task, &preview));
}
}
if let Some(idle_clear) = delegates if let Some(idle_clear) = delegates
.iter() .iter()
.filter(|delegate| { .filter(|delegate| {
@@ -4960,7 +4978,7 @@ impl Dashboard {
.min_by_key(|delegate| (delegate.handoff_backlog, delegate.session_id.as_str())) .min_by_key(|delegate| (delegate.handoff_backlog, delegate.session_id.as_str()))
{ {
return Some(format!( return Some(format!(
"reuse idle {} with backlog {}", "defer; idle {} backlog {}",
format_session_id(&idle_backed_up.session_id), format_session_id(&idle_backed_up.session_id),
idle_backed_up.handoff_backlog idle_backed_up.handoff_backlog
)); ));
@@ -4977,9 +4995,18 @@ impl Dashboard {
.min_by_key(|delegate| (delegate.handoff_backlog, delegate.session_id.as_str())) .min_by_key(|delegate| (delegate.handoff_backlog, delegate.session_id.as_str()))
{ {
return Some(format!( return Some(format!(
"reuse active {} with backlog {}", "{} active {}{}",
if active_delegate.handoff_backlog > 0 {
"defer;"
} else {
"reuse"
},
format_session_id(&active_delegate.session_id), format_session_id(&active_delegate.session_id),
active_delegate.handoff_backlog if active_delegate.handoff_backlog > 0 {
format!(" backlog {}", active_delegate.handoff_backlog)
} else {
String::new()
}
)); ));
} }
@@ -4990,6 +5017,78 @@ impl Dashboard {
} }
} }
fn latest_route_task(&self, session_id: &str) -> Option<String> {
self.db
.list_messages_for_session(session_id, 16)
.ok()?
.into_iter()
.rev()
.find_map(|message| {
if message.to_session != session_id || message.msg_type != "task_handoff" {
return None;
}
manager::parse_task_handoff_task(&message.content)
.or_else(|| Some(message.content))
})
}
fn format_assignment_preview(
&self,
task: &str,
preview: &manager::AssignmentPreview,
) -> String {
let task_preview = truncate_for_dashboard(task, 40);
let graph_suffix = if preview.graph_match_terms.is_empty() {
String::new()
} else {
format!(
" | graph {}",
truncate_for_dashboard(&preview.graph_match_terms.join(", "), 36)
)
};
match preview.action {
manager::AssignmentAction::Spawned => {
format!("for `{task_preview}` spawn new delegate")
}
manager::AssignmentAction::ReusedIdle => format!(
"for `{task_preview}` reuse idle {}{}",
preview
.session_id
.as_deref()
.map(format_session_id)
.unwrap_or_else(|| "unknown".to_string()),
graph_suffix
),
manager::AssignmentAction::ReusedActive => format!(
"for `{task_preview}` reuse active {}{}",
preview
.session_id
.as_deref()
.map(format_session_id)
.unwrap_or_else(|| "unknown".to_string()),
graph_suffix
),
manager::AssignmentAction::DeferredSaturated => {
let state_label = match preview.delegate_state {
Some(SessionState::Idle) => "idle",
Some(SessionState::Running) | Some(SessionState::Pending) => "active",
_ => "delegate",
};
format!(
"for `{task_preview}` defer; {state_label} {} backlog {}{}",
preview
.session_id
.as_deref()
.map(format_session_id)
.unwrap_or_else(|| "unknown".to_string()),
preview.handoff_backlog,
graph_suffix
)
}
}
}
fn selected_session_id(&self) -> Option<&str> { fn selected_session_id(&self) -> Option<&str> {
self.sessions self.sessions
.get(self.selected_session) .get(self.selected_session)
@@ -11052,6 +11151,89 @@ diff --git a/src/lib.rs b/src/lib.rs
assert!(!text.contains("Backlog focus-12")); assert!(!text.contains("Backlog focus-12"));
} }
#[test]
fn route_preview_uses_graph_context_for_latest_incoming_handoff() {
let lead = sample_session(
"lead-12345678",
"planner",
SessionState::Running,
Some("ecc/lead"),
512,
42,
);
let older_worker = sample_session(
"older-worker",
"planner",
SessionState::Idle,
Some("ecc/older"),
128,
12,
);
let auth_worker = sample_session(
"auth-worker",
"planner",
SessionState::Idle,
Some("ecc/auth"),
256,
24,
);
let mut dashboard =
test_dashboard(vec![lead.clone(), older_worker.clone(), auth_worker.clone()], 0);
dashboard.db.insert_session(&lead).unwrap();
dashboard.db.insert_session(&older_worker).unwrap();
dashboard.db.insert_session(&auth_worker).unwrap();
dashboard
.db
.send_message(
"lead-12345678",
"older-worker",
"{\"task\":\"Legacy delegated work\",\"context\":\"Delegated from lead\"}",
"task_handoff",
)
.unwrap();
dashboard
.db
.send_message(
"lead-12345678",
"auth-worker",
"{\"task\":\"Auth delegated work\",\"context\":\"Delegated from lead\"}",
"task_handoff",
)
.unwrap();
dashboard.db.mark_messages_read("older-worker").unwrap();
dashboard.db.mark_messages_read("auth-worker").unwrap();
dashboard
.db
.send_message(
"planner-root",
"lead-12345678",
"{\"task\":\"Investigate auth callback recovery\",\"context\":\"Delegated from planner-root\"}",
"task_handoff",
)
.unwrap();
dashboard
.db
.upsert_context_entity(
Some("auth-worker"),
"file",
"auth-callback.ts",
Some("src/auth/callback.ts"),
"Auth callback recovery edge cases",
&BTreeMap::new(),
)
.unwrap();
dashboard.unread_message_counts = dashboard.db.unread_message_counts().unwrap();
dashboard.sync_selected_messages();
dashboard.sync_selected_lineage();
assert_eq!(
dashboard.selected_route_preview.as_deref(),
Some("for `Investigate auth callback recovery` reuse idle auth-wor | graph auth, callback, recovery")
);
}
#[test] #[test]
fn route_preview_ignores_non_handoff_inbox_noise() { fn route_preview_ignores_non_handoff_inbox_noise() {
let lead = sample_session( let lead = sample_session(