From 6dc557731958b824b080ceedae62069145fc5765 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 02:43:45 -0700 Subject: [PATCH] feat: add ecc2 global rebalance controls --- ecc2/src/main.rs | 69 +++++++++++++++++++++++++++++++++ ecc2/src/tui/app.rs | 1 + ecc2/src/tui/dashboard.rs | 81 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 150 insertions(+), 1 deletion(-) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 23e4a50b..51e396a1 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -90,6 +90,18 @@ enum Commands { #[arg(long, default_value_t = 10)] lead_limit: usize, }, + /// Rebalance unread handoffs across lead teams with backed-up delegates + RebalanceAll { + /// Agent type for routed delegates + #[arg(short, long, default_value = "claude")] + agent: String, + /// Create a dedicated worktree if new delegates must be spawned + #[arg(short, long, default_value_t = true)] + worktree: bool, + /// Maximum lead sessions to sweep in one pass + #[arg(long, default_value_t = 10)] + lead_limit: usize, + }, /// Rebalance unread handoffs off backed-up delegates onto clearer team capacity RebalanceTeam { /// Lead session ID or alias @@ -337,6 +349,38 @@ async fn main() -> Result<()> { } } } + Some(Commands::RebalanceAll { + agent, + worktree: use_worktree, + lead_limit, + }) => { + let outcomes = session::manager::rebalance_all_teams( + &db, + &cfg, + &agent, + use_worktree, + lead_limit, + ) + .await?; + if outcomes.is_empty() { + println!("No delegate backlog needed global rebalancing"); + } else { + let total_rerouted: usize = + outcomes.iter().map(|outcome| outcome.rerouted.len()).sum(); + println!( + "Rebalanced {} task handoff(s) across {} lead session(s)", + total_rerouted, + outcomes.len() + ); + for outcome in outcomes { + println!( + "- {} | rerouted {}", + short_session(&outcome.lead_session_id), + outcome.rerouted.len() + ); + } + } + } Some(Commands::RebalanceTeam { session_id, agent, @@ -747,6 +791,31 @@ mod tests { } } + #[test] + fn cli_parses_rebalance_all_command() { + let cli = Cli::try_parse_from([ + "ecc", + "rebalance-all", + "--agent", + "claude", + "--lead-limit", + "6", + ]) + .expect("rebalance-all should parse"); + + match cli.command { + Some(Commands::RebalanceAll { + agent, + lead_limit, + .. + }) => { + assert_eq!(agent, "claude"); + assert_eq!(lead_limit, 6); + } + _ => panic!("expected rebalance-all subcommand"), + } + } + #[test] fn cli_parses_rebalance_team_command() { let cli = Cli::try_parse_from([ diff --git a/ecc2/src/tui/app.rs b/ecc2/src/tui/app.rs index 971e382f..207b2964 100644 --- a/ecc2/src/tui/app.rs +++ b/ecc2/src/tui/app.rs @@ -41,6 +41,7 @@ pub async fn run(db: StateStore, cfg: Config) -> Result<()> { (_, KeyCode::Char('n')) => dashboard.new_session().await, (_, KeyCode::Char('a')) => dashboard.assign_selected().await, (_, KeyCode::Char('b')) => dashboard.rebalance_selected_team().await, + (_, KeyCode::Char('B')) => dashboard.rebalance_all_teams().await, (_, KeyCode::Char('i')) => dashboard.drain_inbox_selected().await, (_, KeyCode::Char('g')) => dashboard.auto_dispatch_backlog().await, (_, KeyCode::Char('p')) => dashboard.toggle_auto_dispatch_policy(), diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index ae9bdf6e..70b86f1a 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -407,7 +407,7 @@ impl Dashboard { fn render_status_bar(&self, frame: &mut Frame, area: Rect) { let text = format!( - " [n]ew session [a]ssign re[b]alance dra[i]n inbox [g]lobal dispatch toggle [p]olicy [,/.] dispatch limit [s]top [u]resume [x]cleanup [d]elete [r]efresh [Tab] switch pane [j/k] scroll [+/-] resize [{}] layout [?] help [q]uit ", + " [n]ew session [a]ssign re[b]alance global re[B]alance dra[i]n inbox [g]lobal dispatch toggle [p]olicy [,/.] dispatch limit [s]top [u]resume [x]cleanup [d]elete [r]efresh [Tab] switch pane [j/k] scroll [+/-] resize [{}] layout [?] help [q]uit ", self.layout_label() ); let text = if let Some(note) = self.operator_note.as_ref() { @@ -454,6 +454,7 @@ impl Dashboard { " n New session", " a Assign follow-up work from selected session", " b Rebalance backed-up delegate inboxes for selected lead", + " B Rebalance backed-up delegate inboxes across lead teams", " i Drain unread task handoffs from selected session inbox", " g Auto-dispatch unread handoffs across lead sessions", " p Toggle daemon auto-dispatch policy and persist config", @@ -840,6 +841,52 @@ impl Dashboard { } } + pub async fn rebalance_all_teams(&mut self) { + let agent = self.cfg.default_agent.clone(); + let lead_limit = self.sessions.len().max(1); + + let outcomes = match manager::rebalance_all_teams( + &self.db, + &self.cfg, + &agent, + true, + lead_limit, + ) + .await + { + Ok(outcomes) => outcomes, + Err(error) => { + tracing::warn!("Failed to rebalance teams from dashboard: {error}"); + self.set_operator_note(format!("global rebalance failed: {error}")); + return; + } + }; + + let total_rerouted: usize = outcomes.iter().map(|outcome| outcome.rerouted.len()).sum(); + let selected_session_id = self + .sessions + .get(self.selected_session) + .map(|session| session.id.clone()); + + self.refresh(); + self.sync_selection_by_id(selected_session_id.as_deref()); + self.sync_selected_output(); + self.sync_selected_diff(); + self.sync_selected_messages(); + self.sync_selected_lineage(); + self.refresh_logs(); + + if total_rerouted == 0 { + self.set_operator_note("no delegate backlog needed global rebalancing".to_string()); + } else { + self.set_operator_note(format!( + "rebalanced {} handoff(s) across {} lead session(s)", + total_rerouted, + outcomes.len() + )); + } + } + pub async fn stop_selected(&mut self) { let Some(session) = self.sessions.get(self.selected_session) else { return; @@ -2365,6 +2412,38 @@ mod tests { Ok(()) } + #[tokio::test] + async fn rebalance_all_teams_sets_operator_note_when_clear() -> Result<()> { + let db_path = std::env::temp_dir().join(format!("ecc2-dashboard-{}.db", Uuid::new_v4())); + let db = StateStore::open(&db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "lead-1".to_string(), + task: "coordinate".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: SessionState::Running, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + metrics: SessionMetrics::default(), + })?; + + let dashboard_store = StateStore::open(&db_path)?; + let mut dashboard = Dashboard::new(dashboard_store, Config::default()); + dashboard.rebalance_all_teams().await; + + assert_eq!( + dashboard.operator_note.as_deref(), + Some("no delegate backlog needed global rebalancing") + ); + + let _ = std::fs::remove_file(db_path); + Ok(()) + } + #[test] fn grid_layout_renders_four_panes() { let mut dashboard = test_dashboard(vec![sample_session("grid-1", "claude", SessionState::Running, None, 1, 1)], 0);