feat: rebalance ecc2 delegate backlog

This commit is contained in:
Affaan Mustafa
2026-04-07 13:15:03 -07:00
parent f450a14ef7
commit 349d3a08cb
2 changed files with 284 additions and 0 deletions

View File

@@ -90,6 +90,20 @@ enum Commands {
#[arg(long, default_value_t = 10)]
lead_limit: usize,
},
/// Rebalance unread handoffs off backed-up delegates onto clearer team capacity
RebalanceTeam {
/// Lead session ID or alias
session_id: String,
/// Agent type for routed delegates
#[arg(short, long, default_value = "claude")]
agent: String,
/// Create a dedicated worktree if new delegates must be spawned
#[arg(short, long, default_value_t = true)]
worktree: bool,
/// Maximum handoffs to reroute in one pass
#[arg(long, default_value_t = 5)]
limit: usize,
},
/// List active sessions
Sessions,
/// Show session details
@@ -323,6 +337,46 @@ async fn main() -> Result<()> {
}
}
}
Some(Commands::RebalanceTeam {
session_id,
agent,
worktree: use_worktree,
limit,
}) => {
let lead_id = resolve_session_id(&db, &session_id)?;
let outcomes = session::manager::rebalance_team_backlog(
&db,
&cfg,
&lead_id,
&agent,
use_worktree,
limit,
)
.await?;
if outcomes.is_empty() {
println!("No delegate backlog needed rebalancing for {}", short_session(&lead_id));
} else {
println!(
"Rebalanced {} task handoff(s) for {}",
outcomes.len(),
short_session(&lead_id)
);
for outcome in outcomes {
println!(
"- {} | {} -> {} ({}) | {}",
outcome.message_id,
short_session(&outcome.from_session_id),
short_session(&outcome.session_id),
match outcome.action {
session::manager::AssignmentAction::Spawned => "spawned",
session::manager::AssignmentAction::ReusedIdle => "reused-idle",
session::manager::AssignmentAction::ReusedActive => "reused-active",
},
outcome.task
);
}
}
}
Some(Commands::Sessions) => {
let sessions = session::manager::list_sessions(&db)?;
for s in sessions {
@@ -692,4 +746,32 @@ mod tests {
_ => panic!("expected auto-dispatch subcommand"),
}
}
#[test]
fn cli_parses_rebalance_team_command() {
let cli = Cli::try_parse_from([
"ecc",
"rebalance-team",
"lead",
"--agent",
"claude",
"--limit",
"2",
])
.expect("rebalance-team should parse");
match cli.command {
Some(Commands::RebalanceTeam {
session_id,
agent,
limit,
..
}) => {
assert_eq!(session_id, "lead");
assert_eq!(agent, "claude");
assert_eq!(limit, 2);
}
_ => panic!("expected rebalance-team subcommand"),
}
}
}

View File

