feat: track ecc2 chronic saturation streak

This commit is contained in:
Affaan Mustafa
2026-04-08 12:36:32 -07:00
parent 9d766af025
commit 10e34aa47a
3 changed files with 394 additions and 175 deletions

View File

@@ -1,28 +1,28 @@
use std::collections::HashMap;
use ratatui::{
prelude::*,
widgets::{
Block, Borders, Cell, HighlightSpacing, Paragraph, Row, Table, TableState, Tabs, Wrap,
},
};
use std::collections::HashMap;
use tokio::sync::broadcast;
use super::widgets::{budget_state, format_currency, format_token_count, BudgetState, TokenMeter};
use crate::comms;
use crate::config::{Config, PaneLayout};
use crate::observability::ToolLogEntry;
use crate::session::output::{OutputEvent, OutputLine, SessionOutputStore, OUTPUT_BUFFER_LIMIT};
use crate::session::manager;
use crate::session::output::{OutputEvent, OutputLine, SessionOutputStore, OUTPUT_BUFFER_LIMIT};
use crate::session::store::{DaemonActivity, StateStore};
use crate::session::{Session, SessionMessage, SessionState};
use crate::worktree;
#[cfg(test)]
use std::path::Path;
#[cfg(test)]
use crate::session::output::OutputStream;
#[cfg(test)]
use crate::session::{SessionMetrics, WorktreeInfo};
#[cfg(test)]
use std::path::Path;
const DEFAULT_PANE_SIZE_PERCENT: u16 = 35;
const DEFAULT_GRID_SIZE_PERCENT: u16 = 50;
@@ -122,7 +122,11 @@ impl Dashboard {
Self::with_output_store(db, cfg, SessionOutputStore::default())
}
pub fn with_output_store(db: StateStore, cfg: Config, output_store: SessionOutputStore) -> Self {
pub fn with_output_store(
db: StateStore,
cfg: Config,
output_store: SessionOutputStore,
) -> Self {
let pane_size_percent = match cfg.pane_layout {
PaneLayout::Grid => DEFAULT_GRID_SIZE_PERCENT,
PaneLayout::Horizontal | PaneLayout::Vertical => DEFAULT_PANE_SIZE_PERCENT,
@@ -221,13 +225,13 @@ impl Dashboard {
.map(|pane| pane.title())
.collect::<Vec<_>>(),
)
.block(Block::default().borders(Borders::ALL).title(title))
.select(self.selected_pane_index())
.highlight_style(
Style::default()
.fg(Color::Cyan)
.add_modifier(Modifier::BOLD),
);
.block(Block::default().borders(Borders::ALL).title(title))
.select(self.selected_pane_index())
.highlight_style(
Style::default()
.fg(Color::Cyan)
.add_modifier(Modifier::BOLD),
);
frame.render_widget(tabs, area);
}
@@ -244,7 +248,10 @@ impl Dashboard {
return;
}
let stabilized = self.daemon_activity.stabilized_after_recovery_at().is_some();
let stabilized = self
.daemon_activity
.stabilized_after_recovery_at()
.is_some();
let summary =
SessionSummary::from_sessions(&self.sessions, &self.handoff_backlog_counts, stabilized);
let chunks = Layout::default()
@@ -269,8 +276,10 @@ impl Dashboard {
.unwrap_or(0),
)
});
let header = Row::new(["ID", "Agent", "State", "Branch", "Backlog", "Tokens", "Duration"])
.style(Style::default().add_modifier(Modifier::BOLD));
let header = Row::new([
"ID", "Agent", "State", "Branch", "Backlog", "Tokens", "Duration",
])
.style(Style::default().add_modifier(Modifier::BOLD));
let widths = [
Constraint::Length(8),
Constraint::Length(10),
@@ -600,14 +609,15 @@ impl Dashboard {
let task = self.new_session_task();
let agent = self.cfg.default_agent.clone();
let session_id = match manager::create_session(&self.db, &self.cfg, &task, &agent, true).await {
Ok(session_id) => session_id,
Err(error) => {
tracing::warn!("Failed to create new session from dashboard: {error}");
self.set_operator_note(format!("new session failed: {error}"));
return;
}
};
let session_id =
match manager::create_session(&self.db, &self.cfg, &task, &agent, true).await {
Ok(session_id) => session_id,
Err(error) => {
tracing::warn!("Failed to create new session from dashboard: {error}");
self.set_operator_note(format!("new session failed: {error}"));
return;
}
};
if let Some(source_session) = self.sessions.get(self.selected_session) {
let context = format!(
@@ -644,7 +654,10 @@ impl Dashboard {
self.refresh();
self.sync_selection_by_id(Some(&session_id));
self.set_operator_note(format!("spawned session {}", format_session_id(&session_id)));
self.set_operator_note(format!(
"spawned session {}",
format_session_id(&session_id)
));
self.reset_output_view();
self.sync_selected_output();
self.sync_selected_diff();
@@ -808,22 +821,17 @@ impl Dashboard {
let agent = self.cfg.default_agent.clone();
let lead_limit = self.sessions.len().max(1);
let outcomes = match manager::auto_dispatch_backlog(
&self.db,
&self.cfg,
&agent,
true,
lead_limit,
)
.await
{
Ok(outcomes) => outcomes,
Err(error) => {
tracing::warn!("Failed to auto-dispatch backlog from dashboard: {error}");
self.set_operator_note(format!("global auto-dispatch failed: {error}"));
return;
}
};
let outcomes =
match manager::auto_dispatch_backlog(&self.db, &self.cfg, &agent, true, lead_limit)
.await
{
Ok(outcomes) => outcomes,
Err(error) => {
tracing::warn!("Failed to auto-dispatch backlog from dashboard: {error}");
self.set_operator_note(format!("global auto-dispatch failed: {error}"));
return;
}
};
let total_processed: usize = outcomes.iter().map(|outcome| outcome.routed.len()).sum();
let total_routed: usize = outcomes
@@ -867,22 +875,16 @@ impl Dashboard {
let agent = self.cfg.default_agent.clone();
let lead_limit = self.sessions.len().max(1);
let outcomes = match manager::rebalance_all_teams(
&self.db,
&self.cfg,
&agent,
true,
lead_limit,
)
.await
{
Ok(outcomes) => outcomes,
Err(error) => {
tracing::warn!("Failed to rebalance teams from dashboard: {error}");
self.set_operator_note(format!("global rebalance failed: {error}"));
return;
}
};
let outcomes =
match manager::rebalance_all_teams(&self.db, &self.cfg, &agent, true, lead_limit).await
{
Ok(outcomes) => outcomes,
Err(error) => {
tracing::warn!("Failed to rebalance teams from dashboard: {error}");
self.set_operator_note(format!("global rebalance failed: {error}"));
return;
}
};
let total_rerouted: usize = outcomes.iter().map(|outcome| outcome.rerouted.len()).sum();
let selected_session_id = self
@@ -914,11 +916,7 @@ impl Dashboard {
let lead_limit = self.sessions.len().max(1);
let outcome = match manager::coordinate_backlog(
&self.db,
&self.cfg,
&agent,
true,
lead_limit,
&self.db, &self.cfg, &agent, true, lead_limit,
)
.await
{
@@ -992,12 +990,18 @@ impl Dashboard {
let session_id = session.id.clone();
if let Err(error) = manager::stop_session(&self.db, &session_id).await {
tracing::warn!("Failed to stop session {}: {error}", session.id);
self.set_operator_note(format!("stop failed for {}: {error}", format_session_id(&session_id)));
self.set_operator_note(format!(
"stop failed for {}: {error}",
format_session_id(&session_id)
));
return;
}
self.refresh();
self.set_operator_note(format!("stopped session {}", format_session_id(&session_id)));
self.set_operator_note(format!(
"stopped session {}",
format_session_id(&session_id)
));
}
pub async fn resume_selected(&mut self) {
@@ -1008,12 +1012,18 @@ impl Dashboard {
let session_id = session.id.clone();
if let Err(error) = manager::resume_session(&self.db, &self.cfg, &session_id).await {
tracing::warn!("Failed to resume session {}: {error}", session.id);
self.set_operator_note(format!("resume failed for {}: {error}", format_session_id(&session_id)));
self.set_operator_note(format!(
"resume failed for {}: {error}",
format_session_id(&session_id)
));
return;
}
self.refresh();
self.set_operator_note(format!("resumed session {}", format_session_id(&session_id)));
self.set_operator_note(format!(
"resumed session {}",
format_session_id(&session_id)
));
}
pub async fn cleanup_selected_worktree(&mut self) {
@@ -1036,7 +1046,10 @@ impl Dashboard {
}
self.refresh();
self.set_operator_note(format!("cleaned worktree for {}", format_session_id(&session_id)));
self.set_operator_note(format!(
"cleaned worktree for {}",
format_session_id(&session_id)
));
}
pub async fn delete_selected_session(&mut self) {
@@ -1047,12 +1060,18 @@ impl Dashboard {
let session_id = session.id.clone();
if let Err(error) = manager::delete_session(&self.db, &session_id).await {
tracing::warn!("Failed to delete session {}: {error}", session.id);
self.set_operator_note(format!("delete failed for {}: {error}", format_session_id(&session_id)));
self.set_operator_note(format!(
"delete failed for {}: {error}",
format_session_id(&session_id)
));
return;
}
self.refresh();
self.set_operator_note(format!("deleted session {}", format_session_id(&session_id)));
self.set_operator_note(format!(
"deleted session {}",
format_session_id(&session_id)
));
}
pub fn refresh(&mut self) {
@@ -1085,7 +1104,8 @@ impl Dashboard {
}
pub fn adjust_auto_dispatch_limit(&mut self, delta: isize) {
let next = (self.cfg.auto_dispatch_limit_per_session as isize + delta).clamp(1, 50) as usize;
let next =
(self.cfg.auto_dispatch_limit_per_session as isize + delta).clamp(1, 50) as usize;
if next == self.cfg.auto_dispatch_limit_per_session {
self.set_operator_note(format!(
"auto-dispatch limit unchanged at {} handoff(s) per lead",
@@ -1162,7 +1182,11 @@ impl Dashboard {
fn sync_selection_by_id(&mut self, selected_id: Option<&str>) {
if let Some(selected_id) = selected_id {
if let Some(index) = self.sessions.iter().position(|session| session.id == selected_id) {
if let Some(index) = self
.sessions
.iter()
.position(|session| session.id == selected_id)
{
self.selected_session = index;
}
}
@@ -1246,7 +1270,11 @@ impl Dashboard {
return;
};
let unread_count = self.unread_message_counts.get(&session_id).copied().unwrap_or(0);
let unread_count = self
.unread_message_counts
.get(&session_id)
.copied()
.unwrap_or(0);
if unread_count > 0 {
match self.db.mark_messages_read(&session_id) {
Ok(_) => {
@@ -1297,7 +1325,8 @@ impl Dashboard {
match self.db.get_session(&child_id) {
Ok(Some(session)) => {
team.total += 1;
let handoff_backlog = match self.db.unread_task_handoff_count(&child_id) {
let handoff_backlog = match self.db.unread_task_handoff_count(&child_id)
{
Ok(count) => count,
Err(error) => {
tracing::warn!(
@@ -1360,7 +1389,9 @@ impl Dashboard {
) -> Option<String> {
if let Some(idle_clear) = delegates
.iter()
.filter(|delegate| delegate.state == SessionState::Idle && delegate.handoff_backlog == 0)
.filter(|delegate| {
delegate.state == SessionState::Idle && delegate.handoff_backlog == 0
})
.min_by_key(|delegate| delegate.session_id.as_str())
{
return Some(format!(
@@ -1387,7 +1418,12 @@ impl Dashboard {
if let Some(active_delegate) = delegates
.iter()
.filter(|delegate| matches!(delegate.state, SessionState::Running | SessionState::Pending))
.filter(|delegate| {
matches!(
delegate.state,
SessionState::Running | SessionState::Pending
)
})
.min_by_key(|delegate| (delegate.handoff_backlog, delegate.session_id.as_str()))
{
return Some(format!(
@@ -1510,7 +1546,11 @@ impl Dashboard {
"Global handoff backlog {} lead(s) / {} handoff(s) | Auto-dispatch {} @ {}/lead",
self.global_handoff_backlog_leads,
self.global_handoff_backlog_messages,
if self.cfg.auto_dispatch_unread_handoffs { "on" } else { "off" },
if self.cfg.auto_dispatch_unread_handoffs {
"on"
} else {
"off"
},
self.cfg.auto_dispatch_limit_per_session
));
@@ -1529,6 +1569,13 @@ impl Dashboard {
}
));
if self.daemon_activity.chronic_saturation_streak > 0 {
lines.push(format!(
"Chronic saturation streak {} cycle(s)",
self.daemon_activity.chronic_saturation_streak
));
}
if let Some(cleared_at) = self.daemon_activity.chronic_saturation_cleared_at() {
lines.push(format!(
"Chronic saturation cleared @ {}",
@@ -1684,7 +1731,10 @@ impl Dashboard {
fn attention_queue_items(&self, limit: usize) -> Vec<String> {
let mut items = Vec::new();
let suppress_inbox_attention = self.daemon_activity.stabilized_after_recovery_at().is_some();
let suppress_inbox_attention = self
.daemon_activity
.stabilized_after_recovery_at()
.is_some();
for session in &self.sessions {
let handoff_backlog = self
@@ -1914,7 +1964,10 @@ impl SessionSummary {
inbox_sessions: if suppress_inbox_attention {
0
} else {
unread_message_counts.values().filter(|count| **count > 0).count()
unread_message_counts
.values()
.filter(|count| **count > 0)
.count()
},
..Self::default()
},
@@ -1991,7 +2044,9 @@ fn attention_queue_line(summary: &SessionSummary, stabilized: bool) -> Line<'sta
return Line::from(vec![
Span::styled(
"Attention queue clear",
Style::default().fg(Color::Green).add_modifier(Modifier::BOLD),
Style::default()
.fg(Color::Green)
.add_modifier(Modifier::BOLD),
),
Span::raw(if stabilized {
" stabilized backlog absorbed"
@@ -2004,7 +2059,9 @@ fn attention_queue_line(summary: &SessionSummary, stabilized: bool) -> Line<'sta
Line::from(vec![
Span::styled(
"Attention queue ",
Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD),
Style::default()
.fg(Color::Yellow)
.add_modifier(Modifier::BOLD),
),
summary_span("Backlog", summary.unread_messages, Color::Magenta),
summary_span("Failed", summary.failed, Color::Red),
@@ -2141,15 +2198,13 @@ mod tests {
],
0,
);
dashboard
.session_output_cache
.insert(
"focus-12345678".to_string(),
vec![OutputLine {
stream: OutputStream::Stdout,
text: "last useful output".to_string(),
}],
);
dashboard.session_output_cache.insert(
"focus-12345678".to_string(),
vec![OutputLine {
stream: OutputStream::Stdout,
text: "last useful output".to_string(),
}],
);
dashboard.selected_diff_summary = Some("1 file changed, 2 insertions(+)".to_string());
let text = dashboard.selected_session_metrics_text();
@@ -2188,7 +2243,9 @@ mod tests {
let text = dashboard.selected_session_metrics_text();
assert!(text.contains("Team 3/8 | idle 1 | running 1 | pending 1 | failed 0 | stopped 0"));
assert!(text.contains("Global handoff backlog 2 lead(s) / 5 handoff(s) | Auto-dispatch off @ 5/lead"));
assert!(text.contains(
"Global handoff backlog 2 lead(s) / 5 handoff(s) | Auto-dispatch off @ 5/lead"
));
assert!(text.contains("Coordination mode dispatch-first"));
assert!(text.contains("Next route reuse idle worker-1"));
}
@@ -2212,6 +2269,7 @@ mod tests {
last_dispatch_routed: 4,
last_dispatch_deferred: 2,
last_dispatch_leads: 2,
chronic_saturation_streak: 0,
last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)),
last_recovery_dispatch_routed: 1,
last_recovery_dispatch_leads: 1,
@@ -2246,6 +2304,7 @@ mod tests {
last_dispatch_routed: 0,
last_dispatch_deferred: 1,
last_dispatch_leads: 1,
chronic_saturation_streak: 1,
last_recovery_dispatch_at: None,
last_recovery_dispatch_routed: 0,
last_recovery_dispatch_leads: 0,
@@ -2276,6 +2335,7 @@ mod tests {
last_dispatch_routed: 0,
last_dispatch_deferred: 3,
last_dispatch_leads: 1,
chronic_saturation_streak: 3,
last_recovery_dispatch_at: None,
last_recovery_dispatch_routed: 0,
last_recovery_dispatch_leads: 0,
@@ -2286,6 +2346,7 @@ mod tests {
let text = dashboard.selected_session_metrics_text();
assert!(text.contains("Coordination mode rebalance-cooloff (chronic saturation)"));
assert!(text.contains("Chronic saturation streak 3 cycle(s)"));
}
#[test]
@@ -2307,6 +2368,7 @@ mod tests {
last_dispatch_routed: 2,
last_dispatch_deferred: 0,
last_dispatch_leads: 1,
chronic_saturation_streak: 0,
last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)),
last_recovery_dispatch_routed: 1,
last_recovery_dispatch_leads: 1,
@@ -2348,12 +2410,14 @@ mod tests {
let mut dashboard = test_dashboard(sessions, 0);
dashboard.unread_message_counts = unread;
dashboard.handoff_backlog_counts = HashMap::from([(String::from("focus-12345678"), 3usize)]);
dashboard.handoff_backlog_counts =
HashMap::from([(String::from("focus-12345678"), 3usize)]);
dashboard.daemon_activity = DaemonActivity {
last_dispatch_at: Some(now + chrono::Duration::seconds(2)),
last_dispatch_routed: 2,
last_dispatch_deferred: 0,
last_dispatch_leads: 1,
chronic_saturation_streak: 0,
last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)),
last_recovery_dispatch_routed: 1,
last_recovery_dispatch_leads: 1,
@@ -2690,7 +2754,10 @@ mod tests {
let session = db
.get_session("stopped-1")?
.expect("session should exist after cleanup");
assert!(session.worktree.is_none(), "worktree metadata should be cleared");
assert!(
session.worktree.is_none(),
"worktree metadata should be cleared"
);
let _ = std::fs::remove_dir_all(worktree_path);
let _ = std::fs::remove_file(db_path);
@@ -2720,7 +2787,10 @@ mod tests {
let mut dashboard = Dashboard::new(dashboard_store, Config::default());
dashboard.delete_selected_session().await;
assert!(db.get_session("done-1")?.is_none(), "session should be deleted");
assert!(
db.get_session("done-1")?.is_none(),
"session should be deleted"
);
let _ = std::fs::remove_file(db_path);
Ok(())
@@ -2845,7 +2915,10 @@ mod tests {
let mut dashboard = Dashboard::new(dashboard_store, Config::default());
dashboard.coordinate_backlog().await;
assert_eq!(dashboard.operator_note.as_deref(), Some("backlog already clear"));
assert_eq!(
dashboard.operator_note.as_deref(),
Some("backlog already clear")
);
let _ = std::fs::remove_file(db_path);
Ok(())
@@ -2853,7 +2926,17 @@ mod tests {
#[test]
fn grid_layout_renders_four_panes() {
let mut dashboard = test_dashboard(vec![sample_session("grid-1", "claude", SessionState::Running, None, 1, 1)], 0);
let mut dashboard = test_dashboard(
vec![sample_session(
"grid-1",
"claude",
SessionState::Running,
None,
1,
1,
)],
0,
);
dashboard.cfg.pane_layout = PaneLayout::Grid;
dashboard.pane_size_percent = DEFAULT_GRID_SIZE_PERCENT;