From 23348a21a68be5aee366c84957c2a283d2e5e94b Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Fri, 10 Apr 2026 04:49:14 -0700 Subject: [PATCH] feat: preview ecc2 graph-aware routing --- ecc2/src/session/manager.rs | 186 +++++++++++++++++++++++++++++++---- ecc2/src/tui/dashboard.rs | 190 +++++++++++++++++++++++++++++++++++- 2 files changed, 351 insertions(+), 25 deletions(-) diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index d9fbc2d5..1fbbbaaf 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -480,11 +480,8 @@ pub async fn drain_inbox( let mut outcomes = Vec::new(); for message in messages { - let task = match comms::parse(&message.content) { - Some(MessageType::TaskHandoff { task, .. }) => task, - _ => extract_legacy_handoff_task(&message.content) - .unwrap_or_else(|| message.content.clone()), - }; + let task = parse_task_handoff_task(&message.content) + .unwrap_or_else(|| message.content.clone()); let outcome = assign_session_in_dir_with_runner_program( db, @@ -687,11 +684,8 @@ pub async fn rebalance_team_backlog( continue; } - let task = match comms::parse(&message.content) { - Some(MessageType::TaskHandoff { task, .. }) => task, - _ => extract_legacy_handoff_task(&message.content) - .unwrap_or_else(|| message.content.clone()), - }; + let task = parse_task_handoff_task(&message.content) + .unwrap_or_else(|| message.content.clone()); let outcome = assign_session_in_dir_with_runner_program( db, @@ -2367,19 +2361,24 @@ fn delegate_selection_key(db: &StateStore, session: &Session, task: &str) -> (us } fn graph_context_match_score(db: &StateStore, session_id: &str, task: &str) -> usize { + graph_context_matched_terms(db, session_id, task).len() +} + +fn graph_context_matched_terms(db: &StateStore, session_id: &str, task: &str) -> Vec { let terms = graph_match_terms(task); if terms.is_empty() { - return 0; + return Vec::new(); } let entities = match db.list_context_entities(Some(session_id), None, 48) { Ok(entities) => entities, - Err(_) => return 0, + Err(_) => return Vec::new(), }; - let mut matched = HashSet::new(); + let mut haystacks = Vec::new(); for entity in entities { - let mut haystacks = vec![entity.name.to_lowercase(), entity.summary.to_lowercase()]; + haystacks.push(entity.name.to_lowercase()); + haystacks.push(entity.summary.to_lowercase()); if let Some(path) = entity.path.as_ref() { haystacks.push(path.to_lowercase()); } @@ -2387,15 +2386,12 @@ fn graph_context_match_score(db: &StateStore, session_id: &str, task: &str) -> u haystacks.push(key.to_lowercase()); haystacks.push(value.to_lowercase()); } - - for term in &terms { - if haystacks.iter().any(|haystack| haystack.contains(term)) { - matched.insert(term.clone()); - } - } } - matched.len() + terms + .into_iter() + .filter(|term| haystacks.iter().any(|haystack| haystack.contains(term))) + .collect() } fn graph_match_terms(task: &str) -> Vec { @@ -2475,6 +2471,13 @@ fn send_task_handoff( ) } +pub(crate) fn parse_task_handoff_task(content: &str) -> Option { + match comms::parse(content) { + Some(MessageType::TaskHandoff { task, .. }) => Some(task), + _ => extract_legacy_handoff_task(content), + } +} + fn extract_legacy_handoff_task(content: &str) -> Option { let value: serde_json::Value = serde_json::from_str(content).ok()?; value @@ -2684,6 +2687,15 @@ pub struct AssignmentOutcome { pub action: AssignmentAction, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AssignmentPreview { + pub session_id: Option, + pub action: AssignmentAction, + pub delegate_state: Option, + pub handoff_backlog: usize, + pub graph_match_terms: Vec, +} + pub struct InboxDrainOutcome { pub message_id: i64, pub task: String, @@ -2759,6 +2771,120 @@ pub enum AssignmentAction { DeferredSaturated, } +pub fn preview_assignment_for_task( + db: &StateStore, + cfg: &Config, + lead_id: &str, + task: &str, + agent_type: &str, +) -> Result { + let lead = resolve_session(db, lead_id)?; + let delegates = direct_delegate_sessions(db, &lead.id, agent_type)?; + let delegate_handoff_backlog = delegates + .iter() + .map(|session| { + db.unread_task_handoff_count(&session.id) + .map(|count| (session.id.clone(), count)) + }) + .collect::>>()?; + + if let Some(idle_delegate) = delegates + .iter() + .filter(|session| { + session.state == SessionState::Idle + && delegate_handoff_backlog + .get(&session.id) + .copied() + .unwrap_or(0) + == 0 + }) + .max_by_key(|session| delegate_selection_key(db, session, task)) + { + return Ok(AssignmentPreview { + session_id: Some(idle_delegate.id.clone()), + action: AssignmentAction::ReusedIdle, + delegate_state: Some(idle_delegate.state.clone()), + handoff_backlog: 0, + graph_match_terms: graph_context_matched_terms(db, &idle_delegate.id, task), + }); + } + + if delegates.len() < cfg.max_parallel_sessions { + return Ok(AssignmentPreview { + session_id: None, + action: AssignmentAction::Spawned, + delegate_state: None, + handoff_backlog: 0, + graph_match_terms: Vec::new(), + }); + } + + if let Some(idle_delegate) = delegates + .iter() + .filter(|session| session.state == SessionState::Idle) + .min_by_key(|session| { + ( + delegate_handoff_backlog + .get(&session.id) + .copied() + .unwrap_or(0), + session.updated_at, + ) + }) + { + let handoff_backlog = delegate_handoff_backlog + .get(&idle_delegate.id) + .copied() + .unwrap_or(0); + return Ok(AssignmentPreview { + session_id: Some(idle_delegate.id.clone()), + action: AssignmentAction::DeferredSaturated, + delegate_state: Some(idle_delegate.state.clone()), + handoff_backlog, + graph_match_terms: graph_context_matched_terms(db, &idle_delegate.id, task), + }); + } + + if let Some(active_delegate) = delegates + .iter() + .filter(|session| matches!(session.state, SessionState::Running | SessionState::Pending)) + .max_by_key(|session| { + ( + graph_context_match_score(db, &session.id, task), + -(delegate_handoff_backlog + .get(&session.id) + .copied() + .unwrap_or(0) as i64), + -session.updated_at.timestamp_millis(), + ) + }) + { + let handoff_backlog = delegate_handoff_backlog + .get(&active_delegate.id) + .copied() + .unwrap_or(0); + return Ok(AssignmentPreview { + session_id: Some(active_delegate.id.clone()), + action: if handoff_backlog > 0 { + AssignmentAction::DeferredSaturated + } else { + AssignmentAction::ReusedActive + }, + delegate_state: Some(active_delegate.state.clone()), + handoff_backlog, + graph_match_terms: graph_context_matched_terms(db, &active_delegate.id, task), + }); + } + + Ok(AssignmentPreview { + session_id: None, + action: AssignmentAction::Spawned, + delegate_state: None, + handoff_backlog: 0, + graph_match_terms: Vec::new(), + }) +} + pub fn assignment_action_routes_work(action: AssignmentAction) -> bool { !matches!(action, AssignmentAction::DeferredSaturated) } @@ -4875,6 +5001,24 @@ mod tests { &BTreeMap::new(), )?; + let preview = preview_assignment_for_task( + &db, + &cfg, + "lead", + "Investigate auth callback recovery", + "claude", + )?; + assert_eq!(preview.action, AssignmentAction::ReusedIdle); + assert_eq!(preview.session_id.as_deref(), Some("auth-worker")); + assert_eq!( + preview.graph_match_terms, + vec![ + "auth".to_string(), + "callback".to_string(), + "recovery".to_string() + ] + ); + let (fake_runner, _) = write_fake_claude(tempdir.path())?; let outcome = assign_session_in_dir_with_runner_program( &db, diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index 9c7854c5..396026fb 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -4910,8 +4910,12 @@ impl Dashboard { } self.selected_team_summary = if team.total > 0 { Some(team) } else { None }; + let selected_agent_type = self + .selected_agent_type() + .unwrap_or(self.cfg.default_agent.as_str()) + .to_string(); self.selected_route_preview = - self.build_route_preview(team.total, &route_candidates); + self.build_route_preview(&session_id, &selected_agent_type, team.total, &route_candidates); delegated.sort_by_key(|delegate| { ( delegate_attention_priority(delegate), @@ -4934,9 +4938,23 @@ impl Dashboard { fn build_route_preview( &self, + lead_id: &str, + lead_agent_type: &str, delegate_count: usize, delegates: &[DelegatedChildSummary], ) -> Option { + if let Some(task) = self.latest_route_task(lead_id) { + if let Ok(preview) = manager::preview_assignment_for_task( + &self.db, + &self.cfg, + lead_id, + &task, + lead_agent_type, + ) { + return Some(self.format_assignment_preview(&task, &preview)); + } + } + if let Some(idle_clear) = delegates .iter() .filter(|delegate| { @@ -4960,7 +4978,7 @@ impl Dashboard { .min_by_key(|delegate| (delegate.handoff_backlog, delegate.session_id.as_str())) { return Some(format!( - "reuse idle {} with backlog {}", + "defer; idle {} backlog {}", format_session_id(&idle_backed_up.session_id), idle_backed_up.handoff_backlog )); @@ -4977,9 +4995,18 @@ impl Dashboard { .min_by_key(|delegate| (delegate.handoff_backlog, delegate.session_id.as_str())) { return Some(format!( - "reuse active {} with backlog {}", + "{} active {}{}", + if active_delegate.handoff_backlog > 0 { + "defer;" + } else { + "reuse" + }, format_session_id(&active_delegate.session_id), - active_delegate.handoff_backlog + if active_delegate.handoff_backlog > 0 { + format!(" backlog {}", active_delegate.handoff_backlog) + } else { + String::new() + } )); } @@ -4990,6 +5017,78 @@ impl Dashboard { } } + fn latest_route_task(&self, session_id: &str) -> Option { + self.db + .list_messages_for_session(session_id, 16) + .ok()? + .into_iter() + .rev() + .find_map(|message| { + if message.to_session != session_id || message.msg_type != "task_handoff" { + return None; + } + manager::parse_task_handoff_task(&message.content) + .or_else(|| Some(message.content)) + }) + } + + fn format_assignment_preview( + &self, + task: &str, + preview: &manager::AssignmentPreview, + ) -> String { + let task_preview = truncate_for_dashboard(task, 40); + let graph_suffix = if preview.graph_match_terms.is_empty() { + String::new() + } else { + format!( + " | graph {}", + truncate_for_dashboard(&preview.graph_match_terms.join(", "), 36) + ) + }; + + match preview.action { + manager::AssignmentAction::Spawned => { + format!("for `{task_preview}` spawn new delegate") + } + manager::AssignmentAction::ReusedIdle => format!( + "for `{task_preview}` reuse idle {}{}", + preview + .session_id + .as_deref() + .map(format_session_id) + .unwrap_or_else(|| "unknown".to_string()), + graph_suffix + ), + manager::AssignmentAction::ReusedActive => format!( + "for `{task_preview}` reuse active {}{}", + preview + .session_id + .as_deref() + .map(format_session_id) + .unwrap_or_else(|| "unknown".to_string()), + graph_suffix + ), + manager::AssignmentAction::DeferredSaturated => { + let state_label = match preview.delegate_state { + Some(SessionState::Idle) => "idle", + Some(SessionState::Running) | Some(SessionState::Pending) => "active", + _ => "delegate", + }; + format!( + "for `{task_preview}` defer; {state_label} {} backlog {}{}", + preview + .session_id + .as_deref() + .map(format_session_id) + .unwrap_or_else(|| "unknown".to_string()), + preview.handoff_backlog, + graph_suffix + ) + } + } + } + fn selected_session_id(&self) -> Option<&str> { self.sessions .get(self.selected_session) @@ -11052,6 +11151,89 @@ diff --git a/src/lib.rs b/src/lib.rs assert!(!text.contains("Backlog focus-12")); } + #[test] + fn route_preview_uses_graph_context_for_latest_incoming_handoff() { + let lead = sample_session( + "lead-12345678", + "planner", + SessionState::Running, + Some("ecc/lead"), + 512, + 42, + ); + let older_worker = sample_session( + "older-worker", + "planner", + SessionState::Idle, + Some("ecc/older"), + 128, + 12, + ); + let auth_worker = sample_session( + "auth-worker", + "planner", + SessionState::Idle, + Some("ecc/auth"), + 256, + 24, + ); + + let mut dashboard = + test_dashboard(vec![lead.clone(), older_worker.clone(), auth_worker.clone()], 0); + dashboard.db.insert_session(&lead).unwrap(); + dashboard.db.insert_session(&older_worker).unwrap(); + dashboard.db.insert_session(&auth_worker).unwrap(); + dashboard + .db + .send_message( + "lead-12345678", + "older-worker", + "{\"task\":\"Legacy delegated work\",\"context\":\"Delegated from lead\"}", + "task_handoff", + ) + .unwrap(); + dashboard + .db + .send_message( + "lead-12345678", + "auth-worker", + "{\"task\":\"Auth delegated work\",\"context\":\"Delegated from lead\"}", + "task_handoff", + ) + .unwrap(); + dashboard.db.mark_messages_read("older-worker").unwrap(); + dashboard.db.mark_messages_read("auth-worker").unwrap(); + dashboard + .db + .send_message( + "planner-root", + "lead-12345678", + "{\"task\":\"Investigate auth callback recovery\",\"context\":\"Delegated from planner-root\"}", + "task_handoff", + ) + .unwrap(); + dashboard + .db + .upsert_context_entity( + Some("auth-worker"), + "file", + "auth-callback.ts", + Some("src/auth/callback.ts"), + "Auth callback recovery edge cases", + &BTreeMap::new(), + ) + .unwrap(); + + dashboard.unread_message_counts = dashboard.db.unread_message_counts().unwrap(); + dashboard.sync_selected_messages(); + dashboard.sync_selected_lineage(); + + assert_eq!( + dashboard.selected_route_preview.as_deref(), + Some("for `Investigate auth callback recovery` reuse idle auth-wor | graph auth, callback, recovery") + ); + } + #[test] fn route_preview_ignores_non_handoff_inbox_noise() { let lead = sample_session(