@@ -166,6 +166,113 @@ pub async fn auto_dispatch_backlog(
Ok(outcomes)
}
pub async fn rebalance_team_backlog(
db: &StateStore,
cfg: &Config,
lead_id: &str,
agent_type: &str,
use_worktree: bool,
limit: usize,
) -> Result<Vec<RebalanceOutcome>> {
let repo_root =
std::env::current_dir().context("Failed to resolve current working directory")?;
let runner_program = std::env::current_exe().context("Failed to resolve ECC executable path")?;
let lead = resolve_session(db, lead_id)?;
let mut outcomes = Vec::new();
if limit == 0 {
return Ok(outcomes);
}
let delegates = direct_delegate_sessions(db, &lead.id, agent_type)?;
let unread_counts = db.unread_message_counts()?;
let team_has_capacity = delegates.len() < cfg.max_parallel_sessions;
for delegate in &delegates {
if outcomes.len() >= limit {
break;
}
let unread_count = unread_counts.get(&delegate.id).copied().unwrap_or(0);
if unread_count <= 1 {
continue;
}
let has_clear_idle_elsewhere = delegates.iter().any(|candidate| {
candidate.id != delegate.id
&& candidate.state == SessionState::Idle
&& unread_counts.get(&candidate.id).copied().unwrap_or(0) == 0
});
if !has_clear_idle_elsewhere && !team_has_capacity {
continue;
}
let message_budget = limit.saturating_sub(outcomes.len());
let messages = db.unread_task_handoffs_for_session(&delegate.id, message_budget)?;
for message in messages {
if outcomes.len() >= limit {
break;
}
let current_delegates = direct_delegate_sessions(db, &lead.id, agent_type)?;
let current_unread_counts = db.unread_message_counts()?;
let current_team_has_capacity = current_delegates.len() < cfg.max_parallel_sessions;
let current_has_clear_idle_elsewhere = current_delegates.iter().any(|candidate| {
candidate.id != delegate.id
&& candidate.state == SessionState::Idle
&& current_unread_counts
.get(&candidate.id)
.copied()
.unwrap_or(0)
== 0
});
if !current_has_clear_idle_elsewhere && !current_team_has_capacity {
break;
}
if message.from_session != lead.id {
continue;
}
let task = match comms::parse(&message.content) {
Some(MessageType::TaskHandoff { task, .. }) => task,
_ => extract_legacy_handoff_task(&message.content)
.unwrap_or_else(|| message.content.clone()),
};
let outcome = assign_session_in_dir_with_runner_program(
db,
cfg,
&lead.id,
&task,
agent_type,
use_worktree,
&repo_root,
&runner_program,
)
.await?;
if outcome.session_id == delegate.id {
continue;
}
let _ = db.mark_message_read(message.id)?;
outcomes.push(RebalanceOutcome {
from_session_id: delegate.id.clone(),
message_id: message.id,
task,
session_id: outcome.session_id,
action: outcome.action,
});
}
}
Ok(outcomes)
}
pub async fn stop_session(db: &StateStore, id: &str) -> Result<()> {
stop_session_with_options(db, id, true).await
}
@@ -850,6 +957,14 @@ pub struct LeadDispatchOutcome {
pub routed: Vec<InboxDrainOutcome>,
}
pub struct RebalanceOutcome {
pub from_session_id: String,
pub message_id: i64,
pub task: String,
pub session_id: String,
pub action: AssignmentAction,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AssignmentAction {
Spawned,
@@ -1739,4 +1854,91 @@ mod tests {
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn rebalance_team_backlog_moves_work_off_backed_up_delegate() -> Result<()> {
let tempdir = TestDir::new("manager-rebalance-team")?;
let repo_root = tempdir.path().join("repo");
init_git_repo(&repo_root)?;
let mut cfg = build_config(tempdir.path());
cfg.max_parallel_sessions = 2;
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
pid: Some(42),
worktree: None,
created_at: now - Duration::minutes(4),
updated_at: now - Duration::minutes(4),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
id: "worker-a".to_string(),
task: "auth lane".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Idle,
pid: None,
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
id: "worker-b".to_string(),
task: "billing lane".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Idle,
pid: None,
worktree: None,
created_at: now - Duration::minutes(2),
updated_at: now - Duration::minutes(2),
metrics: SessionMetrics::default(),
})?;
db.send_message(
"lead",
"worker-a",
"{\"task\":\"Review auth flow\",\"context\":\"Delegated from lead\"}",
"task_handoff",
)?;
db.send_message(
"lead",
"worker-a",
"{\"task\":\"Check billing integration\",\"context\":\"Delegated from lead\"}",
"task_handoff",
)?;
db.send_message(
"lead",
"worker-b",
"{\"task\":\"Existing clear lane\",\"context\":\"Delegated from lead\"}",
"task_handoff",
)?;
let _ = db.mark_messages_read("worker-b")?;
let outcomes = rebalance_team_backlog(&db, &cfg, "lead", "claude", true, 5).await?;
assert_eq!(outcomes.len(), 1);
assert_eq!(outcomes[0].from_session_id, "worker-a");
assert_eq!(outcomes[0].session_id, "worker-b");
assert_eq!(outcomes[0].action, AssignmentAction::ReusedIdle);
let unread = db.unread_message_counts()?;
assert_eq!(unread.get("worker-a"), Some(&1));
assert_eq!(unread.get("worker-b"), Some(&1));
let worker_b_messages = db.list_messages_for_session("worker-b", 10)?;
assert!(worker_b_messages.iter().any(|message| {
message.msg_type == "task_handoff"
&& message.content.contains("Review auth flow")
}));
Ok(())
}
}