mirror of
https://github.com/affaan-m/everything-claude-code.git
synced 2026-05-18 06:43:05 +08:00
Compare commits
9 Commits
9952fcbd7c
...
9d766af025
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9d766af025 | ||
|
|
2fba71fcdb | ||
|
|
63c437b986 | ||
|
|
3199120abe | ||
|
|
478466168a | ||
|
|
cf7d3ae584 | ||
|
|
051d47eb5f | ||
|
|
40ed9c7f6a | ||
|
|
09f6bc3166 |
@@ -177,6 +177,12 @@ where
|
||||
}
|
||||
|
||||
let first_dispatch = dispatch().await?;
|
||||
if prior_activity.stabilized_after_recovery_at().is_some() && first_dispatch.deferred == 0 {
|
||||
tracing::info!(
|
||||
"Skipping rebalance because stabilized dispatch cycle has no deferred handoffs"
|
||||
);
|
||||
return Ok((first_dispatch, 0, DispatchPassSummary::default()));
|
||||
}
|
||||
let rebalanced = rebalance().await?;
|
||||
let recovery_dispatch = if first_dispatch.deferred > 0 && rebalanced > 0 {
|
||||
let recovery = dispatch().await?;
|
||||
@@ -796,6 +802,56 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn coordinate_backlog_cycle_skips_rebalance_when_stabilized_and_dispatch_is_healthy() -> Result<()> {
|
||||
let cfg = Config {
|
||||
auto_dispatch_unread_handoffs: true,
|
||||
..Config::default()
|
||||
};
|
||||
let now = chrono::Utc::now();
|
||||
let activity = DaemonActivity {
|
||||
last_dispatch_at: Some(now + chrono::Duration::seconds(2)),
|
||||
last_dispatch_routed: 2,
|
||||
last_dispatch_deferred: 0,
|
||||
last_dispatch_leads: 1,
|
||||
last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)),
|
||||
last_recovery_dispatch_routed: 1,
|
||||
last_recovery_dispatch_leads: 1,
|
||||
last_rebalance_at: Some(now),
|
||||
last_rebalance_rerouted: 1,
|
||||
last_rebalance_leads: 1,
|
||||
};
|
||||
let rebalance_calls = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
|
||||
let rebalance_calls_clone = rebalance_calls.clone();
|
||||
|
||||
let (first, rebalanced, recovery) = coordinate_backlog_cycle_with(
|
||||
&cfg,
|
||||
&activity,
|
||||
|| async move {
|
||||
Ok(DispatchPassSummary {
|
||||
routed: 1,
|
||||
deferred: 0,
|
||||
leads: 1,
|
||||
})
|
||||
},
|
||||
move || {
|
||||
let rebalance_calls_clone = rebalance_calls_clone.clone();
|
||||
async move {
|
||||
rebalance_calls_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
||||
Ok(1)
|
||||
}
|
||||
},
|
||||
|_, _| Ok(()),
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert_eq!(first.routed, 1);
|
||||
assert_eq!(rebalanced, 0);
|
||||
assert_eq!(recovery, DispatchPassSummary::default());
|
||||
assert_eq!(rebalance_calls.load(std::sync::atomic::Ordering::SeqCst), 0);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn maybe_auto_rebalance_noops_when_disabled() -> Result<()> {
|
||||
let path = temp_db_path();
|
||||
|
||||
@@ -42,7 +42,10 @@ pub fn get_status(db: &StateStore, id: &str) -> Result<SessionStatus> {
|
||||
|
||||
pub fn get_team_status(db: &StateStore, id: &str, depth: usize) -> Result<TeamStatus> {
|
||||
let root = resolve_session(db, id)?;
|
||||
let unread_counts = db.unread_message_counts()?;
|
||||
let handoff_backlog = db
|
||||
.unread_task_handoff_targets(db.list_sessions()?.len().max(1))?
|
||||
.into_iter()
|
||||
.collect();
|
||||
let mut visited = HashSet::new();
|
||||
visited.insert(root.id.clone());
|
||||
|
||||
@@ -52,14 +55,14 @@ pub fn get_team_status(db: &StateStore, id: &str, depth: usize) -> Result<TeamSt
|
||||
&root.id,
|
||||
depth,
|
||||
1,
|
||||
&unread_counts,
|
||||
&handoff_backlog,
|
||||
&mut visited,
|
||||
&mut descendants,
|
||||
)?;
|
||||
|
||||
Ok(TeamStatus {
|
||||
root,
|
||||
unread_messages: unread_counts,
|
||||
handoff_backlog,
|
||||
descendants,
|
||||
})
|
||||
}
|
||||
@@ -428,13 +431,23 @@ async fn assign_session_in_dir_with_runner_program(
|
||||
) -> Result<AssignmentOutcome> {
|
||||
let lead = resolve_session(db, lead_id)?;
|
||||
let delegates = direct_delegate_sessions(db, &lead.id, agent_type)?;
|
||||
let unread_counts = db.unread_message_counts()?;
|
||||
let delegate_handoff_backlog = delegates
|
||||
.iter()
|
||||
.map(|session| {
|
||||
db.unread_task_handoff_count(&session.id)
|
||||
.map(|count| (session.id.clone(), count))
|
||||
})
|
||||
.collect::<Result<std::collections::HashMap<_, _>>>()?;
|
||||
|
||||
if let Some(idle_delegate) = delegates
|
||||
.iter()
|
||||
.filter(|session| {
|
||||
session.state == SessionState::Idle
|
||||
&& unread_counts.get(&session.id).copied().unwrap_or(0) == 0
|
||||
&& delegate_handoff_backlog
|
||||
.get(&session.id)
|
||||
.copied()
|
||||
.unwrap_or(0)
|
||||
== 0
|
||||
})
|
||||
.min_by_key(|session| session.updated_at)
|
||||
{
|
||||
@@ -468,7 +481,10 @@ async fn assign_session_in_dir_with_runner_program(
|
||||
.filter(|session| session.state == SessionState::Idle)
|
||||
.min_by_key(|session| {
|
||||
(
|
||||
unread_counts.get(&session.id).copied().unwrap_or(0),
|
||||
delegate_handoff_backlog
|
||||
.get(&session.id)
|
||||
.copied()
|
||||
.unwrap_or(0),
|
||||
session.updated_at,
|
||||
)
|
||||
})
|
||||
@@ -484,12 +500,20 @@ async fn assign_session_in_dir_with_runner_program(
|
||||
.filter(|session| matches!(session.state, SessionState::Running | SessionState::Pending))
|
||||
.min_by_key(|session| {
|
||||
(
|
||||
unread_counts.get(&session.id).copied().unwrap_or(0),
|
||||
delegate_handoff_backlog
|
||||
.get(&session.id)
|
||||
.copied()
|
||||
.unwrap_or(0),
|
||||
session.updated_at,
|
||||
)
|
||||
})
|
||||
{
|
||||
if unread_counts.get(&active_delegate.id).copied().unwrap_or(0) > 0 {
|
||||
if delegate_handoff_backlog
|
||||
.get(&active_delegate.id)
|
||||
.copied()
|
||||
.unwrap_or(0)
|
||||
> 0
|
||||
{
|
||||
return Ok(AssignmentOutcome {
|
||||
session_id: lead.id.clone(),
|
||||
action: AssignmentAction::DeferredSaturated,
|
||||
@@ -531,7 +555,7 @@ fn collect_delegation_descendants(
|
||||
session_id: &str,
|
||||
remaining_depth: usize,
|
||||
current_depth: usize,
|
||||
unread_counts: &std::collections::HashMap<String, usize>,
|
||||
handoff_backlog: &std::collections::HashMap<String, usize>,
|
||||
visited: &mut HashSet<String>,
|
||||
descendants: &mut Vec<DelegatedSessionSummary>,
|
||||
) -> Result<()> {
|
||||
@@ -550,7 +574,7 @@ fn collect_delegation_descendants(
|
||||
|
||||
descendants.push(DelegatedSessionSummary {
|
||||
depth: current_depth,
|
||||
unread_messages: unread_counts.get(&child_id).copied().unwrap_or(0),
|
||||
handoff_backlog: handoff_backlog.get(&child_id).copied().unwrap_or(0),
|
||||
session,
|
||||
});
|
||||
|
||||
@@ -559,7 +583,7 @@ fn collect_delegation_descendants(
|
||||
&child_id,
|
||||
remaining_depth.saturating_sub(1),
|
||||
current_depth + 1,
|
||||
unread_counts,
|
||||
handoff_backlog,
|
||||
visited,
|
||||
descendants,
|
||||
)?;
|
||||
@@ -822,14 +846,13 @@ fn summarize_backlog_pressure(
|
||||
agent_type: &str,
|
||||
targets: &[(String, usize)],
|
||||
) -> Result<BacklogPressureSummary> {
|
||||
let unread_counts = db.unread_message_counts()?;
|
||||
let mut summary = BacklogPressureSummary::default();
|
||||
|
||||
for (session_id, _) in targets {
|
||||
let delegates = direct_delegate_sessions(db, session_id, agent_type)?;
|
||||
let has_clear_idle_delegate = delegates.iter().any(|delegate| {
|
||||
delegate.state == SessionState::Idle
|
||||
&& unread_counts.get(&delegate.id).copied().unwrap_or(0) == 0
|
||||
&& db.unread_task_handoff_count(&delegate.id).unwrap_or(0) == 0
|
||||
});
|
||||
let has_capacity = delegates.len() < cfg.max_parallel_sessions;
|
||||
|
||||
@@ -1027,7 +1050,7 @@ pub struct SessionStatus {
|
||||
|
||||
pub struct TeamStatus {
|
||||
root: Session,
|
||||
unread_messages: std::collections::HashMap<String, usize>,
|
||||
handoff_backlog: std::collections::HashMap<String, usize>,
|
||||
descendants: Vec<DelegatedSessionSummary>,
|
||||
}
|
||||
|
||||
@@ -1091,7 +1114,7 @@ struct BacklogPressureSummary {
|
||||
|
||||
struct DelegatedSessionSummary {
|
||||
depth: usize,
|
||||
unread_messages: usize,
|
||||
handoff_backlog: usize,
|
||||
session: Session,
|
||||
}
|
||||
|
||||
@@ -1133,8 +1156,8 @@ impl fmt::Display for TeamStatus {
|
||||
writeln!(f, "Branch: {}", worktree.branch)?;
|
||||
}
|
||||
|
||||
let lead_unread = self.unread_messages.get(&self.root.id).copied().unwrap_or(0);
|
||||
writeln!(f, "Inbox: {}", lead_unread)?;
|
||||
let lead_handoff_backlog = self.handoff_backlog.get(&self.root.id).copied().unwrap_or(0);
|
||||
writeln!(f, "Backlog: {}", lead_handoff_backlog)?;
|
||||
|
||||
if self.descendants.is_empty() {
|
||||
return write!(f, "Board: no delegated sessions");
|
||||
@@ -1164,11 +1187,11 @@ impl fmt::Display for TeamStatus {
|
||||
for item in items {
|
||||
writeln!(
|
||||
f,
|
||||
" - {}{} [{}] | inbox {} | {}",
|
||||
" - {}{} [{}] | backlog {} handoff(s) | {}",
|
||||
" ".repeat(item.depth.saturating_sub(1)),
|
||||
item.session.id,
|
||||
item.session.agent_type,
|
||||
item.unread_messages,
|
||||
item.handoff_backlog,
|
||||
item.session.task
|
||||
)?;
|
||||
}
|
||||
@@ -1798,6 +1821,74 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn assign_session_reuses_idle_delegate_when_only_non_handoff_messages_are_unread() -> Result<()> {
|
||||
let tempdir = TestDir::new("manager-assign-reuse-idle-info-inbox")?;
|
||||
let repo_root = tempdir.path().join("repo");
|
||||
init_git_repo(&repo_root)?;
|
||||
|
||||
let cfg = build_config(tempdir.path());
|
||||
let db = StateStore::open(&cfg.db_path)?;
|
||||
let now = Utc::now();
|
||||
|
||||
db.insert_session(&Session {
|
||||
id: "lead".to_string(),
|
||||
task: "lead task".to_string(),
|
||||
agent_type: "claude".to_string(),
|
||||
working_dir: repo_root.clone(),
|
||||
state: SessionState::Running,
|
||||
pid: Some(42),
|
||||
worktree: None,
|
||||
created_at: now - Duration::minutes(3),
|
||||
updated_at: now - Duration::minutes(3),
|
||||
metrics: SessionMetrics::default(),
|
||||
})?;
|
||||
db.insert_session(&Session {
|
||||
id: "idle-worker".to_string(),
|
||||
task: "old worker task".to_string(),
|
||||
agent_type: "claude".to_string(),
|
||||
working_dir: repo_root.clone(),
|
||||
state: SessionState::Idle,
|
||||
pid: Some(99),
|
||||
worktree: None,
|
||||
created_at: now - Duration::minutes(2),
|
||||
updated_at: now - Duration::minutes(2),
|
||||
metrics: SessionMetrics::default(),
|
||||
})?;
|
||||
db.send_message(
|
||||
"lead",
|
||||
"idle-worker",
|
||||
"{\"task\":\"old worker task\",\"context\":\"Delegated from lead\"}",
|
||||
"task_handoff",
|
||||
)?;
|
||||
db.mark_messages_read("idle-worker")?;
|
||||
db.send_message("lead", "idle-worker", "FYI status update", "info")?;
|
||||
|
||||
let (fake_runner, _) = write_fake_claude(tempdir.path())?;
|
||||
let outcome = assign_session_in_dir_with_runner_program(
|
||||
&db,
|
||||
&cfg,
|
||||
"lead",
|
||||
"Fresh delegated task",
|
||||
"claude",
|
||||
true,
|
||||
&repo_root,
|
||||
&fake_runner,
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert_eq!(outcome.action, AssignmentAction::ReusedIdle);
|
||||
assert_eq!(outcome.session_id, "idle-worker");
|
||||
|
||||
let idle_messages = db.list_messages_for_session("idle-worker", 10)?;
|
||||
assert!(idle_messages.iter().any(|message| {
|
||||
message.msg_type == "task_handoff"
|
||||
&& message.content.contains("Fresh delegated task")
|
||||
}));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn assign_session_spawns_when_team_has_capacity() -> Result<()> {
|
||||
let tempdir = TestDir::new("manager-assign-spawn")?;
|
||||
@@ -2315,4 +2406,59 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn team_status_reports_handoff_backlog_not_generic_inbox_noise() -> Result<()> {
|
||||
let tempdir = TestDir::new("manager-team-status-backlog")?;
|
||||
let repo_root = tempdir.path().join("repo");
|
||||
init_git_repo(&repo_root)?;
|
||||
|
||||
let cfg = build_config(tempdir.path());
|
||||
let db = StateStore::open(&cfg.db_path)?;
|
||||
let now = Utc::now();
|
||||
|
||||
db.insert_session(&Session {
|
||||
id: "lead".to_string(),
|
||||
task: "lead task".to_string(),
|
||||
agent_type: "claude".to_string(),
|
||||
working_dir: repo_root.clone(),
|
||||
state: SessionState::Running,
|
||||
pid: Some(42),
|
||||
worktree: None,
|
||||
created_at: now - Duration::minutes(4),
|
||||
updated_at: now - Duration::minutes(4),
|
||||
metrics: SessionMetrics::default(),
|
||||
})?;
|
||||
db.insert_session(&Session {
|
||||
id: "worker".to_string(),
|
||||
task: "delegate task".to_string(),
|
||||
agent_type: "claude".to_string(),
|
||||
working_dir: repo_root,
|
||||
state: SessionState::Idle,
|
||||
pid: None,
|
||||
worktree: None,
|
||||
created_at: now - Duration::minutes(3),
|
||||
updated_at: now - Duration::minutes(3),
|
||||
metrics: SessionMetrics::default(),
|
||||
})?;
|
||||
|
||||
db.send_message("lead", "worker", "FYI status update", "info")?;
|
||||
db.send_message(
|
||||
"lead",
|
||||
"worker",
|
||||
"{\"task\":\"Delegated work\",\"context\":\"Delegated from lead\"}",
|
||||
"task_handoff",
|
||||
)?;
|
||||
let _ = db.mark_messages_read("worker")?;
|
||||
db.send_message("lead", "worker", "FYI reminder", "info")?;
|
||||
|
||||
let status = get_team_status(&db, "lead", 3)?;
|
||||
let rendered = format!("{status}");
|
||||
|
||||
assert!(rendered.contains("Backlog: 0"));
|
||||
assert!(rendered.contains("| backlog 0 handoff(s) |"));
|
||||
assert!(!rendered.contains("Inbox:"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,6 +46,38 @@ impl DaemonActivity {
|
||||
pub fn dispatch_cooloff_active(&self) -> bool {
|
||||
self.prefers_rebalance_first() && self.last_dispatch_deferred >= 2
|
||||
}
|
||||
|
||||
pub fn chronic_saturation_cleared_at(
|
||||
&self,
|
||||
) -> Option<&chrono::DateTime<chrono::Utc>> {
|
||||
if self.prefers_rebalance_first() {
|
||||
return None;
|
||||
}
|
||||
|
||||
match (
|
||||
self.last_dispatch_at.as_ref(),
|
||||
self.last_recovery_dispatch_at.as_ref(),
|
||||
) {
|
||||
(Some(dispatch_at), Some(recovery_at)) if recovery_at > dispatch_at => Some(recovery_at),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stabilized_after_recovery_at(
|
||||
&self,
|
||||
) -> Option<&chrono::DateTime<chrono::Utc>> {
|
||||
if self.last_dispatch_deferred != 0 {
|
||||
return None;
|
||||
}
|
||||
|
||||
match (
|
||||
self.last_dispatch_at.as_ref(),
|
||||
self.last_recovery_dispatch_at.as_ref(),
|
||||
) {
|
||||
(Some(dispatch_at), Some(recovery_at)) if dispatch_at > recovery_at => Some(dispatch_at),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl StateStore {
|
||||
@@ -523,6 +555,19 @@ impl StateStore {
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
pub fn unread_task_handoff_count(&self, session_id: &str) -> Result<usize> {
|
||||
self.conn
|
||||
.query_row(
|
||||
"SELECT COUNT(*)
|
||||
FROM messages
|
||||
WHERE to_session = ?1 AND msg_type = 'task_handoff' AND read = 0",
|
||||
rusqlite::params![session_id],
|
||||
|row| row.get::<_, i64>(0),
|
||||
)
|
||||
.map(|count| count as usize)
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
pub fn unread_task_handoff_targets(&self, limit: usize) -> Result<Vec<(String, usize)>> {
|
||||
let mut stmt = self.conn.prepare(
|
||||
"SELECT to_session, COUNT(*) as unread_count
|
||||
@@ -1068,6 +1113,8 @@ mod tests {
|
||||
let clear = DaemonActivity::default();
|
||||
assert!(!clear.prefers_rebalance_first());
|
||||
assert!(!clear.dispatch_cooloff_active());
|
||||
assert!(clear.chronic_saturation_cleared_at().is_none());
|
||||
assert!(clear.stabilized_after_recovery_at().is_none());
|
||||
|
||||
let unresolved = DaemonActivity {
|
||||
last_dispatch_at: Some(now),
|
||||
@@ -1083,6 +1130,8 @@ mod tests {
|
||||
};
|
||||
assert!(unresolved.prefers_rebalance_first());
|
||||
assert!(unresolved.dispatch_cooloff_active());
|
||||
assert!(unresolved.chronic_saturation_cleared_at().is_none());
|
||||
assert!(unresolved.stabilized_after_recovery_at().is_none());
|
||||
|
||||
let recovered = DaemonActivity {
|
||||
last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)),
|
||||
@@ -1091,5 +1140,25 @@ mod tests {
|
||||
};
|
||||
assert!(!recovered.prefers_rebalance_first());
|
||||
assert!(!recovered.dispatch_cooloff_active());
|
||||
assert_eq!(
|
||||
recovered.chronic_saturation_cleared_at(),
|
||||
recovered.last_recovery_dispatch_at.as_ref()
|
||||
);
|
||||
assert!(recovered.stabilized_after_recovery_at().is_none());
|
||||
|
||||
let stabilized = DaemonActivity {
|
||||
last_dispatch_at: Some(now + chrono::Duration::seconds(2)),
|
||||
last_dispatch_routed: 2,
|
||||
last_dispatch_deferred: 0,
|
||||
last_dispatch_leads: 1,
|
||||
..recovered
|
||||
};
|
||||
assert!(!stabilized.prefers_rebalance_first());
|
||||
assert!(!stabilized.dispatch_cooloff_active());
|
||||
assert!(stabilized.chronic_saturation_cleared_at().is_none());
|
||||
assert_eq!(
|
||||
stabilized.stabilized_after_recovery_at(),
|
||||
stabilized.last_dispatch_at.as_ref()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@ pub struct Dashboard {
|
||||
sessions: Vec<Session>,
|
||||
session_output_cache: HashMap<String, Vec<OutputLine>>,
|
||||
unread_message_counts: HashMap<String, usize>,
|
||||
handoff_backlog_counts: HashMap<String, usize>,
|
||||
global_handoff_backlog_leads: usize,
|
||||
global_handoff_backlog_messages: usize,
|
||||
daemon_activity: DaemonActivity,
|
||||
@@ -103,7 +104,7 @@ struct AggregateUsage {
|
||||
struct DelegatedChildSummary {
|
||||
session_id: String,
|
||||
state: SessionState,
|
||||
unread_messages: usize,
|
||||
handoff_backlog: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
|
||||
@@ -141,6 +142,7 @@ impl Dashboard {
|
||||
sessions,
|
||||
session_output_cache: HashMap::new(),
|
||||
unread_message_counts: HashMap::new(),
|
||||
handoff_backlog_counts: HashMap::new(),
|
||||
global_handoff_backlog_leads: 0,
|
||||
global_handoff_backlog_messages: 0,
|
||||
daemon_activity: DaemonActivity::default(),
|
||||
@@ -162,6 +164,7 @@ impl Dashboard {
|
||||
session_table_state,
|
||||
};
|
||||
dashboard.unread_message_counts = dashboard.db.unread_message_counts().unwrap_or_default();
|
||||
dashboard.sync_handoff_backlog_counts();
|
||||
dashboard.sync_global_handoff_backlog();
|
||||
dashboard.sync_selected_output();
|
||||
dashboard.sync_selected_diff();
|
||||
@@ -241,27 +244,32 @@ impl Dashboard {
|
||||
return;
|
||||
}
|
||||
|
||||
let summary = SessionSummary::from_sessions(&self.sessions, &self.unread_message_counts);
|
||||
let stabilized = self.daemon_activity.stabilized_after_recovery_at().is_some();
|
||||
let summary =
|
||||
SessionSummary::from_sessions(&self.sessions, &self.handoff_backlog_counts, stabilized);
|
||||
let chunks = Layout::default()
|
||||
.direction(Direction::Vertical)
|
||||
.constraints([Constraint::Length(2), Constraint::Min(3)])
|
||||
.split(inner_area);
|
||||
|
||||
frame.render_widget(
|
||||
Paragraph::new(vec![summary_line(&summary), attention_queue_line(&summary)]),
|
||||
Paragraph::new(vec![
|
||||
summary_line(&summary),
|
||||
attention_queue_line(&summary, stabilized),
|
||||
]),
|
||||
chunks[0],
|
||||
);
|
||||
|
||||
let rows = self.sessions.iter().map(|session| {
|
||||
session_row(
|
||||
session,
|
||||
self.unread_message_counts
|
||||
self.handoff_backlog_counts
|
||||
.get(&session.id)
|
||||
.copied()
|
||||
.unwrap_or(0),
|
||||
)
|
||||
});
|
||||
let header = Row::new(["ID", "Agent", "State", "Branch", "Inbox", "Tokens", "Duration"])
|
||||
let header = Row::new(["ID", "Agent", "State", "Branch", "Backlog", "Tokens", "Duration"])
|
||||
.style(Style::default().add_modifier(Modifier::BOLD));
|
||||
let widths = [
|
||||
Constraint::Length(8),
|
||||
@@ -453,9 +461,9 @@ impl Dashboard {
|
||||
"",
|
||||
" n New session",
|
||||
" a Assign follow-up work from selected session",
|
||||
" b Rebalance backed-up delegate inboxes for selected lead",
|
||||
" B Rebalance backed-up delegate inboxes across lead teams",
|
||||
" i Drain unread task handoffs from selected session inbox",
|
||||
" b Rebalance backed-up delegate handoff backlog for selected lead",
|
||||
" B Rebalance backed-up delegate handoff backlog across lead teams",
|
||||
" i Drain unread task handoffs from selected lead",
|
||||
" g Auto-dispatch unread handoffs across lead sessions",
|
||||
" G Dispatch then rebalance backlog across lead teams",
|
||||
" p Toggle daemon auto-dispatch policy and persist config",
|
||||
@@ -1130,6 +1138,7 @@ impl Dashboard {
|
||||
HashMap::new()
|
||||
}
|
||||
};
|
||||
self.sync_handoff_backlog_counts();
|
||||
self.sync_global_handoff_backlog();
|
||||
self.sync_daemon_activity();
|
||||
self.sync_selection_by_id(selected_id.as_deref());
|
||||
@@ -1182,6 +1191,19 @@ impl Dashboard {
|
||||
}
|
||||
}
|
||||
|
||||
fn sync_handoff_backlog_counts(&mut self) {
|
||||
let limit = self.sessions.len().max(1);
|
||||
self.handoff_backlog_counts.clear();
|
||||
match self.db.unread_task_handoff_targets(limit) {
|
||||
Ok(targets) => {
|
||||
self.handoff_backlog_counts.extend(targets);
|
||||
}
|
||||
Err(error) => {
|
||||
tracing::warn!("Failed to refresh handoff backlog counts: {error}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn sync_daemon_activity(&mut self) {
|
||||
self.daemon_activity = match self.db.daemon_activity() {
|
||||
Ok(activity) => activity,
|
||||
@@ -1275,11 +1297,16 @@ impl Dashboard {
|
||||
match self.db.get_session(&child_id) {
|
||||
Ok(Some(session)) => {
|
||||
team.total += 1;
|
||||
let unread_messages = self
|
||||
.unread_message_counts
|
||||
.get(&child_id)
|
||||
.copied()
|
||||
.unwrap_or(0);
|
||||
let handoff_backlog = match self.db.unread_task_handoff_count(&child_id) {
|
||||
Ok(count) => count,
|
||||
Err(error) => {
|
||||
tracing::warn!(
|
||||
"Failed to load delegated child handoff backlog {}: {error}",
|
||||
child_id
|
||||
);
|
||||
0
|
||||
}
|
||||
};
|
||||
let state = session.state.clone();
|
||||
match state {
|
||||
SessionState::Idle => team.idle += 1,
|
||||
@@ -1291,12 +1318,12 @@ impl Dashboard {
|
||||
}
|
||||
|
||||
route_candidates.push(DelegatedChildSummary {
|
||||
unread_messages,
|
||||
handoff_backlog,
|
||||
state: state.clone(),
|
||||
session_id: child_id.clone(),
|
||||
});
|
||||
delegated.push(DelegatedChildSummary {
|
||||
unread_messages,
|
||||
handoff_backlog,
|
||||
state,
|
||||
session_id: child_id,
|
||||
});
|
||||
@@ -1333,7 +1360,7 @@ impl Dashboard {
|
||||
) -> Option<String> {
|
||||
if let Some(idle_clear) = delegates
|
||||
.iter()
|
||||
.filter(|delegate| delegate.state == SessionState::Idle && delegate.unread_messages == 0)
|
||||
.filter(|delegate| delegate.state == SessionState::Idle && delegate.handoff_backlog == 0)
|
||||
.min_by_key(|delegate| delegate.session_id.as_str())
|
||||
{
|
||||
return Some(format!(
|
||||
@@ -1349,24 +1376,24 @@ impl Dashboard {
|
||||
if let Some(idle_backed_up) = delegates
|
||||
.iter()
|
||||
.filter(|delegate| delegate.state == SessionState::Idle)
|
||||
.min_by_key(|delegate| (delegate.unread_messages, delegate.session_id.as_str()))
|
||||
.min_by_key(|delegate| (delegate.handoff_backlog, delegate.session_id.as_str()))
|
||||
{
|
||||
return Some(format!(
|
||||
"reuse idle {} with inbox {}",
|
||||
"reuse idle {} with backlog {}",
|
||||
format_session_id(&idle_backed_up.session_id),
|
||||
idle_backed_up.unread_messages
|
||||
idle_backed_up.handoff_backlog
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(active_delegate) = delegates
|
||||
.iter()
|
||||
.filter(|delegate| matches!(delegate.state, SessionState::Running | SessionState::Pending))
|
||||
.min_by_key(|delegate| (delegate.unread_messages, delegate.session_id.as_str()))
|
||||
.min_by_key(|delegate| (delegate.handoff_backlog, delegate.session_id.as_str()))
|
||||
{
|
||||
return Some(format!(
|
||||
"reuse active {} with inbox {}",
|
||||
"reuse active {} with backlog {}",
|
||||
format_session_id(&active_delegate.session_id),
|
||||
active_delegate.unread_messages
|
||||
active_delegate.handoff_backlog
|
||||
));
|
||||
}
|
||||
|
||||
@@ -1487,17 +1514,35 @@ impl Dashboard {
|
||||
self.cfg.auto_dispatch_limit_per_session
|
||||
));
|
||||
|
||||
let stabilized = self.daemon_activity.stabilized_after_recovery_at();
|
||||
|
||||
lines.push(format!(
|
||||
"Coordination mode {}",
|
||||
if self.daemon_activity.dispatch_cooloff_active() {
|
||||
"rebalance-cooloff (chronic saturation)"
|
||||
} else if self.daemon_activity.prefers_rebalance_first() {
|
||||
"rebalance-first (chronic saturation)"
|
||||
} else if stabilized.is_some() {
|
||||
"dispatch-first (stabilized)"
|
||||
} else {
|
||||
"dispatch-first"
|
||||
}
|
||||
));
|
||||
|
||||
if let Some(cleared_at) = self.daemon_activity.chronic_saturation_cleared_at() {
|
||||
lines.push(format!(
|
||||
"Chronic saturation cleared @ {}",
|
||||
self.short_timestamp(&cleared_at.to_rfc3339())
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(stabilized_at) = stabilized {
|
||||
lines.push(format!(
|
||||
"Recovery stabilized @ {}",
|
||||
self.short_timestamp(&stabilized_at.to_rfc3339())
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(last_dispatch_at) = self.daemon_activity.last_dispatch_at.as_ref() {
|
||||
lines.push(format!(
|
||||
"Last daemon dispatch {} routed / {} deferred across {} lead(s) @ {}",
|
||||
@@ -1508,24 +1553,26 @@ impl Dashboard {
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(last_recovery_dispatch_at) =
|
||||
self.daemon_activity.last_recovery_dispatch_at.as_ref()
|
||||
{
|
||||
lines.push(format!(
|
||||
"Last daemon recovery dispatch {} handoff(s) across {} lead(s) @ {}",
|
||||
self.daemon_activity.last_recovery_dispatch_routed,
|
||||
self.daemon_activity.last_recovery_dispatch_leads,
|
||||
self.short_timestamp(&last_recovery_dispatch_at.to_rfc3339())
|
||||
));
|
||||
}
|
||||
if stabilized.is_none() {
|
||||
if let Some(last_recovery_dispatch_at) =
|
||||
self.daemon_activity.last_recovery_dispatch_at.as_ref()
|
||||
{
|
||||
lines.push(format!(
|
||||
"Last daemon recovery dispatch {} handoff(s) across {} lead(s) @ {}",
|
||||
self.daemon_activity.last_recovery_dispatch_routed,
|
||||
self.daemon_activity.last_recovery_dispatch_leads,
|
||||
self.short_timestamp(&last_recovery_dispatch_at.to_rfc3339())
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(last_rebalance_at) = self.daemon_activity.last_rebalance_at.as_ref() {
|
||||
lines.push(format!(
|
||||
"Last daemon rebalance {} handoff(s) across {} lead(s) @ {}",
|
||||
self.daemon_activity.last_rebalance_rerouted,
|
||||
self.daemon_activity.last_rebalance_leads,
|
||||
self.short_timestamp(&last_rebalance_at.to_rfc3339())
|
||||
));
|
||||
if let Some(last_rebalance_at) = self.daemon_activity.last_rebalance_at.as_ref() {
|
||||
lines.push(format!(
|
||||
"Last daemon rebalance {} handoff(s) across {} lead(s) @ {}",
|
||||
self.daemon_activity.last_rebalance_rerouted,
|
||||
self.daemon_activity.last_rebalance_leads,
|
||||
self.short_timestamp(&last_rebalance_at.to_rfc3339())
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(route_preview) = self.selected_route_preview.as_ref() {
|
||||
@@ -1536,10 +1583,10 @@ impl Dashboard {
|
||||
lines.push("Delegates".to_string());
|
||||
for child in &self.selected_child_sessions {
|
||||
lines.push(format!(
|
||||
"- {} [{}] | inbox {}",
|
||||
"- {} [{}] | backlog {}",
|
||||
format_session_id(&child.session_id),
|
||||
session_state_label(&child.state),
|
||||
child.unread_messages
|
||||
child.handoff_backlog
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -1575,7 +1622,7 @@ impl Dashboard {
|
||||
|
||||
lines.push(String::new());
|
||||
if self.selected_messages.is_empty() {
|
||||
lines.push("Inbox clear".to_string());
|
||||
lines.push("Message inbox clear".to_string());
|
||||
} else {
|
||||
lines.push("Recent messages:".to_string());
|
||||
let recent = self
|
||||
@@ -1637,18 +1684,19 @@ impl Dashboard {
|
||||
|
||||
fn attention_queue_items(&self, limit: usize) -> Vec<String> {
|
||||
let mut items = Vec::new();
|
||||
let suppress_inbox_attention = self.daemon_activity.stabilized_after_recovery_at().is_some();
|
||||
|
||||
for session in &self.sessions {
|
||||
let unread = self
|
||||
.unread_message_counts
|
||||
let handoff_backlog = self
|
||||
.handoff_backlog_counts
|
||||
.get(&session.id)
|
||||
.copied()
|
||||
.unwrap_or(0);
|
||||
if unread > 0 {
|
||||
if handoff_backlog > 0 && !suppress_inbox_attention {
|
||||
items.push(format!(
|
||||
"- Inbox {} | {} unread | {}",
|
||||
"- Backlog {} | {} handoff(s) | {}",
|
||||
format_session_id(&session.id),
|
||||
unread,
|
||||
handoff_backlog,
|
||||
truncate_for_dashboard(&session.task, 40)
|
||||
));
|
||||
}
|
||||
@@ -1850,12 +1898,24 @@ impl Pane {
|
||||
}
|
||||
|
||||
impl SessionSummary {
|
||||
fn from_sessions(sessions: &[Session], unread_message_counts: &HashMap<String, usize>) -> Self {
|
||||
fn from_sessions(
|
||||
sessions: &[Session],
|
||||
unread_message_counts: &HashMap<String, usize>,
|
||||
suppress_inbox_attention: bool,
|
||||
) -> Self {
|
||||
sessions.iter().fold(
|
||||
Self {
|
||||
total: sessions.len(),
|
||||
unread_messages: unread_message_counts.values().sum(),
|
||||
inbox_sessions: unread_message_counts.values().filter(|count| **count > 0).count(),
|
||||
unread_messages: if suppress_inbox_attention {
|
||||
0
|
||||
} else {
|
||||
unread_message_counts.values().sum()
|
||||
},
|
||||
inbox_sessions: if suppress_inbox_attention {
|
||||
0
|
||||
} else {
|
||||
unread_message_counts.values().filter(|count| **count > 0).count()
|
||||
},
|
||||
..Self::default()
|
||||
},
|
||||
|mut summary, session| {
|
||||
@@ -1922,7 +1982,7 @@ fn summary_span(label: &str, value: usize, color: Color) -> Span<'static> {
|
||||
)
|
||||
}
|
||||
|
||||
fn attention_queue_line(summary: &SessionSummary) -> Line<'static> {
|
||||
fn attention_queue_line(summary: &SessionSummary, stabilized: bool) -> Line<'static> {
|
||||
if summary.failed == 0
|
||||
&& summary.stopped == 0
|
||||
&& summary.pending == 0
|
||||
@@ -1933,7 +1993,11 @@ fn attention_queue_line(summary: &SessionSummary) -> Line<'static> {
|
||||
"Attention queue clear",
|
||||
Style::default().fg(Color::Green).add_modifier(Modifier::BOLD),
|
||||
),
|
||||
Span::raw(" no failed, stopped, or pending sessions"),
|
||||
Span::raw(if stabilized {
|
||||
" stabilized backlog absorbed"
|
||||
} else {
|
||||
" no failed, stopped, or pending sessions"
|
||||
}),
|
||||
]);
|
||||
}
|
||||
|
||||
@@ -1942,7 +2006,7 @@ fn attention_queue_line(summary: &SessionSummary) -> Line<'static> {
|
||||
"Attention queue ",
|
||||
Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD),
|
||||
),
|
||||
summary_span("Inbox", summary.unread_messages, Color::Magenta),
|
||||
summary_span("Backlog", summary.unread_messages, Color::Magenta),
|
||||
summary_span("Failed", summary.failed, Color::Red),
|
||||
summary_span("Stopped", summary.stopped, Color::DarkGray),
|
||||
summary_span("Pending", summary.pending, Color::Yellow),
|
||||
@@ -2131,6 +2195,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn selected_session_metrics_text_includes_daemon_activity() {
|
||||
let now = Utc::now();
|
||||
let mut dashboard = test_dashboard(
|
||||
vec![sample_session(
|
||||
"focus-12345678",
|
||||
@@ -2143,20 +2208,21 @@ mod tests {
|
||||
0,
|
||||
);
|
||||
dashboard.daemon_activity = DaemonActivity {
|
||||
last_dispatch_at: Some(Utc::now()),
|
||||
last_dispatch_at: Some(now),
|
||||
last_dispatch_routed: 4,
|
||||
last_dispatch_deferred: 2,
|
||||
last_dispatch_leads: 2,
|
||||
last_recovery_dispatch_at: Some(Utc::now()),
|
||||
last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)),
|
||||
last_recovery_dispatch_routed: 1,
|
||||
last_recovery_dispatch_leads: 1,
|
||||
last_rebalance_at: Some(Utc::now()),
|
||||
last_rebalance_at: Some(now + chrono::Duration::seconds(2)),
|
||||
last_rebalance_rerouted: 1,
|
||||
last_rebalance_leads: 1,
|
||||
};
|
||||
|
||||
let text = dashboard.selected_session_metrics_text();
|
||||
assert!(text.contains("Coordination mode dispatch-first"));
|
||||
assert!(text.contains("Chronic saturation cleared @"));
|
||||
assert!(text.contains("Last daemon dispatch 4 routed / 2 deferred across 2 lead(s)"));
|
||||
assert!(text.contains("Last daemon recovery dispatch 1 handoff(s) across 1 lead(s)"));
|
||||
assert!(text.contains("Last daemon rebalance 1 handoff(s) across 1 lead(s)"));
|
||||
@@ -2222,6 +2288,138 @@ mod tests {
|
||||
assert!(text.contains("Coordination mode rebalance-cooloff (chronic saturation)"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn selected_session_metrics_text_shows_stabilized_dispatch_mode_after_recovery() {
|
||||
let now = Utc::now();
|
||||
let mut dashboard = test_dashboard(
|
||||
vec![sample_session(
|
||||
"focus-12345678",
|
||||
"planner",
|
||||
SessionState::Running,
|
||||
Some("ecc/focus"),
|
||||
512,
|
||||
42,
|
||||
)],
|
||||
0,
|
||||
);
|
||||
dashboard.daemon_activity = DaemonActivity {
|
||||
last_dispatch_at: Some(now + chrono::Duration::seconds(2)),
|
||||
last_dispatch_routed: 2,
|
||||
last_dispatch_deferred: 0,
|
||||
last_dispatch_leads: 1,
|
||||
last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)),
|
||||
last_recovery_dispatch_routed: 1,
|
||||
last_recovery_dispatch_leads: 1,
|
||||
last_rebalance_at: Some(now),
|
||||
last_rebalance_rerouted: 1,
|
||||
last_rebalance_leads: 1,
|
||||
};
|
||||
|
||||
let text = dashboard.selected_session_metrics_text();
|
||||
assert!(text.contains("Coordination mode dispatch-first (stabilized)"));
|
||||
assert!(text.contains("Recovery stabilized @"));
|
||||
assert!(!text.contains("Last daemon recovery dispatch"));
|
||||
assert!(!text.contains("Last daemon rebalance"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn attention_queue_suppresses_inbox_pressure_when_stabilized() {
|
||||
let now = Utc::now();
|
||||
let sessions = vec![sample_session(
|
||||
"focus-12345678",
|
||||
"planner",
|
||||
SessionState::Running,
|
||||
Some("ecc/focus"),
|
||||
512,
|
||||
42,
|
||||
)];
|
||||
let unread = HashMap::from([(String::from("focus-12345678"), 3usize)]);
|
||||
let summary = SessionSummary::from_sessions(&sessions, &unread, true);
|
||||
|
||||
let line = attention_queue_line(&summary, true);
|
||||
let rendered = line
|
||||
.spans
|
||||
.iter()
|
||||
.map(|span| span.content.as_ref())
|
||||
.collect::<String>();
|
||||
|
||||
assert!(rendered.contains("Attention queue clear"));
|
||||
assert!(rendered.contains("stabilized backlog absorbed"));
|
||||
|
||||
let mut dashboard = test_dashboard(sessions, 0);
|
||||
dashboard.unread_message_counts = unread;
|
||||
dashboard.handoff_backlog_counts = HashMap::from([(String::from("focus-12345678"), 3usize)]);
|
||||
dashboard.daemon_activity = DaemonActivity {
|
||||
last_dispatch_at: Some(now + chrono::Duration::seconds(2)),
|
||||
last_dispatch_routed: 2,
|
||||
last_dispatch_deferred: 0,
|
||||
last_dispatch_leads: 1,
|
||||
last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)),
|
||||
last_recovery_dispatch_routed: 1,
|
||||
last_recovery_dispatch_leads: 1,
|
||||
last_rebalance_at: Some(now),
|
||||
last_rebalance_rerouted: 1,
|
||||
last_rebalance_leads: 1,
|
||||
};
|
||||
|
||||
let text = dashboard.selected_session_metrics_text();
|
||||
assert!(text.contains("Attention queue clear"));
|
||||
assert!(!text.contains("Needs attention:"));
|
||||
assert!(!text.contains("Backlog focus-12"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn route_preview_ignores_non_handoff_inbox_noise() {
|
||||
let lead = sample_session(
|
||||
"lead-12345678",
|
||||
"planner",
|
||||
SessionState::Running,
|
||||
Some("ecc/lead"),
|
||||
512,
|
||||
42,
|
||||
);
|
||||
let idle_worker = sample_session(
|
||||
"idle-worker",
|
||||
"planner",
|
||||
SessionState::Idle,
|
||||
Some("ecc/idle"),
|
||||
128,
|
||||
12,
|
||||
);
|
||||
|
||||
let mut dashboard = test_dashboard(vec![lead.clone(), idle_worker.clone()], 0);
|
||||
dashboard.db.insert_session(&lead).unwrap();
|
||||
dashboard.db.insert_session(&idle_worker).unwrap();
|
||||
dashboard
|
||||
.db
|
||||
.send_message("lead-12345678", "idle-worker", "FYI status update", "info")
|
||||
.unwrap();
|
||||
dashboard
|
||||
.db
|
||||
.send_message(
|
||||
"lead-12345678",
|
||||
"idle-worker",
|
||||
"{\"task\":\"Delegated work\",\"context\":\"Delegated from lead\"}",
|
||||
"task_handoff",
|
||||
)
|
||||
.unwrap();
|
||||
dashboard.db.mark_messages_read("idle-worker").unwrap();
|
||||
dashboard
|
||||
.db
|
||||
.send_message("lead-12345678", "idle-worker", "FYI status update", "info")
|
||||
.unwrap();
|
||||
|
||||
dashboard.unread_message_counts = dashboard.db.unread_message_counts().unwrap();
|
||||
dashboard.sync_selected_lineage();
|
||||
|
||||
assert_eq!(
|
||||
dashboard.selected_route_preview.as_deref(),
|
||||
Some("reuse idle idle-wor")
|
||||
);
|
||||
assert_eq!(dashboard.selected_child_sessions.len(), 1);
|
||||
assert_eq!(dashboard.selected_child_sessions[0].handoff_backlog, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn aggregate_cost_summary_mentions_total_cost() {
|
||||
let db = StateStore::open(Path::new(":memory:")).unwrap();
|
||||
@@ -2722,6 +2920,7 @@ mod tests {
|
||||
sessions,
|
||||
session_output_cache: HashMap::new(),
|
||||
unread_message_counts: HashMap::new(),
|
||||
handoff_backlog_counts: HashMap::new(),
|
||||
global_handoff_backlog_leads: 0,
|
||||
global_handoff_backlog_messages: 0,
|
||||
daemon_activity: DaemonActivity::default(),
|
||||
|
||||
Reference in New Issue
Block a user