9 Commits

Author SHA1 Message Date
Affaan Mustafa
9d766af025 docs: align ecc2 operator backlog language 2026-04-08 03:56:40 -07:00
Affaan Mustafa
2fba71fcdb feat: align ecc2 delegate backlog semantics 2026-04-08 03:55:03 -07:00
Affaan Mustafa
63c437b986 feat: align ecc2 backlog surfaces 2026-04-08 03:51:17 -07:00
Affaan Mustafa
3199120abe feat: route ecc2 by handoff backlog 2026-04-08 03:47:11 -07:00
Affaan Mustafa
478466168a feat: calm ecc2 stabilized attention 2026-04-08 03:43:46 -07:00
Affaan Mustafa
cf7d3ae584 feat: quiet ecc2 stabilized telemetry 2026-04-08 03:41:48 -07:00
Affaan Mustafa
051d47eb5f feat: relax ecc2 stabilized cycles 2026-04-08 03:40:26 -07:00
Affaan Mustafa
40ed9c7f6a feat: surface ecc2 stabilized mode 2026-04-08 03:37:48 -07:00
Affaan Mustafa
09f6bc3166 feat: surface ecc2 recovery events 2026-04-08 03:35:16 -07:00
4 changed files with 545 additions and 75 deletions

View File

@@ -177,6 +177,12 @@ where
}
let first_dispatch = dispatch().await?;
if prior_activity.stabilized_after_recovery_at().is_some() && first_dispatch.deferred == 0 {
tracing::info!(
"Skipping rebalance because stabilized dispatch cycle has no deferred handoffs"
);
return Ok((first_dispatch, 0, DispatchPassSummary::default()));
}
let rebalanced = rebalance().await?;
let recovery_dispatch = if first_dispatch.deferred > 0 && rebalanced > 0 {
let recovery = dispatch().await?;
@@ -796,6 +802,56 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn coordinate_backlog_cycle_skips_rebalance_when_stabilized_and_dispatch_is_healthy() -> Result<()> {
let cfg = Config {
auto_dispatch_unread_handoffs: true,
..Config::default()
};
let now = chrono::Utc::now();
let activity = DaemonActivity {
last_dispatch_at: Some(now + chrono::Duration::seconds(2)),
last_dispatch_routed: 2,
last_dispatch_deferred: 0,
last_dispatch_leads: 1,
last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)),
last_recovery_dispatch_routed: 1,
last_recovery_dispatch_leads: 1,
last_rebalance_at: Some(now),
last_rebalance_rerouted: 1,
last_rebalance_leads: 1,
};
let rebalance_calls = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let rebalance_calls_clone = rebalance_calls.clone();
let (first, rebalanced, recovery) = coordinate_backlog_cycle_with(
&cfg,
&activity,
|| async move {
Ok(DispatchPassSummary {
routed: 1,
deferred: 0,
leads: 1,
})
},
move || {
let rebalance_calls_clone = rebalance_calls_clone.clone();
async move {
rebalance_calls_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(1)
}
},
|_, _| Ok(()),
)
.await?;
assert_eq!(first.routed, 1);
assert_eq!(rebalanced, 0);
assert_eq!(recovery, DispatchPassSummary::default());
assert_eq!(rebalance_calls.load(std::sync::atomic::Ordering::SeqCst), 0);
Ok(())
}
#[tokio::test]
async fn maybe_auto_rebalance_noops_when_disabled() -> Result<()> {
let path = temp_db_path();

View File

@@ -42,7 +42,10 @@ pub fn get_status(db: &StateStore, id: &str) -> Result<SessionStatus> {
pub fn get_team_status(db: &StateStore, id: &str, depth: usize) -> Result<TeamStatus> {
let root = resolve_session(db, id)?;
let unread_counts = db.unread_message_counts()?;
let handoff_backlog = db
.unread_task_handoff_targets(db.list_sessions()?.len().max(1))?
.into_iter()
.collect();
let mut visited = HashSet::new();
visited.insert(root.id.clone());
@@ -52,14 +55,14 @@ pub fn get_team_status(db: &StateStore, id: &str, depth: usize) -> Result<TeamSt
&root.id,
depth,
1,
&unread_counts,
&handoff_backlog,
&mut visited,
&mut descendants,
)?;
Ok(TeamStatus {
root,
unread_messages: unread_counts,
handoff_backlog,
descendants,
})
}
@@ -428,13 +431,23 @@ async fn assign_session_in_dir_with_runner_program(
) -> Result<AssignmentOutcome> {
let lead = resolve_session(db, lead_id)?;
let delegates = direct_delegate_sessions(db, &lead.id, agent_type)?;
let unread_counts = db.unread_message_counts()?;
let delegate_handoff_backlog = delegates
.iter()
.map(|session| {
db.unread_task_handoff_count(&session.id)
.map(|count| (session.id.clone(), count))
})
.collect::<Result<std::collections::HashMap<_, _>>>()?;
if let Some(idle_delegate) = delegates
.iter()
.filter(|session| {
session.state == SessionState::Idle
&& unread_counts.get(&session.id).copied().unwrap_or(0) == 0
&& delegate_handoff_backlog
.get(&session.id)
.copied()
.unwrap_or(0)
== 0
})
.min_by_key(|session| session.updated_at)
{
@@ -468,7 +481,10 @@ async fn assign_session_in_dir_with_runner_program(
.filter(|session| session.state == SessionState::Idle)
.min_by_key(|session| {
(
unread_counts.get(&session.id).copied().unwrap_or(0),
delegate_handoff_backlog
.get(&session.id)
.copied()
.unwrap_or(0),
session.updated_at,
)
})
@@ -484,12 +500,20 @@ async fn assign_session_in_dir_with_runner_program(
.filter(|session| matches!(session.state, SessionState::Running | SessionState::Pending))
.min_by_key(|session| {
(
unread_counts.get(&session.id).copied().unwrap_or(0),
delegate_handoff_backlog
.get(&session.id)
.copied()
.unwrap_or(0),
session.updated_at,
)
})
{
if unread_counts.get(&active_delegate.id).copied().unwrap_or(0) > 0 {
if delegate_handoff_backlog
.get(&active_delegate.id)
.copied()
.unwrap_or(0)
> 0
{
return Ok(AssignmentOutcome {
session_id: lead.id.clone(),
action: AssignmentAction::DeferredSaturated,
@@ -531,7 +555,7 @@ fn collect_delegation_descendants(
session_id: &str,
remaining_depth: usize,
current_depth: usize,
unread_counts: &std::collections::HashMap<String, usize>,
handoff_backlog: &std::collections::HashMap<String, usize>,
visited: &mut HashSet<String>,
descendants: &mut Vec<DelegatedSessionSummary>,
) -> Result<()> {
@@ -550,7 +574,7 @@ fn collect_delegation_descendants(
descendants.push(DelegatedSessionSummary {
depth: current_depth,
unread_messages: unread_counts.get(&child_id).copied().unwrap_or(0),
handoff_backlog: handoff_backlog.get(&child_id).copied().unwrap_or(0),
session,
});
@@ -559,7 +583,7 @@ fn collect_delegation_descendants(
&child_id,
remaining_depth.saturating_sub(1),
current_depth + 1,
unread_counts,
handoff_backlog,
visited,
descendants,
)?;
@@ -822,14 +846,13 @@ fn summarize_backlog_pressure(
agent_type: &str,
targets: &[(String, usize)],
) -> Result<BacklogPressureSummary> {
let unread_counts = db.unread_message_counts()?;
let mut summary = BacklogPressureSummary::default();
for (session_id, _) in targets {
let delegates = direct_delegate_sessions(db, session_id, agent_type)?;
let has_clear_idle_delegate = delegates.iter().any(|delegate| {
delegate.state == SessionState::Idle
&& unread_counts.get(&delegate.id).copied().unwrap_or(0) == 0
&& db.unread_task_handoff_count(&delegate.id).unwrap_or(0) == 0
});
let has_capacity = delegates.len() < cfg.max_parallel_sessions;
@@ -1027,7 +1050,7 @@ pub struct SessionStatus {
pub struct TeamStatus {
root: Session,
unread_messages: std::collections::HashMap<String, usize>,
handoff_backlog: std::collections::HashMap<String, usize>,
descendants: Vec<DelegatedSessionSummary>,
}
@@ -1091,7 +1114,7 @@ struct BacklogPressureSummary {
struct DelegatedSessionSummary {
depth: usize,
unread_messages: usize,
handoff_backlog: usize,
session: Session,
}
@@ -1133,8 +1156,8 @@ impl fmt::Display for TeamStatus {
writeln!(f, "Branch: {}", worktree.branch)?;
}
let lead_unread = self.unread_messages.get(&self.root.id).copied().unwrap_or(0);
writeln!(f, "Inbox: {}", lead_unread)?;
let lead_handoff_backlog = self.handoff_backlog.get(&self.root.id).copied().unwrap_or(0);
writeln!(f, "Backlog: {}", lead_handoff_backlog)?;
if self.descendants.is_empty() {
return write!(f, "Board: no delegated sessions");
@@ -1164,11 +1187,11 @@ impl fmt::Display for TeamStatus {
for item in items {
writeln!(
f,
" - {}{} [{}] | inbox {} | {}",
" - {}{} [{}] | backlog {} handoff(s) | {}",
" ".repeat(item.depth.saturating_sub(1)),
item.session.id,
item.session.agent_type,
item.unread_messages,
item.handoff_backlog,
item.session.task
)?;
}
@@ -1798,6 +1821,74 @@ mod tests {
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn assign_session_reuses_idle_delegate_when_only_non_handoff_messages_are_unread() -> Result<()> {
let tempdir = TestDir::new("manager-assign-reuse-idle-info-inbox")?;
let repo_root = tempdir.path().join("repo");
init_git_repo(&repo_root)?;
let cfg = build_config(tempdir.path());
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
pid: Some(42),
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
id: "idle-worker".to_string(),
task: "old worker task".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Idle,
pid: Some(99),
worktree: None,
created_at: now - Duration::minutes(2),
updated_at: now - Duration::minutes(2),
metrics: SessionMetrics::default(),
})?;
db.send_message(
"lead",
"idle-worker",
"{\"task\":\"old worker task\",\"context\":\"Delegated from lead\"}",
"task_handoff",
)?;
db.mark_messages_read("idle-worker")?;
db.send_message("lead", "idle-worker", "FYI status update", "info")?;
let (fake_runner, _) = write_fake_claude(tempdir.path())?;
let outcome = assign_session_in_dir_with_runner_program(
&db,
&cfg,
"lead",
"Fresh delegated task",
"claude",
true,
&repo_root,
&fake_runner,
)
.await?;
assert_eq!(outcome.action, AssignmentAction::ReusedIdle);
assert_eq!(outcome.session_id, "idle-worker");
let idle_messages = db.list_messages_for_session("idle-worker", 10)?;
assert!(idle_messages.iter().any(|message| {
message.msg_type == "task_handoff"
&& message.content.contains("Fresh delegated task")
}));
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn assign_session_spawns_when_team_has_capacity() -> Result<()> {
let tempdir = TestDir::new("manager-assign-spawn")?;
@@ -2315,4 +2406,59 @@ mod tests {
Ok(())
}
#[test]
fn team_status_reports_handoff_backlog_not_generic_inbox_noise() -> Result<()> {
let tempdir = TestDir::new("manager-team-status-backlog")?;
let repo_root = tempdir.path().join("repo");
init_git_repo(&repo_root)?;
let cfg = build_config(tempdir.path());
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
pid: Some(42),
worktree: None,
created_at: now - Duration::minutes(4),
updated_at: now - Duration::minutes(4),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
id: "worker".to_string(),
task: "delegate task".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root,
state: SessionState::Idle,
pid: None,
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
db.send_message("lead", "worker", "FYI status update", "info")?;
db.send_message(
"lead",
"worker",
"{\"task\":\"Delegated work\",\"context\":\"Delegated from lead\"}",
"task_handoff",
)?;
let _ = db.mark_messages_read("worker")?;
db.send_message("lead", "worker", "FYI reminder", "info")?;
let status = get_team_status(&db, "lead", 3)?;
let rendered = format!("{status}");
assert!(rendered.contains("Backlog: 0"));
assert!(rendered.contains("| backlog 0 handoff(s) |"));
assert!(!rendered.contains("Inbox:"));
Ok(())
}
}

View File

@@ -46,6 +46,38 @@ impl DaemonActivity {
pub fn dispatch_cooloff_active(&self) -> bool {
self.prefers_rebalance_first() && self.last_dispatch_deferred >= 2
}
pub fn chronic_saturation_cleared_at(
&self,
) -> Option<&chrono::DateTime<chrono::Utc>> {
if self.prefers_rebalance_first() {
return None;
}
match (
self.last_dispatch_at.as_ref(),
self.last_recovery_dispatch_at.as_ref(),
) {
(Some(dispatch_at), Some(recovery_at)) if recovery_at > dispatch_at => Some(recovery_at),
_ => None,
}
}
pub fn stabilized_after_recovery_at(
&self,
) -> Option<&chrono::DateTime<chrono::Utc>> {
if self.last_dispatch_deferred != 0 {
return None;
}
match (
self.last_dispatch_at.as_ref(),
self.last_recovery_dispatch_at.as_ref(),
) {
(Some(dispatch_at), Some(recovery_at)) if dispatch_at > recovery_at => Some(dispatch_at),
_ => None,
}
}
}
impl StateStore {
@@ -523,6 +555,19 @@ impl StateStore {
.map_err(Into::into)
}
pub fn unread_task_handoff_count(&self, session_id: &str) -> Result<usize> {
self.conn
.query_row(
"SELECT COUNT(*)
FROM messages
WHERE to_session = ?1 AND msg_type = 'task_handoff' AND read = 0",
rusqlite::params![session_id],
|row| row.get::<_, i64>(0),
)
.map(|count| count as usize)
.map_err(Into::into)
}
pub fn unread_task_handoff_targets(&self, limit: usize) -> Result<Vec<(String, usize)>> {
let mut stmt = self.conn.prepare(
"SELECT to_session, COUNT(*) as unread_count
@@ -1068,6 +1113,8 @@ mod tests {
let clear = DaemonActivity::default();
assert!(!clear.prefers_rebalance_first());
assert!(!clear.dispatch_cooloff_active());
assert!(clear.chronic_saturation_cleared_at().is_none());
assert!(clear.stabilized_after_recovery_at().is_none());
let unresolved = DaemonActivity {
last_dispatch_at: Some(now),
@@ -1083,6 +1130,8 @@ mod tests {
};
assert!(unresolved.prefers_rebalance_first());
assert!(unresolved.dispatch_cooloff_active());
assert!(unresolved.chronic_saturation_cleared_at().is_none());
assert!(unresolved.stabilized_after_recovery_at().is_none());
let recovered = DaemonActivity {
last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)),
@@ -1091,5 +1140,25 @@ mod tests {
};
assert!(!recovered.prefers_rebalance_first());
assert!(!recovered.dispatch_cooloff_active());
assert_eq!(
recovered.chronic_saturation_cleared_at(),
recovered.last_recovery_dispatch_at.as_ref()
);
assert!(recovered.stabilized_after_recovery_at().is_none());
let stabilized = DaemonActivity {
last_dispatch_at: Some(now + chrono::Duration::seconds(2)),
last_dispatch_routed: 2,
last_dispatch_deferred: 0,
last_dispatch_leads: 1,
..recovered
};
assert!(!stabilized.prefers_rebalance_first());
assert!(!stabilized.dispatch_cooloff_active());
assert!(stabilized.chronic_saturation_cleared_at().is_none());
assert_eq!(
stabilized.stabilized_after_recovery_at(),
stabilized.last_dispatch_at.as_ref()
);
}
}

View File

@@ -40,6 +40,7 @@ pub struct Dashboard {
sessions: Vec<Session>,
session_output_cache: HashMap<String, Vec<OutputLine>>,
unread_message_counts: HashMap<String, usize>,
handoff_backlog_counts: HashMap<String, usize>,
global_handoff_backlog_leads: usize,
global_handoff_backlog_messages: usize,
daemon_activity: DaemonActivity,
@@ -103,7 +104,7 @@ struct AggregateUsage {
struct DelegatedChildSummary {
session_id: String,
state: SessionState,
unread_messages: usize,
handoff_backlog: usize,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
@@ -141,6 +142,7 @@ impl Dashboard {
sessions,
session_output_cache: HashMap::new(),
unread_message_counts: HashMap::new(),
handoff_backlog_counts: HashMap::new(),
global_handoff_backlog_leads: 0,
global_handoff_backlog_messages: 0,
daemon_activity: DaemonActivity::default(),
@@ -162,6 +164,7 @@ impl Dashboard {
session_table_state,
};
dashboard.unread_message_counts = dashboard.db.unread_message_counts().unwrap_or_default();
dashboard.sync_handoff_backlog_counts();
dashboard.sync_global_handoff_backlog();
dashboard.sync_selected_output();
dashboard.sync_selected_diff();
@@ -241,27 +244,32 @@ impl Dashboard {
return;
}
let summary = SessionSummary::from_sessions(&self.sessions, &self.unread_message_counts);
let stabilized = self.daemon_activity.stabilized_after_recovery_at().is_some();
let summary =
SessionSummary::from_sessions(&self.sessions, &self.handoff_backlog_counts, stabilized);
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Length(2), Constraint::Min(3)])
.split(inner_area);
frame.render_widget(
Paragraph::new(vec![summary_line(&summary), attention_queue_line(&summary)]),
Paragraph::new(vec![
summary_line(&summary),
attention_queue_line(&summary, stabilized),
]),
chunks[0],
);
let rows = self.sessions.iter().map(|session| {
session_row(
session,
self.unread_message_counts
self.handoff_backlog_counts
.get(&session.id)
.copied()
.unwrap_or(0),
)
});
let header = Row::new(["ID", "Agent", "State", "Branch", "Inbox", "Tokens", "Duration"])
let header = Row::new(["ID", "Agent", "State", "Branch", "Backlog", "Tokens", "Duration"])
.style(Style::default().add_modifier(Modifier::BOLD));
let widths = [
Constraint::Length(8),
@@ -453,9 +461,9 @@ impl Dashboard {
"",
" n New session",
" a Assign follow-up work from selected session",
" b Rebalance backed-up delegate inboxes for selected lead",
" B Rebalance backed-up delegate inboxes across lead teams",
" i Drain unread task handoffs from selected session inbox",
" b Rebalance backed-up delegate handoff backlog for selected lead",
" B Rebalance backed-up delegate handoff backlog across lead teams",
" i Drain unread task handoffs from selected lead",
" g Auto-dispatch unread handoffs across lead sessions",
" G Dispatch then rebalance backlog across lead teams",
" p Toggle daemon auto-dispatch policy and persist config",
@@ -1130,6 +1138,7 @@ impl Dashboard {
HashMap::new()
}
};
self.sync_handoff_backlog_counts();
self.sync_global_handoff_backlog();
self.sync_daemon_activity();
self.sync_selection_by_id(selected_id.as_deref());
@@ -1182,6 +1191,19 @@ impl Dashboard {
}
}
fn sync_handoff_backlog_counts(&mut self) {
let limit = self.sessions.len().max(1);
self.handoff_backlog_counts.clear();
match self.db.unread_task_handoff_targets(limit) {
Ok(targets) => {
self.handoff_backlog_counts.extend(targets);
}
Err(error) => {
tracing::warn!("Failed to refresh handoff backlog counts: {error}");
}
}
}
fn sync_daemon_activity(&mut self) {
self.daemon_activity = match self.db.daemon_activity() {
Ok(activity) => activity,
@@ -1275,11 +1297,16 @@ impl Dashboard {
match self.db.get_session(&child_id) {
Ok(Some(session)) => {
team.total += 1;
let unread_messages = self
.unread_message_counts
.get(&child_id)
.copied()
.unwrap_or(0);
let handoff_backlog = match self.db.unread_task_handoff_count(&child_id) {
Ok(count) => count,
Err(error) => {
tracing::warn!(
"Failed to load delegated child handoff backlog {}: {error}",
child_id
);
0
}
};
let state = session.state.clone();
match state {
SessionState::Idle => team.idle += 1,
@@ -1291,12 +1318,12 @@ impl Dashboard {
}
route_candidates.push(DelegatedChildSummary {
unread_messages,
handoff_backlog,
state: state.clone(),
session_id: child_id.clone(),
});
delegated.push(DelegatedChildSummary {
unread_messages,
handoff_backlog,
state,
session_id: child_id,
});
@@ -1333,7 +1360,7 @@ impl Dashboard {
) -> Option<String> {
if let Some(idle_clear) = delegates
.iter()
.filter(|delegate| delegate.state == SessionState::Idle && delegate.unread_messages == 0)
.filter(|delegate| delegate.state == SessionState::Idle && delegate.handoff_backlog == 0)
.min_by_key(|delegate| delegate.session_id.as_str())
{
return Some(format!(
@@ -1349,24 +1376,24 @@ impl Dashboard {
if let Some(idle_backed_up) = delegates
.iter()
.filter(|delegate| delegate.state == SessionState::Idle)
.min_by_key(|delegate| (delegate.unread_messages, delegate.session_id.as_str()))
.min_by_key(|delegate| (delegate.handoff_backlog, delegate.session_id.as_str()))
{
return Some(format!(
"reuse idle {} with inbox {}",
"reuse idle {} with backlog {}",
format_session_id(&idle_backed_up.session_id),
idle_backed_up.unread_messages
idle_backed_up.handoff_backlog
));
}
if let Some(active_delegate) = delegates
.iter()
.filter(|delegate| matches!(delegate.state, SessionState::Running | SessionState::Pending))
.min_by_key(|delegate| (delegate.unread_messages, delegate.session_id.as_str()))
.min_by_key(|delegate| (delegate.handoff_backlog, delegate.session_id.as_str()))
{
return Some(format!(
"reuse active {} with inbox {}",
"reuse active {} with backlog {}",
format_session_id(&active_delegate.session_id),
active_delegate.unread_messages
active_delegate.handoff_backlog
));
}
@@ -1487,17 +1514,35 @@ impl Dashboard {
self.cfg.auto_dispatch_limit_per_session
));
let stabilized = self.daemon_activity.stabilized_after_recovery_at();
lines.push(format!(
"Coordination mode {}",
if self.daemon_activity.dispatch_cooloff_active() {
"rebalance-cooloff (chronic saturation)"
} else if self.daemon_activity.prefers_rebalance_first() {
"rebalance-first (chronic saturation)"
} else if stabilized.is_some() {
"dispatch-first (stabilized)"
} else {
"dispatch-first"
}
));
if let Some(cleared_at) = self.daemon_activity.chronic_saturation_cleared_at() {
lines.push(format!(
"Chronic saturation cleared @ {}",
self.short_timestamp(&cleared_at.to_rfc3339())
));
}
if let Some(stabilized_at) = stabilized {
lines.push(format!(
"Recovery stabilized @ {}",
self.short_timestamp(&stabilized_at.to_rfc3339())
));
}
if let Some(last_dispatch_at) = self.daemon_activity.last_dispatch_at.as_ref() {
lines.push(format!(
"Last daemon dispatch {} routed / {} deferred across {} lead(s) @ {}",
@@ -1508,24 +1553,26 @@ impl Dashboard {
));
}
if let Some(last_recovery_dispatch_at) =
self.daemon_activity.last_recovery_dispatch_at.as_ref()
{
lines.push(format!(
"Last daemon recovery dispatch {} handoff(s) across {} lead(s) @ {}",
self.daemon_activity.last_recovery_dispatch_routed,
self.daemon_activity.last_recovery_dispatch_leads,
self.short_timestamp(&last_recovery_dispatch_at.to_rfc3339())
));
}
if stabilized.is_none() {
if let Some(last_recovery_dispatch_at) =
self.daemon_activity.last_recovery_dispatch_at.as_ref()
{
lines.push(format!(
"Last daemon recovery dispatch {} handoff(s) across {} lead(s) @ {}",
self.daemon_activity.last_recovery_dispatch_routed,
self.daemon_activity.last_recovery_dispatch_leads,
self.short_timestamp(&last_recovery_dispatch_at.to_rfc3339())
));
}
if let Some(last_rebalance_at) = self.daemon_activity.last_rebalance_at.as_ref() {
lines.push(format!(
"Last daemon rebalance {} handoff(s) across {} lead(s) @ {}",
self.daemon_activity.last_rebalance_rerouted,
self.daemon_activity.last_rebalance_leads,
self.short_timestamp(&last_rebalance_at.to_rfc3339())
));
if let Some(last_rebalance_at) = self.daemon_activity.last_rebalance_at.as_ref() {
lines.push(format!(
"Last daemon rebalance {} handoff(s) across {} lead(s) @ {}",
self.daemon_activity.last_rebalance_rerouted,
self.daemon_activity.last_rebalance_leads,
self.short_timestamp(&last_rebalance_at.to_rfc3339())
));
}
}
if let Some(route_preview) = self.selected_route_preview.as_ref() {
@@ -1536,10 +1583,10 @@ impl Dashboard {
lines.push("Delegates".to_string());
for child in &self.selected_child_sessions {
lines.push(format!(
"- {} [{}] | inbox {}",
"- {} [{}] | backlog {}",
format_session_id(&child.session_id),
session_state_label(&child.state),
child.unread_messages
child.handoff_backlog
));
}
}
@@ -1575,7 +1622,7 @@ impl Dashboard {
lines.push(String::new());
if self.selected_messages.is_empty() {
lines.push("Inbox clear".to_string());
lines.push("Message inbox clear".to_string());
} else {
lines.push("Recent messages:".to_string());
let recent = self
@@ -1637,18 +1684,19 @@ impl Dashboard {
fn attention_queue_items(&self, limit: usize) -> Vec<String> {
let mut items = Vec::new();
let suppress_inbox_attention = self.daemon_activity.stabilized_after_recovery_at().is_some();
for session in &self.sessions {
let unread = self
.unread_message_counts
let handoff_backlog = self
.handoff_backlog_counts
.get(&session.id)
.copied()
.unwrap_or(0);
if unread > 0 {
if handoff_backlog > 0 && !suppress_inbox_attention {
items.push(format!(
"- Inbox {} | {} unread | {}",
"- Backlog {} | {} handoff(s) | {}",
format_session_id(&session.id),
unread,
handoff_backlog,
truncate_for_dashboard(&session.task, 40)
));
}
@@ -1850,12 +1898,24 @@ impl Pane {
}
impl SessionSummary {
fn from_sessions(sessions: &[Session], unread_message_counts: &HashMap<String, usize>) -> Self {
fn from_sessions(
sessions: &[Session],
unread_message_counts: &HashMap<String, usize>,
suppress_inbox_attention: bool,
) -> Self {
sessions.iter().fold(
Self {
total: sessions.len(),
unread_messages: unread_message_counts.values().sum(),
inbox_sessions: unread_message_counts.values().filter(|count| **count > 0).count(),
unread_messages: if suppress_inbox_attention {
0
} else {
unread_message_counts.values().sum()
},
inbox_sessions: if suppress_inbox_attention {
0
} else {
unread_message_counts.values().filter(|count| **count > 0).count()
},
..Self::default()
},
|mut summary, session| {
@@ -1922,7 +1982,7 @@ fn summary_span(label: &str, value: usize, color: Color) -> Span<'static> {
)
}
fn attention_queue_line(summary: &SessionSummary) -> Line<'static> {
fn attention_queue_line(summary: &SessionSummary, stabilized: bool) -> Line<'static> {
if summary.failed == 0
&& summary.stopped == 0
&& summary.pending == 0
@@ -1933,7 +1993,11 @@ fn attention_queue_line(summary: &SessionSummary) -> Line<'static> {
"Attention queue clear",
Style::default().fg(Color::Green).add_modifier(Modifier::BOLD),
),
Span::raw(" no failed, stopped, or pending sessions"),
Span::raw(if stabilized {
" stabilized backlog absorbed"
} else {
" no failed, stopped, or pending sessions"
}),
]);
}
@@ -1942,7 +2006,7 @@ fn attention_queue_line(summary: &SessionSummary) -> Line<'static> {
"Attention queue ",
Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD),
),
summary_span("Inbox", summary.unread_messages, Color::Magenta),
summary_span("Backlog", summary.unread_messages, Color::Magenta),
summary_span("Failed", summary.failed, Color::Red),
summary_span("Stopped", summary.stopped, Color::DarkGray),
summary_span("Pending", summary.pending, Color::Yellow),
@@ -2131,6 +2195,7 @@ mod tests {
#[test]
fn selected_session_metrics_text_includes_daemon_activity() {
let now = Utc::now();
let mut dashboard = test_dashboard(
vec![sample_session(
"focus-12345678",
@@ -2143,20 +2208,21 @@ mod tests {
0,
);
dashboard.daemon_activity = DaemonActivity {
last_dispatch_at: Some(Utc::now()),
last_dispatch_at: Some(now),
last_dispatch_routed: 4,
last_dispatch_deferred: 2,
last_dispatch_leads: 2,
last_recovery_dispatch_at: Some(Utc::now()),
last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)),
last_recovery_dispatch_routed: 1,
last_recovery_dispatch_leads: 1,
last_rebalance_at: Some(Utc::now()),
last_rebalance_at: Some(now + chrono::Duration::seconds(2)),
last_rebalance_rerouted: 1,
last_rebalance_leads: 1,
};
let text = dashboard.selected_session_metrics_text();
assert!(text.contains("Coordination mode dispatch-first"));
assert!(text.contains("Chronic saturation cleared @"));
assert!(text.contains("Last daemon dispatch 4 routed / 2 deferred across 2 lead(s)"));
assert!(text.contains("Last daemon recovery dispatch 1 handoff(s) across 1 lead(s)"));
assert!(text.contains("Last daemon rebalance 1 handoff(s) across 1 lead(s)"));
@@ -2222,6 +2288,138 @@ mod tests {
assert!(text.contains("Coordination mode rebalance-cooloff (chronic saturation)"));
}
#[test]
fn selected_session_metrics_text_shows_stabilized_dispatch_mode_after_recovery() {
let now = Utc::now();
let mut dashboard = test_dashboard(
vec![sample_session(
"focus-12345678",
"planner",
SessionState::Running,
Some("ecc/focus"),
512,
42,
)],
0,
);
dashboard.daemon_activity = DaemonActivity {
last_dispatch_at: Some(now + chrono::Duration::seconds(2)),
last_dispatch_routed: 2,
last_dispatch_deferred: 0,
last_dispatch_leads: 1,
last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)),
last_recovery_dispatch_routed: 1,
last_recovery_dispatch_leads: 1,
last_rebalance_at: Some(now),
last_rebalance_rerouted: 1,
last_rebalance_leads: 1,
};
let text = dashboard.selected_session_metrics_text();
assert!(text.contains("Coordination mode dispatch-first (stabilized)"));
assert!(text.contains("Recovery stabilized @"));
assert!(!text.contains("Last daemon recovery dispatch"));
assert!(!text.contains("Last daemon rebalance"));
}
#[test]
fn attention_queue_suppresses_inbox_pressure_when_stabilized() {
let now = Utc::now();
let sessions = vec![sample_session(
"focus-12345678",
"planner",
SessionState::Running,
Some("ecc/focus"),
512,
42,
)];
let unread = HashMap::from([(String::from("focus-12345678"), 3usize)]);
let summary = SessionSummary::from_sessions(&sessions, &unread, true);
let line = attention_queue_line(&summary, true);
let rendered = line
.spans
.iter()
.map(|span| span.content.as_ref())
.collect::<String>();
assert!(rendered.contains("Attention queue clear"));
assert!(rendered.contains("stabilized backlog absorbed"));
let mut dashboard = test_dashboard(sessions, 0);
dashboard.unread_message_counts = unread;
dashboard.handoff_backlog_counts = HashMap::from([(String::from("focus-12345678"), 3usize)]);
dashboard.daemon_activity = DaemonActivity {
last_dispatch_at: Some(now + chrono::Duration::seconds(2)),
last_dispatch_routed: 2,
last_dispatch_deferred: 0,
last_dispatch_leads: 1,
last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)),
last_recovery_dispatch_routed: 1,
last_recovery_dispatch_leads: 1,
last_rebalance_at: Some(now),
last_rebalance_rerouted: 1,
last_rebalance_leads: 1,
};
let text = dashboard.selected_session_metrics_text();
assert!(text.contains("Attention queue clear"));
assert!(!text.contains("Needs attention:"));
assert!(!text.contains("Backlog focus-12"));
}
#[test]
fn route_preview_ignores_non_handoff_inbox_noise() {
let lead = sample_session(
"lead-12345678",
"planner",
SessionState::Running,
Some("ecc/lead"),
512,
42,
);
let idle_worker = sample_session(
"idle-worker",
"planner",
SessionState::Idle,
Some("ecc/idle"),
128,
12,
);
let mut dashboard = test_dashboard(vec![lead.clone(), idle_worker.clone()], 0);
dashboard.db.insert_session(&lead).unwrap();
dashboard.db.insert_session(&idle_worker).unwrap();
dashboard
.db
.send_message("lead-12345678", "idle-worker", "FYI status update", "info")
.unwrap();
dashboard
.db
.send_message(
"lead-12345678",
"idle-worker",
"{\"task\":\"Delegated work\",\"context\":\"Delegated from lead\"}",
"task_handoff",
)
.unwrap();
dashboard.db.mark_messages_read("idle-worker").unwrap();
dashboard
.db
.send_message("lead-12345678", "idle-worker", "FYI status update", "info")
.unwrap();
dashboard.unread_message_counts = dashboard.db.unread_message_counts().unwrap();
dashboard.sync_selected_lineage();
assert_eq!(
dashboard.selected_route_preview.as_deref(),
Some("reuse idle idle-wor")
);
assert_eq!(dashboard.selected_child_sessions.len(), 1);
assert_eq!(dashboard.selected_child_sessions[0].handoff_backlog, 0);
}
#[test]
fn aggregate_cost_summary_mentions_total_cost() {
let db = StateStore::open(Path::new(":memory:")).unwrap();
@@ -2722,6 +2920,7 @@ mod tests {
sessions,
session_output_cache: HashMap::new(),
unread_message_counts: HashMap::new(),
handoff_backlog_counts: HashMap::new(),
global_handoff_backlog_leads: 0,
global_handoff_backlog_messages: 0,
daemon_activity: DaemonActivity::default(),