feat: align ecc2 delegate backlog semantics

This commit is contained in:
Affaan Mustafa
2026-04-08 03:55:03 -07:00
parent 63c437b986
commit 2fba71fcdb
2 changed files with 84 additions and 32 deletions

View File

@@ -42,7 +42,10 @@ pub fn get_status(db: &StateStore, id: &str) -> Result<SessionStatus> {
pub fn get_team_status(db: &StateStore, id: &str, depth: usize) -> Result<TeamStatus> { pub fn get_team_status(db: &StateStore, id: &str, depth: usize) -> Result<TeamStatus> {
let root = resolve_session(db, id)?; let root = resolve_session(db, id)?;
let unread_counts = db.unread_message_counts()?; let handoff_backlog = db
.unread_task_handoff_targets(db.list_sessions()?.len().max(1))?
.into_iter()
.collect();
let mut visited = HashSet::new(); let mut visited = HashSet::new();
visited.insert(root.id.clone()); visited.insert(root.id.clone());
@@ -52,14 +55,14 @@ pub fn get_team_status(db: &StateStore, id: &str, depth: usize) -> Result<TeamSt
&root.id, &root.id,
depth, depth,
1, 1,
&unread_counts, &handoff_backlog,
&mut visited, &mut visited,
&mut descendants, &mut descendants,
)?; )?;
Ok(TeamStatus { Ok(TeamStatus {
root, root,
unread_messages: unread_counts, handoff_backlog,
descendants, descendants,
}) })
} }
@@ -552,7 +555,7 @@ fn collect_delegation_descendants(
session_id: &str, session_id: &str,
remaining_depth: usize, remaining_depth: usize,
current_depth: usize, current_depth: usize,
unread_counts: &std::collections::HashMap<String, usize>, handoff_backlog: &std::collections::HashMap<String, usize>,
visited: &mut HashSet<String>, visited: &mut HashSet<String>,
descendants: &mut Vec<DelegatedSessionSummary>, descendants: &mut Vec<DelegatedSessionSummary>,
) -> Result<()> { ) -> Result<()> {
@@ -571,7 +574,7 @@ fn collect_delegation_descendants(
descendants.push(DelegatedSessionSummary { descendants.push(DelegatedSessionSummary {
depth: current_depth, depth: current_depth,
unread_messages: unread_counts.get(&child_id).copied().unwrap_or(0), handoff_backlog: handoff_backlog.get(&child_id).copied().unwrap_or(0),
session, session,
}); });
@@ -580,7 +583,7 @@ fn collect_delegation_descendants(
&child_id, &child_id,
remaining_depth.saturating_sub(1), remaining_depth.saturating_sub(1),
current_depth + 1, current_depth + 1,
unread_counts, handoff_backlog,
visited, visited,
descendants, descendants,
)?; )?;
@@ -843,14 +846,13 @@ fn summarize_backlog_pressure(
agent_type: &str, agent_type: &str,
targets: &[(String, usize)], targets: &[(String, usize)],
) -> Result<BacklogPressureSummary> { ) -> Result<BacklogPressureSummary> {
let unread_counts = db.unread_message_counts()?;
let mut summary = BacklogPressureSummary::default(); let mut summary = BacklogPressureSummary::default();
for (session_id, _) in targets { for (session_id, _) in targets {
let delegates = direct_delegate_sessions(db, session_id, agent_type)?; let delegates = direct_delegate_sessions(db, session_id, agent_type)?;
let has_clear_idle_delegate = delegates.iter().any(|delegate| { let has_clear_idle_delegate = delegates.iter().any(|delegate| {
delegate.state == SessionState::Idle delegate.state == SessionState::Idle
&& unread_counts.get(&delegate.id).copied().unwrap_or(0) == 0 && db.unread_task_handoff_count(&delegate.id).unwrap_or(0) == 0
}); });
let has_capacity = delegates.len() < cfg.max_parallel_sessions; let has_capacity = delegates.len() < cfg.max_parallel_sessions;
@@ -1048,7 +1050,7 @@ pub struct SessionStatus {
pub struct TeamStatus { pub struct TeamStatus {
root: Session, root: Session,
unread_messages: std::collections::HashMap<String, usize>, handoff_backlog: std::collections::HashMap<String, usize>,
descendants: Vec<DelegatedSessionSummary>, descendants: Vec<DelegatedSessionSummary>,
} }
@@ -1112,7 +1114,7 @@ struct BacklogPressureSummary {
struct DelegatedSessionSummary { struct DelegatedSessionSummary {
depth: usize, depth: usize,
unread_messages: usize, handoff_backlog: usize,
session: Session, session: Session,
} }
@@ -1154,8 +1156,8 @@ impl fmt::Display for TeamStatus {
writeln!(f, "Branch: {}", worktree.branch)?; writeln!(f, "Branch: {}", worktree.branch)?;
} }
let lead_unread = self.unread_messages.get(&self.root.id).copied().unwrap_or(0); let lead_handoff_backlog = self.handoff_backlog.get(&self.root.id).copied().unwrap_or(0);
writeln!(f, "Inbox: {}", lead_unread)?; writeln!(f, "Backlog: {}", lead_handoff_backlog)?;
if self.descendants.is_empty() { if self.descendants.is_empty() {
return write!(f, "Board: no delegated sessions"); return write!(f, "Board: no delegated sessions");
@@ -1185,11 +1187,11 @@ impl fmt::Display for TeamStatus {
for item in items { for item in items {
writeln!( writeln!(
f, f,
" - {}{} [{}] | inbox {} | {}", " - {}{} [{}] | backlog {} handoff(s) | {}",
" ".repeat(item.depth.saturating_sub(1)), " ".repeat(item.depth.saturating_sub(1)),
item.session.id, item.session.id,
item.session.agent_type, item.session.agent_type,
item.unread_messages, item.handoff_backlog,
item.session.task item.session.task
)?; )?;
} }
@@ -2404,4 +2406,59 @@ mod tests {
Ok(()) Ok(())
} }
#[test]
fn team_status_reports_handoff_backlog_not_generic_inbox_noise() -> Result<()> {
let tempdir = TestDir::new("manager-team-status-backlog")?;
let repo_root = tempdir.path().join("repo");
init_git_repo(&repo_root)?;
let cfg = build_config(tempdir.path());
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
pid: Some(42),
worktree: None,
created_at: now - Duration::minutes(4),
updated_at: now - Duration::minutes(4),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
id: "worker".to_string(),
task: "delegate task".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root,
state: SessionState::Idle,
pid: None,
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
db.send_message("lead", "worker", "FYI status update", "info")?;
db.send_message(
"lead",
"worker",
"{\"task\":\"Delegated work\",\"context\":\"Delegated from lead\"}",
"task_handoff",
)?;
let _ = db.mark_messages_read("worker")?;
db.send_message("lead", "worker", "FYI reminder", "info")?;
let status = get_team_status(&db, "lead", 3)?;
let rendered = format!("{status}");
assert!(rendered.contains("Backlog: 0"));
assert!(rendered.contains("| backlog 0 handoff(s) |"));
assert!(!rendered.contains("Inbox:"));
Ok(())
}
} }

View File

@@ -104,7 +104,7 @@ struct AggregateUsage {
struct DelegatedChildSummary { struct DelegatedChildSummary {
session_id: String, session_id: String,
state: SessionState, state: SessionState,
unread_messages: usize, handoff_backlog: usize,
} }
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
@@ -1297,11 +1297,6 @@ impl Dashboard {
match self.db.get_session(&child_id) { match self.db.get_session(&child_id) {
Ok(Some(session)) => { Ok(Some(session)) => {
team.total += 1; team.total += 1;
let unread_messages = self
.unread_message_counts
.get(&child_id)
.copied()
.unwrap_or(0);
let handoff_backlog = match self.db.unread_task_handoff_count(&child_id) { let handoff_backlog = match self.db.unread_task_handoff_count(&child_id) {
Ok(count) => count, Ok(count) => count,
Err(error) => { Err(error) => {
@@ -1323,12 +1318,12 @@ impl Dashboard {
} }
route_candidates.push(DelegatedChildSummary { route_candidates.push(DelegatedChildSummary {
unread_messages: handoff_backlog, handoff_backlog,
state: state.clone(), state: state.clone(),
session_id: child_id.clone(), session_id: child_id.clone(),
}); });
delegated.push(DelegatedChildSummary { delegated.push(DelegatedChildSummary {
unread_messages, handoff_backlog,
state, state,
session_id: child_id, session_id: child_id,
}); });
@@ -1365,7 +1360,7 @@ impl Dashboard {
) -> Option<String> { ) -> Option<String> {
if let Some(idle_clear) = delegates if let Some(idle_clear) = delegates
.iter() .iter()
.filter(|delegate| delegate.state == SessionState::Idle && delegate.unread_messages == 0) .filter(|delegate| delegate.state == SessionState::Idle && delegate.handoff_backlog == 0)
.min_by_key(|delegate| delegate.session_id.as_str()) .min_by_key(|delegate| delegate.session_id.as_str())
{ {
return Some(format!( return Some(format!(
@@ -1381,24 +1376,24 @@ impl Dashboard {
if let Some(idle_backed_up) = delegates if let Some(idle_backed_up) = delegates
.iter() .iter()
.filter(|delegate| delegate.state == SessionState::Idle) .filter(|delegate| delegate.state == SessionState::Idle)
.min_by_key(|delegate| (delegate.unread_messages, delegate.session_id.as_str())) .min_by_key(|delegate| (delegate.handoff_backlog, delegate.session_id.as_str()))
{ {
return Some(format!( return Some(format!(
"reuse idle {} with inbox {}", "reuse idle {} with backlog {}",
format_session_id(&idle_backed_up.session_id), format_session_id(&idle_backed_up.session_id),
idle_backed_up.unread_messages idle_backed_up.handoff_backlog
)); ));
} }
if let Some(active_delegate) = delegates if let Some(active_delegate) = delegates
.iter() .iter()
.filter(|delegate| matches!(delegate.state, SessionState::Running | SessionState::Pending)) .filter(|delegate| matches!(delegate.state, SessionState::Running | SessionState::Pending))
.min_by_key(|delegate| (delegate.unread_messages, delegate.session_id.as_str())) .min_by_key(|delegate| (delegate.handoff_backlog, delegate.session_id.as_str()))
{ {
return Some(format!( return Some(format!(
"reuse active {} with inbox {}", "reuse active {} with backlog {}",
format_session_id(&active_delegate.session_id), format_session_id(&active_delegate.session_id),
active_delegate.unread_messages active_delegate.handoff_backlog
)); ));
} }
@@ -1588,10 +1583,10 @@ impl Dashboard {
lines.push("Delegates".to_string()); lines.push("Delegates".to_string());
for child in &self.selected_child_sessions { for child in &self.selected_child_sessions {
lines.push(format!( lines.push(format!(
"- {} [{}] | inbox {}", "- {} [{}] | backlog {}",
format_session_id(&child.session_id), format_session_id(&child.session_id),
session_state_label(&child.state), session_state_label(&child.state),
child.unread_messages child.handoff_backlog
)); ));
} }
} }
@@ -2422,7 +2417,7 @@ mod tests {
Some("reuse idle idle-wor") Some("reuse idle idle-wor")
); );
assert_eq!(dashboard.selected_child_sessions.len(), 1); assert_eq!(dashboard.selected_child_sessions.len(), 1);
assert_eq!(dashboard.selected_child_sessions[0].unread_messages, 1); assert_eq!(dashboard.selected_child_sessions[0].handoff_backlog, 0);
} }
#[test] #[test]