feat: add ecc2 global rebalance controls

This commit is contained in:
Affaan Mustafa
2026-04-08 02:43:45 -07:00
parent 2709694b7b
commit 6dc5577319
3 changed files with 150 additions and 1 deletions

View File

@@ -90,6 +90,18 @@ enum Commands {
#[arg(long, default_value_t = 10)] #[arg(long, default_value_t = 10)]
lead_limit: usize, lead_limit: usize,
}, },
/// Rebalance unread handoffs across lead teams with backed-up delegates
RebalanceAll {
/// Agent type for routed delegates
#[arg(short, long, default_value = "claude")]
agent: String,
/// Create a dedicated worktree if new delegates must be spawned
#[arg(short, long, default_value_t = true)]
worktree: bool,
/// Maximum lead sessions to sweep in one pass
#[arg(long, default_value_t = 10)]
lead_limit: usize,
},
/// Rebalance unread handoffs off backed-up delegates onto clearer team capacity /// Rebalance unread handoffs off backed-up delegates onto clearer team capacity
RebalanceTeam { RebalanceTeam {
/// Lead session ID or alias /// Lead session ID or alias
@@ -337,6 +349,38 @@ async fn main() -> Result<()> {
} }
} }
} }
Some(Commands::RebalanceAll {
agent,
worktree: use_worktree,
lead_limit,
}) => {
let outcomes = session::manager::rebalance_all_teams(
&db,
&cfg,
&agent,
use_worktree,
lead_limit,
)
.await?;
if outcomes.is_empty() {
println!("No delegate backlog needed global rebalancing");
} else {
let total_rerouted: usize =
outcomes.iter().map(|outcome| outcome.rerouted.len()).sum();
println!(
"Rebalanced {} task handoff(s) across {} lead session(s)",
total_rerouted,
outcomes.len()
);
for outcome in outcomes {
println!(
"- {} | rerouted {}",
short_session(&outcome.lead_session_id),
outcome.rerouted.len()
);
}
}
}
Some(Commands::RebalanceTeam { Some(Commands::RebalanceTeam {
session_id, session_id,
agent, agent,
@@ -747,6 +791,31 @@ mod tests {
} }
} }
#[test]
fn cli_parses_rebalance_all_command() {
let cli = Cli::try_parse_from([
"ecc",
"rebalance-all",
"--agent",
"claude",
"--lead-limit",
"6",
])
.expect("rebalance-all should parse");
match cli.command {
Some(Commands::RebalanceAll {
agent,
lead_limit,
..
}) => {
assert_eq!(agent, "claude");
assert_eq!(lead_limit, 6);
}
_ => panic!("expected rebalance-all subcommand"),
}
}
#[test] #[test]
fn cli_parses_rebalance_team_command() { fn cli_parses_rebalance_team_command() {
let cli = Cli::try_parse_from([ let cli = Cli::try_parse_from([

View File

@@ -41,6 +41,7 @@ pub async fn run(db: StateStore, cfg: Config) -> Result<()> {
(_, KeyCode::Char('n')) => dashboard.new_session().await, (_, KeyCode::Char('n')) => dashboard.new_session().await,
(_, KeyCode::Char('a')) => dashboard.assign_selected().await, (_, KeyCode::Char('a')) => dashboard.assign_selected().await,
(_, KeyCode::Char('b')) => dashboard.rebalance_selected_team().await, (_, KeyCode::Char('b')) => dashboard.rebalance_selected_team().await,
(_, KeyCode::Char('B')) => dashboard.rebalance_all_teams().await,
(_, KeyCode::Char('i')) => dashboard.drain_inbox_selected().await, (_, KeyCode::Char('i')) => dashboard.drain_inbox_selected().await,
(_, KeyCode::Char('g')) => dashboard.auto_dispatch_backlog().await, (_, KeyCode::Char('g')) => dashboard.auto_dispatch_backlog().await,
(_, KeyCode::Char('p')) => dashboard.toggle_auto_dispatch_policy(), (_, KeyCode::Char('p')) => dashboard.toggle_auto_dispatch_policy(),

View File

@@ -407,7 +407,7 @@ impl Dashboard {
fn render_status_bar(&self, frame: &mut Frame, area: Rect) { fn render_status_bar(&self, frame: &mut Frame, area: Rect) {
let text = format!( let text = format!(
" [n]ew session [a]ssign re[b]alance dra[i]n inbox [g]lobal dispatch toggle [p]olicy [,/.] dispatch limit [s]top [u]resume [x]cleanup [d]elete [r]efresh [Tab] switch pane [j/k] scroll [+/-] resize [{}] layout [?] help [q]uit ", " [n]ew session [a]ssign re[b]alance global re[B]alance dra[i]n inbox [g]lobal dispatch toggle [p]olicy [,/.] dispatch limit [s]top [u]resume [x]cleanup [d]elete [r]efresh [Tab] switch pane [j/k] scroll [+/-] resize [{}] layout [?] help [q]uit ",
self.layout_label() self.layout_label()
); );
let text = if let Some(note) = self.operator_note.as_ref() { let text = if let Some(note) = self.operator_note.as_ref() {
@@ -454,6 +454,7 @@ impl Dashboard {
" n New session", " n New session",
" a Assign follow-up work from selected session", " a Assign follow-up work from selected session",
" b Rebalance backed-up delegate inboxes for selected lead", " b Rebalance backed-up delegate inboxes for selected lead",
" B Rebalance backed-up delegate inboxes across lead teams",
" i Drain unread task handoffs from selected session inbox", " i Drain unread task handoffs from selected session inbox",
" g Auto-dispatch unread handoffs across lead sessions", " g Auto-dispatch unread handoffs across lead sessions",
" p Toggle daemon auto-dispatch policy and persist config", " p Toggle daemon auto-dispatch policy and persist config",
@@ -840,6 +841,52 @@ impl Dashboard {
} }
} }
pub async fn rebalance_all_teams(&mut self) {
let agent = self.cfg.default_agent.clone();
let lead_limit = self.sessions.len().max(1);
let outcomes = match manager::rebalance_all_teams(
&self.db,
&self.cfg,
&agent,
true,
lead_limit,
)
.await
{
Ok(outcomes) => outcomes,
Err(error) => {
tracing::warn!("Failed to rebalance teams from dashboard: {error}");
self.set_operator_note(format!("global rebalance failed: {error}"));
return;
}
};
let total_rerouted: usize = outcomes.iter().map(|outcome| outcome.rerouted.len()).sum();
let selected_session_id = self
.sessions
.get(self.selected_session)
.map(|session| session.id.clone());
self.refresh();
self.sync_selection_by_id(selected_session_id.as_deref());
self.sync_selected_output();
self.sync_selected_diff();
self.sync_selected_messages();
self.sync_selected_lineage();
self.refresh_logs();
if total_rerouted == 0 {
self.set_operator_note("no delegate backlog needed global rebalancing".to_string());
} else {
self.set_operator_note(format!(
"rebalanced {} handoff(s) across {} lead session(s)",
total_rerouted,
outcomes.len()
));
}
}
pub async fn stop_selected(&mut self) { pub async fn stop_selected(&mut self) {
let Some(session) = self.sessions.get(self.selected_session) else { let Some(session) = self.sessions.get(self.selected_session) else {
return; return;
@@ -2365,6 +2412,38 @@ mod tests {
Ok(()) Ok(())
} }
#[tokio::test]
async fn rebalance_all_teams_sets_operator_note_when_clear() -> Result<()> {
let db_path = std::env::temp_dir().join(format!("ecc2-dashboard-{}.db", Uuid::new_v4()));
let db = StateStore::open(&db_path)?;
let now = Utc::now();
db.insert_session(&Session {
id: "lead-1".to_string(),
task: "coordinate".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: SessionState::Running,
pid: None,
worktree: None,
created_at: now,
updated_at: now,
metrics: SessionMetrics::default(),
})?;
let dashboard_store = StateStore::open(&db_path)?;
let mut dashboard = Dashboard::new(dashboard_store, Config::default());
dashboard.rebalance_all_teams().await;
assert_eq!(
dashboard.operator_note.as_deref(),
Some("no delegate backlog needed global rebalancing")
);
let _ = std::fs::remove_file(db_path);
Ok(())
}
#[test] #[test]
fn grid_layout_renders_four_panes() { fn grid_layout_renders_four_panes() {
let mut dashboard = test_dashboard(vec![sample_session("grid-1", "claude", SessionState::Running, None, 1, 1)], 0); let mut dashboard = test_dashboard(vec![sample_session("grid-1", "claude", SessionState::Running, None, 1, 1)], 0);