From 10e34aa47a5250289c4d5bb05f4a31c07b352e8f Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 12:36:32 -0700 Subject: [PATCH] feat: track ecc2 chronic saturation streak --- ecc2/src/session/daemon.rs | 176 +++++++++++++++++------- ecc2/src/session/store.rs | 122 ++++++++++++----- ecc2/src/tui/dashboard.rs | 271 ++++++++++++++++++++++++------------- 3 files changed, 394 insertions(+), 175 deletions(-) diff --git a/ecc2/src/session/daemon.rs b/ecc2/src/session/daemon.rs index c63195e3..e8986db5 100644 --- a/ecc2/src/session/daemon.rs +++ b/ecc2/src/session/daemon.rs @@ -97,15 +97,19 @@ fn check_sessions(db: &StateStore, timeout: Duration) -> Result<()> { } async fn maybe_auto_dispatch(db: &StateStore, cfg: &Config) -> Result { - let summary = maybe_auto_dispatch_with_recorder(cfg, || { - manager::auto_dispatch_backlog( - db, - cfg, - &cfg.default_agent, - true, - cfg.max_parallel_sessions, - ) - }, |routed, deferred, leads| db.record_daemon_dispatch_pass(routed, deferred, leads)) + let summary = maybe_auto_dispatch_with_recorder( + cfg, + || { + manager::auto_dispatch_backlog( + db, + cfg, + &cfg.default_agent, + true, + cfg.max_parallel_sessions, + ) + }, + |routed, deferred, leads| db.record_daemon_dispatch_pass(routed, deferred, leads), + ) .await?; Ok(summary.routed) } @@ -116,26 +120,34 @@ async fn coordinate_backlog_cycle(db: &StateStore, cfg: &Config) -> Result<()> { cfg, &activity, || { - maybe_auto_dispatch_with_recorder(cfg, || { - manager::auto_dispatch_backlog( - db, - cfg, - &cfg.default_agent, - true, - cfg.max_parallel_sessions, - ) - }, |routed, deferred, leads| db.record_daemon_dispatch_pass(routed, deferred, leads)) + maybe_auto_dispatch_with_recorder( + cfg, + || { + manager::auto_dispatch_backlog( + db, + cfg, + &cfg.default_agent, + true, + cfg.max_parallel_sessions, + ) + }, + |routed, deferred, leads| db.record_daemon_dispatch_pass(routed, deferred, leads), + ) }, || { - maybe_auto_rebalance_with_recorder(cfg, || { - manager::rebalance_all_teams( - db, - cfg, - &cfg.default_agent, - true, - cfg.max_parallel_sessions, - ) - }, |rerouted, leads| db.record_daemon_rebalance_pass(rerouted, leads)) + maybe_auto_rebalance_with_recorder( + cfg, + || { + manager::rebalance_all_teams( + db, + cfg, + &cfg.default_agent, + true, + cfg.max_parallel_sessions, + ) + }, + |rerouted, leads| db.record_daemon_rebalance_pass(rerouted, leads), + ) }, |routed, leads| db.record_daemon_recovery_dispatch_pass(routed, leads), ) @@ -163,7 +175,11 @@ where tracing::warn!( "Skipping immediate dispatch retry because chronic saturation cooloff is active" ); - return Ok((DispatchPassSummary::default(), rebalanced, DispatchPassSummary::default())); + return Ok(( + DispatchPassSummary::default(), + rebalanced, + DispatchPassSummary::default(), + )); } let first_dispatch = dispatch().await?; if first_dispatch.routed > 0 { @@ -206,7 +222,11 @@ where F: Fn() -> Fut, Fut: Future>>, { - Ok(maybe_auto_dispatch_with_recorder(cfg, dispatch, |_, _, _| Ok(())).await?.routed) + Ok( + maybe_auto_dispatch_with_recorder(cfg, dispatch, |_, _, _| Ok(())) + .await? + .routed, + ) } async fn maybe_auto_dispatch_with_recorder( @@ -254,9 +274,7 @@ where ); } if deferred > 0 { - tracing::warn!( - "Deferred {deferred} task handoff(s) because delegate teams were saturated" - ); + tracing::warn!("Deferred {deferred} task handoff(s) because delegate teams were saturated"); } Ok(DispatchPassSummary { @@ -267,15 +285,19 @@ where } async fn maybe_auto_rebalance(db: &StateStore, cfg: &Config) -> Result { - maybe_auto_rebalance_with_recorder(cfg, || { - manager::rebalance_all_teams( - db, - cfg, - &cfg.default_agent, - true, - cfg.max_parallel_sessions, - ) - }, |rerouted, leads| db.record_daemon_rebalance_pass(rerouted, leads)) + maybe_auto_rebalance_with_recorder( + cfg, + || { + manager::rebalance_all_teams( + db, + cfg, + &cfg.default_agent, + true, + cfg.max_parallel_sessions, + ) + }, + |rerouted, leads| db.record_daemon_rebalance_pass(rerouted, leads), + ) .await } @@ -528,7 +550,8 @@ mod tests { } #[tokio::test] - async fn coordinate_backlog_cycle_retries_after_rebalance_when_dispatch_deferred() -> Result<()> { + async fn coordinate_backlog_cycle_retries_after_rebalance_when_dispatch_deferred() -> Result<()> + { let cfg = Config { auto_dispatch_unread_handoffs: true, ..Config::default() @@ -607,7 +630,8 @@ mod tests { } #[tokio::test] - async fn coordinate_backlog_cycle_records_recovery_dispatch_when_it_routes_work() -> Result<()> { + async fn coordinate_backlog_cycle_records_recovery_dispatch_when_it_routes_work() -> Result<()> + { let cfg = Config { auto_dispatch_unread_handoffs: true, ..Config::default() @@ -653,7 +677,8 @@ mod tests { } #[tokio::test] - async fn coordinate_backlog_cycle_rebalances_first_after_unrecovered_deferred_pressure() -> Result<()> { + async fn coordinate_backlog_cycle_rebalances_first_after_unrecovered_deferred_pressure( + ) -> Result<()> { let cfg = Config { auto_dispatch_unread_handoffs: true, ..Config::default() @@ -664,6 +689,7 @@ mod tests { last_dispatch_routed: 0, last_dispatch_deferred: 2, last_dispatch_leads: 1, + chronic_saturation_streak: 1, last_recovery_dispatch_at: None, last_recovery_dispatch_routed: 0, last_recovery_dispatch_leads: 0, @@ -708,7 +734,8 @@ mod tests { } #[tokio::test] - async fn coordinate_backlog_cycle_records_recovery_when_rebalance_first_dispatch_routes_work() -> Result<()> { + async fn coordinate_backlog_cycle_records_recovery_when_rebalance_first_dispatch_routes_work( + ) -> Result<()> { let cfg = Config { auto_dispatch_unread_handoffs: true, ..Config::default() @@ -719,6 +746,7 @@ mod tests { last_dispatch_routed: 0, last_dispatch_deferred: 2, last_dispatch_leads: 1, + chronic_saturation_streak: 1, last_recovery_dispatch_at: None, last_recovery_dispatch_routed: 0, last_recovery_dispatch_leads: 0, @@ -755,7 +783,8 @@ mod tests { } #[tokio::test] - async fn coordinate_backlog_cycle_skips_dispatch_during_chronic_cooloff_when_rebalance_does_not_help() -> Result<()> { + async fn coordinate_backlog_cycle_skips_dispatch_during_chronic_cooloff_when_rebalance_does_not_help( + ) -> Result<()> { let cfg = Config { auto_dispatch_unread_handoffs: true, ..Config::default() @@ -766,6 +795,7 @@ mod tests { last_dispatch_routed: 0, last_dispatch_deferred: 3, last_dispatch_leads: 1, + chronic_saturation_streak: 1, last_recovery_dispatch_at: None, last_recovery_dispatch_routed: 0, last_recovery_dispatch_leads: 0, @@ -803,7 +833,58 @@ mod tests { } #[tokio::test] - async fn coordinate_backlog_cycle_skips_rebalance_when_stabilized_and_dispatch_is_healthy() -> Result<()> { + async fn coordinate_backlog_cycle_skips_dispatch_when_persistent_saturation_streak_hits_cooloff( + ) -> Result<()> { + let cfg = Config { + auto_dispatch_unread_handoffs: true, + ..Config::default() + }; + let now = chrono::Utc::now(); + let activity = DaemonActivity { + last_dispatch_at: Some(now), + last_dispatch_routed: 0, + last_dispatch_deferred: 1, + last_dispatch_leads: 1, + chronic_saturation_streak: 3, + last_recovery_dispatch_at: None, + last_recovery_dispatch_routed: 0, + last_recovery_dispatch_leads: 0, + last_rebalance_at: Some(now - chrono::Duration::seconds(1)), + last_rebalance_rerouted: 0, + last_rebalance_leads: 1, + }; + let calls = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let calls_clone = calls.clone(); + + let (first, rebalanced, recovery) = coordinate_backlog_cycle_with( + &cfg, + &activity, + move || { + let calls_clone = calls_clone.clone(); + async move { + calls_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(DispatchPassSummary { + routed: 1, + deferred: 0, + leads: 1, + }) + } + }, + || async move { Ok(0) }, + |_, _| Ok(()), + ) + .await?; + + assert_eq!(first, DispatchPassSummary::default()); + assert_eq!(rebalanced, 0); + assert_eq!(recovery, DispatchPassSummary::default()); + assert_eq!(calls.load(std::sync::atomic::Ordering::SeqCst), 0); + Ok(()) + } + + #[tokio::test] + async fn coordinate_backlog_cycle_skips_rebalance_when_stabilized_and_dispatch_is_healthy( + ) -> Result<()> { let cfg = Config { auto_dispatch_unread_handoffs: true, ..Config::default() @@ -814,6 +895,7 @@ mod tests { last_dispatch_routed: 2, last_dispatch_deferred: 0, last_dispatch_leads: 1, + chronic_saturation_streak: 0, last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)), last_recovery_dispatch_routed: 1, last_recovery_dispatch_leads: 1, diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index 793fa6d3..096ca52a 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -19,6 +19,7 @@ pub struct DaemonActivity { pub last_dispatch_routed: usize, pub last_dispatch_deferred: usize, pub last_dispatch_leads: usize, + pub chronic_saturation_streak: usize, pub last_recovery_dispatch_at: Option>, pub last_recovery_dispatch_routed: usize, pub last_recovery_dispatch_leads: usize, @@ -44,12 +45,11 @@ impl DaemonActivity { } pub fn dispatch_cooloff_active(&self) -> bool { - self.prefers_rebalance_first() && self.last_dispatch_deferred >= 2 + self.prefers_rebalance_first() + && (self.last_dispatch_deferred >= 2 || self.chronic_saturation_streak >= 3) } - pub fn chronic_saturation_cleared_at( - &self, - ) -> Option<&chrono::DateTime> { + pub fn chronic_saturation_cleared_at(&self) -> Option<&chrono::DateTime> { if self.prefers_rebalance_first() { return None; } @@ -58,14 +58,14 @@ impl DaemonActivity { self.last_dispatch_at.as_ref(), self.last_recovery_dispatch_at.as_ref(), ) { - (Some(dispatch_at), Some(recovery_at)) if recovery_at > dispatch_at => Some(recovery_at), + (Some(dispatch_at), Some(recovery_at)) if recovery_at > dispatch_at => { + Some(recovery_at) + } _ => None, } } - pub fn stabilized_after_recovery_at( - &self, - ) -> Option<&chrono::DateTime> { + pub fn stabilized_after_recovery_at(&self) -> Option<&chrono::DateTime> { if self.last_dispatch_deferred != 0 { return None; } @@ -74,7 +74,9 @@ impl DaemonActivity { self.last_dispatch_at.as_ref(), self.last_recovery_dispatch_at.as_ref(), ) { - (Some(dispatch_at), Some(recovery_at)) if dispatch_at > recovery_at => Some(dispatch_at), + (Some(dispatch_at), Some(recovery_at)) if dispatch_at > recovery_at => { + Some(dispatch_at) + } _ => None, } } @@ -147,6 +149,7 @@ impl StateStore { last_dispatch_routed INTEGER NOT NULL DEFAULT 0, last_dispatch_deferred INTEGER NOT NULL DEFAULT 0, last_dispatch_leads INTEGER NOT NULL DEFAULT 0, + chronic_saturation_streak INTEGER NOT NULL DEFAULT 0, last_recovery_dispatch_at TEXT, last_recovery_dispatch_routed INTEGER NOT NULL DEFAULT 0, last_recovery_dispatch_leads INTEGER NOT NULL DEFAULT 0, @@ -199,7 +202,9 @@ impl StateStore { "ALTER TABLE daemon_activity ADD COLUMN last_recovery_dispatch_at TEXT", [], ) - .context("Failed to add last_recovery_dispatch_at column to daemon_activity table")?; + .context( + "Failed to add last_recovery_dispatch_at column to daemon_activity table", + )?; } if !self.has_column("daemon_activity", "last_recovery_dispatch_routed")? { @@ -220,6 +225,15 @@ impl StateStore { .context("Failed to add last_recovery_dispatch_leads column to daemon_activity table")?; } + if !self.has_column("daemon_activity", "chronic_saturation_streak")? { + self.conn + .execute( + "ALTER TABLE daemon_activity ADD COLUMN chronic_saturation_streak INTEGER NOT NULL DEFAULT 0", + [], + ) + .context("Failed to add chronic_saturation_streak column to daemon_activity table")?; + } + Ok(()) } @@ -550,9 +564,7 @@ impl StateStore { }) })?; - messages - .collect::, _>>() - .map_err(Into::into) + messages.collect::, _>>().map_err(Into::into) } pub fn unread_task_handoff_count(&self, session_id: &str) -> Result { @@ -582,9 +594,7 @@ impl StateStore { Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)? as usize)) })?; - targets - .collect::, _>>() - .map_err(Into::into) + targets.collect::, _>>().map_err(Into::into) } pub fn mark_messages_read(&self, session_id: &str) -> Result { @@ -624,6 +634,7 @@ impl StateStore { self.conn .query_row( "SELECT last_dispatch_at, last_dispatch_routed, last_dispatch_deferred, last_dispatch_leads, + chronic_saturation_streak, last_recovery_dispatch_at, last_recovery_dispatch_routed, last_recovery_dispatch_leads, last_rebalance_at, last_rebalance_rerouted, last_rebalance_leads FROM daemon_activity @@ -652,12 +663,13 @@ impl StateStore { last_dispatch_routed: row.get::<_, i64>(1)? as usize, last_dispatch_deferred: row.get::<_, i64>(2)? as usize, last_dispatch_leads: row.get::<_, i64>(3)? as usize, - last_recovery_dispatch_at: parse_ts(row.get(4)?)?, - last_recovery_dispatch_routed: row.get::<_, i64>(5)? as usize, - last_recovery_dispatch_leads: row.get::<_, i64>(6)? as usize, - last_rebalance_at: parse_ts(row.get(7)?)?, - last_rebalance_rerouted: row.get::<_, i64>(8)? as usize, - last_rebalance_leads: row.get::<_, i64>(9)? as usize, + chronic_saturation_streak: row.get::<_, i64>(4)? as usize, + last_recovery_dispatch_at: parse_ts(row.get(5)?)?, + last_recovery_dispatch_routed: row.get::<_, i64>(6)? as usize, + last_recovery_dispatch_leads: row.get::<_, i64>(7)? as usize, + last_rebalance_at: parse_ts(row.get(8)?)?, + last_rebalance_rerouted: row.get::<_, i64>(9)? as usize, + last_rebalance_leads: row.get::<_, i64>(10)? as usize, }) }, ) @@ -675,7 +687,11 @@ impl StateStore { SET last_dispatch_at = ?1, last_dispatch_routed = ?2, last_dispatch_deferred = ?3, - last_dispatch_leads = ?4 + last_dispatch_leads = ?4, + chronic_saturation_streak = CASE + WHEN ?3 > 0 THEN chronic_saturation_streak + 1 + ELSE 0 + END WHERE id = 1", rusqlite::params![ chrono::Utc::now().to_rfc3339(), @@ -693,7 +709,8 @@ impl StateStore { "UPDATE daemon_activity SET last_recovery_dispatch_at = ?1, last_recovery_dispatch_routed = ?2, - last_recovery_dispatch_leads = ?3 + last_recovery_dispatch_leads = ?3, + chronic_saturation_streak = 0 WHERE id = 1", rusqlite::params![chrono::Utc::now().to_rfc3339(), routed as i64, leads as i64], )?; @@ -708,7 +725,11 @@ impl StateStore { last_rebalance_rerouted = ?2, last_rebalance_leads = ?3 WHERE id = 1", - rusqlite::params![chrono::Utc::now().to_rfc3339(), rerouted as i64, leads as i64], + rusqlite::params![ + chrono::Utc::now().to_rfc3339(), + rerouted as i64, + leads as i64 + ], )?; Ok(()) @@ -1023,7 +1044,12 @@ mod tests { db.insert_session(&build_session("planner", SessionState::Running))?; db.insert_session(&build_session("worker", SessionState::Pending))?; - db.send_message("planner", "worker", "{\"question\":\"Need context\"}", "query")?; + db.send_message( + "planner", + "worker", + "{\"question\":\"Need context\"}", + "query", + )?; db.send_message( "worker", "planner", @@ -1066,17 +1092,11 @@ mod tests { ); assert_eq!( db.delegated_children("planner", 10)?, - vec![ - "worker-3".to_string(), - "worker-2".to_string(), - ] + vec!["worker-3".to_string(), "worker-2".to_string(),] ); assert_eq!( db.unread_task_handoff_targets(10)?, - vec![ - ("worker-2".to_string(), 1), - ("worker-3".to_string(), 1), - ] + vec![("worker-2".to_string(), 1), ("worker-3".to_string(), 1),] ); Ok(()) @@ -1095,6 +1115,7 @@ mod tests { assert_eq!(activity.last_dispatch_routed, 4); assert_eq!(activity.last_dispatch_deferred, 1); assert_eq!(activity.last_dispatch_leads, 2); + assert_eq!(activity.chronic_saturation_streak, 0); assert_eq!(activity.last_recovery_dispatch_routed, 2); assert_eq!(activity.last_recovery_dispatch_leads, 1); assert_eq!(activity.last_rebalance_rerouted, 3); @@ -1121,6 +1142,7 @@ mod tests { last_dispatch_routed: 0, last_dispatch_deferred: 2, last_dispatch_leads: 1, + chronic_saturation_streak: 1, last_recovery_dispatch_at: None, last_recovery_dispatch_routed: 0, last_recovery_dispatch_leads: 0, @@ -1133,9 +1155,18 @@ mod tests { assert!(unresolved.chronic_saturation_cleared_at().is_none()); assert!(unresolved.stabilized_after_recovery_at().is_none()); + let persistent = DaemonActivity { + last_dispatch_deferred: 1, + chronic_saturation_streak: 3, + ..unresolved.clone() + }; + assert!(persistent.prefers_rebalance_first()); + assert!(persistent.dispatch_cooloff_active()); + let recovered = DaemonActivity { last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)), last_recovery_dispatch_routed: 1, + chronic_saturation_streak: 0, ..unresolved }; assert!(!recovered.prefers_rebalance_first()); @@ -1161,4 +1192,27 @@ mod tests { stabilized.last_dispatch_at.as_ref() ); } + + #[test] + fn daemon_activity_tracks_chronic_saturation_streak() -> Result<()> { + let tempdir = TestDir::new("store-daemon-streak")?; + let db = StateStore::open(&tempdir.path().join("state.db"))?; + + db.record_daemon_dispatch_pass(0, 1, 1)?; + db.record_daemon_dispatch_pass(0, 1, 1)?; + let saturated = db.daemon_activity()?; + assert_eq!(saturated.chronic_saturation_streak, 2); + assert!(!saturated.dispatch_cooloff_active()); + + db.record_daemon_dispatch_pass(0, 1, 1)?; + let chronic = db.daemon_activity()?; + assert_eq!(chronic.chronic_saturation_streak, 3); + assert!(chronic.dispatch_cooloff_active()); + + db.record_daemon_recovery_dispatch_pass(1, 1)?; + let recovered = db.daemon_activity()?; + assert_eq!(recovered.chronic_saturation_streak, 0); + + Ok(()) + } } diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index 36ce08e6..18d28320 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -1,28 +1,28 @@ -use std::collections::HashMap; use ratatui::{ prelude::*, widgets::{ Block, Borders, Cell, HighlightSpacing, Paragraph, Row, Table, TableState, Tabs, Wrap, }, }; +use std::collections::HashMap; use tokio::sync::broadcast; use super::widgets::{budget_state, format_currency, format_token_count, BudgetState, TokenMeter}; use crate::comms; use crate::config::{Config, PaneLayout}; use crate::observability::ToolLogEntry; -use crate::session::output::{OutputEvent, OutputLine, SessionOutputStore, OUTPUT_BUFFER_LIMIT}; use crate::session::manager; +use crate::session::output::{OutputEvent, OutputLine, SessionOutputStore, OUTPUT_BUFFER_LIMIT}; use crate::session::store::{DaemonActivity, StateStore}; use crate::session::{Session, SessionMessage, SessionState}; use crate::worktree; -#[cfg(test)] -use std::path::Path; #[cfg(test)] use crate::session::output::OutputStream; #[cfg(test)] use crate::session::{SessionMetrics, WorktreeInfo}; +#[cfg(test)] +use std::path::Path; const DEFAULT_PANE_SIZE_PERCENT: u16 = 35; const DEFAULT_GRID_SIZE_PERCENT: u16 = 50; @@ -122,7 +122,11 @@ impl Dashboard { Self::with_output_store(db, cfg, SessionOutputStore::default()) } - pub fn with_output_store(db: StateStore, cfg: Config, output_store: SessionOutputStore) -> Self { + pub fn with_output_store( + db: StateStore, + cfg: Config, + output_store: SessionOutputStore, + ) -> Self { let pane_size_percent = match cfg.pane_layout { PaneLayout::Grid => DEFAULT_GRID_SIZE_PERCENT, PaneLayout::Horizontal | PaneLayout::Vertical => DEFAULT_PANE_SIZE_PERCENT, @@ -221,13 +225,13 @@ impl Dashboard { .map(|pane| pane.title()) .collect::>(), ) - .block(Block::default().borders(Borders::ALL).title(title)) - .select(self.selected_pane_index()) - .highlight_style( - Style::default() - .fg(Color::Cyan) - .add_modifier(Modifier::BOLD), - ); + .block(Block::default().borders(Borders::ALL).title(title)) + .select(self.selected_pane_index()) + .highlight_style( + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + ); frame.render_widget(tabs, area); } @@ -244,7 +248,10 @@ impl Dashboard { return; } - let stabilized = self.daemon_activity.stabilized_after_recovery_at().is_some(); + let stabilized = self + .daemon_activity + .stabilized_after_recovery_at() + .is_some(); let summary = SessionSummary::from_sessions(&self.sessions, &self.handoff_backlog_counts, stabilized); let chunks = Layout::default() @@ -269,8 +276,10 @@ impl Dashboard { .unwrap_or(0), ) }); - let header = Row::new(["ID", "Agent", "State", "Branch", "Backlog", "Tokens", "Duration"]) - .style(Style::default().add_modifier(Modifier::BOLD)); + let header = Row::new([ + "ID", "Agent", "State", "Branch", "Backlog", "Tokens", "Duration", + ]) + .style(Style::default().add_modifier(Modifier::BOLD)); let widths = [ Constraint::Length(8), Constraint::Length(10), @@ -600,14 +609,15 @@ impl Dashboard { let task = self.new_session_task(); let agent = self.cfg.default_agent.clone(); - let session_id = match manager::create_session(&self.db, &self.cfg, &task, &agent, true).await { - Ok(session_id) => session_id, - Err(error) => { - tracing::warn!("Failed to create new session from dashboard: {error}"); - self.set_operator_note(format!("new session failed: {error}")); - return; - } - }; + let session_id = + match manager::create_session(&self.db, &self.cfg, &task, &agent, true).await { + Ok(session_id) => session_id, + Err(error) => { + tracing::warn!("Failed to create new session from dashboard: {error}"); + self.set_operator_note(format!("new session failed: {error}")); + return; + } + }; if let Some(source_session) = self.sessions.get(self.selected_session) { let context = format!( @@ -644,7 +654,10 @@ impl Dashboard { self.refresh(); self.sync_selection_by_id(Some(&session_id)); - self.set_operator_note(format!("spawned session {}", format_session_id(&session_id))); + self.set_operator_note(format!( + "spawned session {}", + format_session_id(&session_id) + )); self.reset_output_view(); self.sync_selected_output(); self.sync_selected_diff(); @@ -808,22 +821,17 @@ impl Dashboard { let agent = self.cfg.default_agent.clone(); let lead_limit = self.sessions.len().max(1); - let outcomes = match manager::auto_dispatch_backlog( - &self.db, - &self.cfg, - &agent, - true, - lead_limit, - ) - .await - { - Ok(outcomes) => outcomes, - Err(error) => { - tracing::warn!("Failed to auto-dispatch backlog from dashboard: {error}"); - self.set_operator_note(format!("global auto-dispatch failed: {error}")); - return; - } - }; + let outcomes = + match manager::auto_dispatch_backlog(&self.db, &self.cfg, &agent, true, lead_limit) + .await + { + Ok(outcomes) => outcomes, + Err(error) => { + tracing::warn!("Failed to auto-dispatch backlog from dashboard: {error}"); + self.set_operator_note(format!("global auto-dispatch failed: {error}")); + return; + } + }; let total_processed: usize = outcomes.iter().map(|outcome| outcome.routed.len()).sum(); let total_routed: usize = outcomes @@ -867,22 +875,16 @@ impl Dashboard { let agent = self.cfg.default_agent.clone(); let lead_limit = self.sessions.len().max(1); - let outcomes = match manager::rebalance_all_teams( - &self.db, - &self.cfg, - &agent, - true, - lead_limit, - ) - .await - { - Ok(outcomes) => outcomes, - Err(error) => { - tracing::warn!("Failed to rebalance teams from dashboard: {error}"); - self.set_operator_note(format!("global rebalance failed: {error}")); - return; - } - }; + let outcomes = + match manager::rebalance_all_teams(&self.db, &self.cfg, &agent, true, lead_limit).await + { + Ok(outcomes) => outcomes, + Err(error) => { + tracing::warn!("Failed to rebalance teams from dashboard: {error}"); + self.set_operator_note(format!("global rebalance failed: {error}")); + return; + } + }; let total_rerouted: usize = outcomes.iter().map(|outcome| outcome.rerouted.len()).sum(); let selected_session_id = self @@ -914,11 +916,7 @@ impl Dashboard { let lead_limit = self.sessions.len().max(1); let outcome = match manager::coordinate_backlog( - &self.db, - &self.cfg, - &agent, - true, - lead_limit, + &self.db, &self.cfg, &agent, true, lead_limit, ) .await { @@ -992,12 +990,18 @@ impl Dashboard { let session_id = session.id.clone(); if let Err(error) = manager::stop_session(&self.db, &session_id).await { tracing::warn!("Failed to stop session {}: {error}", session.id); - self.set_operator_note(format!("stop failed for {}: {error}", format_session_id(&session_id))); + self.set_operator_note(format!( + "stop failed for {}: {error}", + format_session_id(&session_id) + )); return; } self.refresh(); - self.set_operator_note(format!("stopped session {}", format_session_id(&session_id))); + self.set_operator_note(format!( + "stopped session {}", + format_session_id(&session_id) + )); } pub async fn resume_selected(&mut self) { @@ -1008,12 +1012,18 @@ impl Dashboard { let session_id = session.id.clone(); if let Err(error) = manager::resume_session(&self.db, &self.cfg, &session_id).await { tracing::warn!("Failed to resume session {}: {error}", session.id); - self.set_operator_note(format!("resume failed for {}: {error}", format_session_id(&session_id))); + self.set_operator_note(format!( + "resume failed for {}: {error}", + format_session_id(&session_id) + )); return; } self.refresh(); - self.set_operator_note(format!("resumed session {}", format_session_id(&session_id))); + self.set_operator_note(format!( + "resumed session {}", + format_session_id(&session_id) + )); } pub async fn cleanup_selected_worktree(&mut self) { @@ -1036,7 +1046,10 @@ impl Dashboard { } self.refresh(); - self.set_operator_note(format!("cleaned worktree for {}", format_session_id(&session_id))); + self.set_operator_note(format!( + "cleaned worktree for {}", + format_session_id(&session_id) + )); } pub async fn delete_selected_session(&mut self) { @@ -1047,12 +1060,18 @@ impl Dashboard { let session_id = session.id.clone(); if let Err(error) = manager::delete_session(&self.db, &session_id).await { tracing::warn!("Failed to delete session {}: {error}", session.id); - self.set_operator_note(format!("delete failed for {}: {error}", format_session_id(&session_id))); + self.set_operator_note(format!( + "delete failed for {}: {error}", + format_session_id(&session_id) + )); return; } self.refresh(); - self.set_operator_note(format!("deleted session {}", format_session_id(&session_id))); + self.set_operator_note(format!( + "deleted session {}", + format_session_id(&session_id) + )); } pub fn refresh(&mut self) { @@ -1085,7 +1104,8 @@ impl Dashboard { } pub fn adjust_auto_dispatch_limit(&mut self, delta: isize) { - let next = (self.cfg.auto_dispatch_limit_per_session as isize + delta).clamp(1, 50) as usize; + let next = + (self.cfg.auto_dispatch_limit_per_session as isize + delta).clamp(1, 50) as usize; if next == self.cfg.auto_dispatch_limit_per_session { self.set_operator_note(format!( "auto-dispatch limit unchanged at {} handoff(s) per lead", @@ -1162,7 +1182,11 @@ impl Dashboard { fn sync_selection_by_id(&mut self, selected_id: Option<&str>) { if let Some(selected_id) = selected_id { - if let Some(index) = self.sessions.iter().position(|session| session.id == selected_id) { + if let Some(index) = self + .sessions + .iter() + .position(|session| session.id == selected_id) + { self.selected_session = index; } } @@ -1246,7 +1270,11 @@ impl Dashboard { return; }; - let unread_count = self.unread_message_counts.get(&session_id).copied().unwrap_or(0); + let unread_count = self + .unread_message_counts + .get(&session_id) + .copied() + .unwrap_or(0); if unread_count > 0 { match self.db.mark_messages_read(&session_id) { Ok(_) => { @@ -1297,7 +1325,8 @@ impl Dashboard { match self.db.get_session(&child_id) { Ok(Some(session)) => { team.total += 1; - let handoff_backlog = match self.db.unread_task_handoff_count(&child_id) { + let handoff_backlog = match self.db.unread_task_handoff_count(&child_id) + { Ok(count) => count, Err(error) => { tracing::warn!( @@ -1360,7 +1389,9 @@ impl Dashboard { ) -> Option { if let Some(idle_clear) = delegates .iter() - .filter(|delegate| delegate.state == SessionState::Idle && delegate.handoff_backlog == 0) + .filter(|delegate| { + delegate.state == SessionState::Idle && delegate.handoff_backlog == 0 + }) .min_by_key(|delegate| delegate.session_id.as_str()) { return Some(format!( @@ -1387,7 +1418,12 @@ impl Dashboard { if let Some(active_delegate) = delegates .iter() - .filter(|delegate| matches!(delegate.state, SessionState::Running | SessionState::Pending)) + .filter(|delegate| { + matches!( + delegate.state, + SessionState::Running | SessionState::Pending + ) + }) .min_by_key(|delegate| (delegate.handoff_backlog, delegate.session_id.as_str())) { return Some(format!( @@ -1510,7 +1546,11 @@ impl Dashboard { "Global handoff backlog {} lead(s) / {} handoff(s) | Auto-dispatch {} @ {}/lead", self.global_handoff_backlog_leads, self.global_handoff_backlog_messages, - if self.cfg.auto_dispatch_unread_handoffs { "on" } else { "off" }, + if self.cfg.auto_dispatch_unread_handoffs { + "on" + } else { + "off" + }, self.cfg.auto_dispatch_limit_per_session )); @@ -1529,6 +1569,13 @@ impl Dashboard { } )); + if self.daemon_activity.chronic_saturation_streak > 0 { + lines.push(format!( + "Chronic saturation streak {} cycle(s)", + self.daemon_activity.chronic_saturation_streak + )); + } + if let Some(cleared_at) = self.daemon_activity.chronic_saturation_cleared_at() { lines.push(format!( "Chronic saturation cleared @ {}", @@ -1684,7 +1731,10 @@ impl Dashboard { fn attention_queue_items(&self, limit: usize) -> Vec { let mut items = Vec::new(); - let suppress_inbox_attention = self.daemon_activity.stabilized_after_recovery_at().is_some(); + let suppress_inbox_attention = self + .daemon_activity + .stabilized_after_recovery_at() + .is_some(); for session in &self.sessions { let handoff_backlog = self @@ -1914,7 +1964,10 @@ impl SessionSummary { inbox_sessions: if suppress_inbox_attention { 0 } else { - unread_message_counts.values().filter(|count| **count > 0).count() + unread_message_counts + .values() + .filter(|count| **count > 0) + .count() }, ..Self::default() }, @@ -1991,7 +2044,9 @@ fn attention_queue_line(summary: &SessionSummary, stabilized: bool) -> Line<'sta return Line::from(vec![ Span::styled( "Attention queue clear", - Style::default().fg(Color::Green).add_modifier(Modifier::BOLD), + Style::default() + .fg(Color::Green) + .add_modifier(Modifier::BOLD), ), Span::raw(if stabilized { " stabilized backlog absorbed" @@ -2004,7 +2059,9 @@ fn attention_queue_line(summary: &SessionSummary, stabilized: bool) -> Line<'sta Line::from(vec![ Span::styled( "Attention queue ", - Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD), + Style::default() + .fg(Color::Yellow) + .add_modifier(Modifier::BOLD), ), summary_span("Backlog", summary.unread_messages, Color::Magenta), summary_span("Failed", summary.failed, Color::Red), @@ -2141,15 +2198,13 @@ mod tests { ], 0, ); - dashboard - .session_output_cache - .insert( - "focus-12345678".to_string(), - vec![OutputLine { - stream: OutputStream::Stdout, - text: "last useful output".to_string(), - }], - ); + dashboard.session_output_cache.insert( + "focus-12345678".to_string(), + vec![OutputLine { + stream: OutputStream::Stdout, + text: "last useful output".to_string(), + }], + ); dashboard.selected_diff_summary = Some("1 file changed, 2 insertions(+)".to_string()); let text = dashboard.selected_session_metrics_text(); @@ -2188,7 +2243,9 @@ mod tests { let text = dashboard.selected_session_metrics_text(); assert!(text.contains("Team 3/8 | idle 1 | running 1 | pending 1 | failed 0 | stopped 0")); - assert!(text.contains("Global handoff backlog 2 lead(s) / 5 handoff(s) | Auto-dispatch off @ 5/lead")); + assert!(text.contains( + "Global handoff backlog 2 lead(s) / 5 handoff(s) | Auto-dispatch off @ 5/lead" + )); assert!(text.contains("Coordination mode dispatch-first")); assert!(text.contains("Next route reuse idle worker-1")); } @@ -2212,6 +2269,7 @@ mod tests { last_dispatch_routed: 4, last_dispatch_deferred: 2, last_dispatch_leads: 2, + chronic_saturation_streak: 0, last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)), last_recovery_dispatch_routed: 1, last_recovery_dispatch_leads: 1, @@ -2246,6 +2304,7 @@ mod tests { last_dispatch_routed: 0, last_dispatch_deferred: 1, last_dispatch_leads: 1, + chronic_saturation_streak: 1, last_recovery_dispatch_at: None, last_recovery_dispatch_routed: 0, last_recovery_dispatch_leads: 0, @@ -2276,6 +2335,7 @@ mod tests { last_dispatch_routed: 0, last_dispatch_deferred: 3, last_dispatch_leads: 1, + chronic_saturation_streak: 3, last_recovery_dispatch_at: None, last_recovery_dispatch_routed: 0, last_recovery_dispatch_leads: 0, @@ -2286,6 +2346,7 @@ mod tests { let text = dashboard.selected_session_metrics_text(); assert!(text.contains("Coordination mode rebalance-cooloff (chronic saturation)")); + assert!(text.contains("Chronic saturation streak 3 cycle(s)")); } #[test] @@ -2307,6 +2368,7 @@ mod tests { last_dispatch_routed: 2, last_dispatch_deferred: 0, last_dispatch_leads: 1, + chronic_saturation_streak: 0, last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)), last_recovery_dispatch_routed: 1, last_recovery_dispatch_leads: 1, @@ -2348,12 +2410,14 @@ mod tests { let mut dashboard = test_dashboard(sessions, 0); dashboard.unread_message_counts = unread; - dashboard.handoff_backlog_counts = HashMap::from([(String::from("focus-12345678"), 3usize)]); + dashboard.handoff_backlog_counts = + HashMap::from([(String::from("focus-12345678"), 3usize)]); dashboard.daemon_activity = DaemonActivity { last_dispatch_at: Some(now + chrono::Duration::seconds(2)), last_dispatch_routed: 2, last_dispatch_deferred: 0, last_dispatch_leads: 1, + chronic_saturation_streak: 0, last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)), last_recovery_dispatch_routed: 1, last_recovery_dispatch_leads: 1, @@ -2690,7 +2754,10 @@ mod tests { let session = db .get_session("stopped-1")? .expect("session should exist after cleanup"); - assert!(session.worktree.is_none(), "worktree metadata should be cleared"); + assert!( + session.worktree.is_none(), + "worktree metadata should be cleared" + ); let _ = std::fs::remove_dir_all(worktree_path); let _ = std::fs::remove_file(db_path); @@ -2720,7 +2787,10 @@ mod tests { let mut dashboard = Dashboard::new(dashboard_store, Config::default()); dashboard.delete_selected_session().await; - assert!(db.get_session("done-1")?.is_none(), "session should be deleted"); + assert!( + db.get_session("done-1")?.is_none(), + "session should be deleted" + ); let _ = std::fs::remove_file(db_path); Ok(()) @@ -2845,7 +2915,10 @@ mod tests { let mut dashboard = Dashboard::new(dashboard_store, Config::default()); dashboard.coordinate_backlog().await; - assert_eq!(dashboard.operator_note.as_deref(), Some("backlog already clear")); + assert_eq!( + dashboard.operator_note.as_deref(), + Some("backlog already clear") + ); let _ = std::fs::remove_file(db_path); Ok(()) @@ -2853,7 +2926,17 @@ mod tests { #[test] fn grid_layout_renders_four_panes() { - let mut dashboard = test_dashboard(vec![sample_session("grid-1", "claude", SessionState::Running, None, 1, 1)], 0); + let mut dashboard = test_dashboard( + vec![sample_session( + "grid-1", + "claude", + SessionState::Running, + None, + 1, + 1, + )], + 0, + ); dashboard.cfg.pane_layout = PaneLayout::Grid; dashboard.pane_size_percent = DEFAULT_GRID_SIZE_PERCENT;