feat: default ecc2 worktrees through policy

This commit is contained in:
Affaan Mustafa
2026-04-08 15:58:31 -07:00
parent 86cbe3d616
commit b3f781a648
8 changed files with 576 additions and 326 deletions

View File

@@ -101,7 +101,8 @@ pub async fn drain_inbox(
) -> Result<Vec<InboxDrainOutcome>> {
let repo_root =
std::env::current_dir().context("Failed to resolve current working directory")?;
let runner_program = std::env::current_exe().context("Failed to resolve ECC executable path")?;
let runner_program =
std::env::current_exe().context("Failed to resolve ECC executable path")?;
let lead = resolve_session(db, lead_id)?;
let messages = db.unread_task_handoffs_for_session(&lead.id, limit)?;
let mut outcomes = Vec::new();
@@ -184,7 +185,12 @@ pub async fn rebalance_all_teams(
for session in sessions
.into_iter()
.filter(|session| matches!(session.state, SessionState::Running | SessionState::Pending | SessionState::Idle))
.filter(|session| {
matches!(
session.state,
SessionState::Running | SessionState::Pending | SessionState::Idle
)
})
.take(lead_limit)
{
let rerouted = rebalance_team_backlog(
@@ -245,7 +251,8 @@ pub async fn rebalance_team_backlog(
) -> Result<Vec<RebalanceOutcome>> {
let repo_root =
std::env::current_dir().context("Failed to resolve current working directory")?;
let runner_program = std::env::current_exe().context("Failed to resolve ECC executable path")?;
let runner_program =
std::env::current_exe().context("Failed to resolve ECC executable path")?;
let lead = resolve_session(db, lead_id)?;
let mut outcomes = Vec::new();
@@ -888,7 +895,15 @@ async fn queue_session_in_dir_with_runner_program(
.map(|worktree| worktree.path.as_path())
.unwrap_or(repo_root);
match spawn_session_runner_for_program(task, &session.id, agent_type, working_dir, runner_program).await {
match spawn_session_runner_for_program(
task,
&session.id,
agent_type,
working_dir,
runner_program,
)
.await
{
Ok(()) => Ok(session.id),
Err(error) => {
db.update_state(&session.id, &SessionState::Failed)?;
@@ -989,7 +1004,11 @@ async fn spawn_session_runner(
.await
}
fn direct_delegate_sessions(db: &StateStore, lead_id: &str, agent_type: &str) -> Result<Vec<Session>> {
fn direct_delegate_sessions(
db: &StateStore,
lead_id: &str,
agent_type: &str,
) -> Result<Vec<Session>> {
let mut sessions = Vec::new();
for child_id in db.delegated_children(lead_id, 50)? {
let Some(session) = db.get_session(&child_id)? else {
@@ -1101,12 +1120,7 @@ async fn spawn_session_runner_for_program(
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.with_context(|| {
format!(
"Failed to spawn ECC runner from {}",
current_exe.display()
)
})?;
.with_context(|| format!("Failed to spawn ECC runner from {}", current_exe.display()))?;
child
.id()
@@ -1114,7 +1128,12 @@ async fn spawn_session_runner_for_program(
Ok(())
}
fn build_agent_command(agent_program: &Path, task: &str, session_id: &str, working_dir: &Path) -> Command {
fn build_agent_command(
agent_program: &Path,
task: &str,
session_id: &str,
working_dir: &Path,
) -> Command {
let mut command = Command::new(agent_program);
command
.arg("--print")
@@ -1414,7 +1433,11 @@ impl fmt::Display for TeamStatus {
writeln!(f, "Branch: {}", worktree.branch)?;
}
let lead_handoff_backlog = self.handoff_backlog.get(&self.root.id).copied().unwrap_or(0);
let lead_handoff_backlog = self
.handoff_backlog
.get(&self.root.id)
.copied()
.unwrap_or(0);
writeln!(f, "Backlog: {}", lead_handoff_backlog)?;
if self.descendants.is_empty() {
@@ -1424,7 +1447,8 @@ impl fmt::Display for TeamStatus {
writeln!(f, "Board:")?;
let mut lanes: BTreeMap<&'static str, Vec<&DelegatedSessionSummary>> = BTreeMap::new();
for summary in &self.descendants {
lanes.entry(session_state_label(&summary.session.state))
lanes
.entry(session_state_label(&summary.session.state))
.or_default()
.push(summary);
}
@@ -1502,18 +1526,11 @@ impl fmt::Display for CoordinationStatus {
}
if self.operator_escalation_required {
writeln!(
f,
"Operator escalation: chronic saturation is not clearing"
)?;
writeln!(f, "Operator escalation: chronic saturation is not clearing")?;
}
if let Some(cleared_at) = self.daemon_activity.chronic_saturation_cleared_at() {
writeln!(
f,
"Chronic saturation cleared: {}",
cleared_at.to_rfc3339()
)?;
writeln!(f, "Chronic saturation cleared: {}", cleared_at.to_rfc3339())?;
}
if let Some(stabilized_at) = stabilized {
@@ -1631,6 +1648,7 @@ mod tests {
default_agent: "claude".to_string(),
auto_dispatch_unread_handoffs: false,
auto_dispatch_limit_per_session: 5,
auto_create_worktrees: true,
auto_merge_ready_worktrees: false,
cost_budget_usd: 10.0,
token_budget: 500_000,
@@ -1685,14 +1703,7 @@ mod tests {
run_git(path, ["config", "user.email", "ecc-tests@example.com"])?;
fs::write(path.join("README.md"), "hello\n")?;
run_git(path, ["add", "README.md"])?;
run_git(
path,
[
"commit",
"-qm",
"init",
],
)?;
run_git(path, ["commit", "-qm", "init"])?;
Ok(())
}
@@ -1885,7 +1896,13 @@ mod tests {
assert!(log.contains("--session-id"));
assert!(log.contains("deadbeef"));
assert!(log.contains("resume previous task"));
assert!(log.contains(tempdir.path().join("resume-working-dir").to_string_lossy().as_ref()));
assert!(log.contains(
tempdir
.path()
.join("resume-working-dir")
.to_string_lossy()
.as_ref()
));
Ok(())
}
@@ -1920,14 +1937,20 @@ mod tests {
.clone()
.context("stopped session worktree missing")?
.path;
assert!(worktree_path.exists(), "worktree should still exist before cleanup");
assert!(
worktree_path.exists(),
"worktree should still exist before cleanup"
);
cleanup_session_worktree(&db, &session_id).await?;
let cleaned = db
.get_session(&session_id)?
.context("cleaned session should still exist")?;
assert!(cleaned.worktree.is_none(), "worktree metadata should be cleared");
assert!(
cleaned.worktree.is_none(),
"worktree metadata should be cleared"
);
assert!(!worktree_path.exists(), "worktree path should be removed");
Ok(())
@@ -2051,12 +2074,18 @@ mod tests {
assert_eq!(outcome.base_branch, worktree.base_branch);
assert!(outcome.cleaned_worktree);
assert!(!outcome.already_up_to_date);
assert_eq!(fs::read_to_string(repo_root.join("feature.txt"))?, "ready to merge\n");
assert_eq!(
fs::read_to_string(repo_root.join("feature.txt"))?,
"ready to merge\n"
);
let merged = db
.get_session(&outcome.session_id)?
.context("merged session should still exist")?;
assert!(merged.worktree.is_none(), "worktree metadata should be cleared");
assert!(
merged.worktree.is_none(),
"worktree metadata should be cleared"
);
assert!(!worktree.path.exists(), "worktree path should be removed");
let branch_output = StdCommand::new("git")
@@ -2065,7 +2094,9 @@ mod tests {
.args(["branch", "--list", &worktree.branch])
.output()?;
assert!(
String::from_utf8_lossy(&branch_output.stdout).trim().is_empty(),
String::from_utf8_lossy(&branch_output.stdout)
.trim()
.is_empty(),
"merged worktree branch should be deleted"
);
@@ -2136,8 +2167,14 @@ mod tests {
assert_eq!(outcome.merged.len(), 1);
assert_eq!(outcome.merged[0].session_id, "merge-ready");
assert_eq!(outcome.active_with_worktree_ids, vec!["active-worktree".to_string()]);
assert_eq!(outcome.dirty_worktree_ids, vec!["dirty-worktree".to_string()]);
assert_eq!(
outcome.active_with_worktree_ids,
vec!["active-worktree".to_string()]
);
assert_eq!(
outcome.dirty_worktree_ids,
vec!["dirty-worktree".to_string()]
);
assert!(outcome.conflicted_session_ids.is_empty());
assert!(outcome.failures.is_empty());
@@ -2145,24 +2182,21 @@ mod tests {
fs::read_to_string(repo_root.join("merged.txt"))?,
"bulk merge\n"
);
assert!(
db.get_session("merge-ready")?
.context("merged session should still exist")?
.worktree
.is_none()
);
assert!(
db.get_session("active-worktree")?
.context("active session should still exist")?
.worktree
.is_some()
);
assert!(
db.get_session("dirty-worktree")?
.context("dirty session should still exist")?
.worktree
.is_some()
);
assert!(db
.get_session("merge-ready")?
.context("merged session should still exist")?
.worktree
.is_none());
assert!(db
.get_session("active-worktree")?
.context("active session should still exist")?
.worktree
.is_some());
assert!(db
.get_session("dirty-worktree")?
.context("dirty session should still exist")?
.worktree
.is_some());
assert!(!merged_worktree.path.exists());
assert!(active_worktree.path.exists());
assert!(dirty_worktree.path.exists());
@@ -2203,7 +2237,10 @@ mod tests {
delete_session(&db, &session_id).await?;
assert!(db.get_session(&session_id)?.is_none(), "session should be deleted");
assert!(
db.get_session(&session_id)?.is_none(),
"session should be deleted"
);
assert!(!worktree_path.exists(), "worktree path should be removed");
Ok(())
@@ -2233,8 +2270,16 @@ mod tests {
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
db.insert_session(&build_session("parent", SessionState::Running, now - Duration::minutes(2)))?;
db.insert_session(&build_session("child", SessionState::Pending, now - Duration::minutes(1)))?;
db.insert_session(&build_session(
"parent",
SessionState::Running,
now - Duration::minutes(2),
))?;
db.insert_session(&build_session(
"child",
SessionState::Pending,
now - Duration::minutes(1),
))?;
db.insert_session(&build_session("sibling", SessionState::Idle, now))?;
db.send_message(
@@ -2270,9 +2315,21 @@ mod tests {
let db = StateStore::open(&tempdir.path().join("state.db"))?;
let now = Utc::now();
db.insert_session(&build_session("lead", SessionState::Running, now - Duration::minutes(3)))?;
db.insert_session(&build_session("worker-a", SessionState::Running, now - Duration::minutes(2)))?;
db.insert_session(&build_session("worker-b", SessionState::Pending, now - Duration::minutes(1)))?;
db.insert_session(&build_session(
"lead",
SessionState::Running,
now - Duration::minutes(3),
))?;
db.insert_session(&build_session(
"worker-a",
SessionState::Running,
now - Duration::minutes(2),
))?;
db.insert_session(&build_session(
"worker-b",
SessionState::Pending,
now - Duration::minutes(1),
))?;
db.insert_session(&build_session("reviewer", SessionState::Completed, now))?;
db.send_message(
@@ -2444,15 +2501,15 @@ mod tests {
let spawned_messages = db.list_messages_for_session(&outcome.session_id, 10)?;
assert!(spawned_messages.iter().any(|message| {
message.msg_type == "task_handoff"
&& message.content.contains("Fresh delegated task")
message.msg_type == "task_handoff" && message.content.contains("Fresh delegated task")
}));
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn assign_session_reuses_idle_delegate_when_only_non_handoff_messages_are_unread() -> Result<()> {
async fn assign_session_reuses_idle_delegate_when_only_non_handoff_messages_are_unread(
) -> Result<()> {
let tempdir = TestDir::new("manager-assign-reuse-idle-info-inbox")?;
let repo_root = tempdir.path().join("repo");
init_git_repo(&repo_root)?;
@@ -2512,8 +2569,7 @@ mod tests {
let idle_messages = db.list_messages_for_session("idle-worker", 10)?;
assert!(idle_messages.iter().any(|message| {
message.msg_type == "task_handoff"
&& message.content.contains("Fresh delegated task")
message.msg_type == "task_handoff" && message.content.contains("Fresh delegated task")
}));
Ok(())
@@ -2583,8 +2639,7 @@ mod tests {
let messages = db.list_messages_for_session(&outcome.session_id, 10)?;
assert!(messages.iter().any(|message| {
message.msg_type == "task_handoff"
&& message.content.contains("New delegated task")
message.msg_type == "task_handoff" && message.content.contains("New delegated task")
}));
Ok(())
@@ -2650,8 +2705,7 @@ mod tests {
let busy_messages = db.list_messages_for_session("busy-worker", 10)?;
assert!(!busy_messages.iter().any(|message| {
message.msg_type == "task_handoff"
&& message.content.contains("New delegated task")
message.msg_type == "task_handoff" && message.content.contains("New delegated task")
}));
Ok(())
@@ -2697,8 +2751,7 @@ mod tests {
let messages = db.list_messages_for_session(&outcomes[0].session_id, 10)?;
assert!(messages.iter().any(|message| {
message.msg_type == "task_handoff"
&& message.content.contains("Review auth changes")
message.msg_type == "task_handoff" && message.content.contains("Review auth changes")
}));
Ok(())
@@ -2764,8 +2817,7 @@ mod tests {
let messages = db.list_messages_for_session("busy-worker", 10)?;
assert!(!messages.iter().any(|message| {
message.msg_type == "task_handoff"
&& message.content.contains("Review auth changes")
message.msg_type == "task_handoff" && message.content.contains("Review auth changes")
}));
Ok(())
@@ -3030,8 +3082,7 @@ mod tests {
let worker_b_messages = db.list_messages_for_session("worker-b", 10)?;
assert!(worker_b_messages.iter().any(|message| {
message.msg_type == "task_handoff"
&& message.content.contains("Review auth flow")
message.msg_type == "task_handoff" && message.content.contains("Review auth flow")
}));
Ok(())
@@ -3108,22 +3159,18 @@ mod tests {
};
let rendered = status.to_string();
assert!(
rendered.contains(
"Global handoff backlog: 2 lead(s) / 5 handoff(s) [1 absorbable, 1 saturated]"
)
);
assert!(rendered.contains(
"Global handoff backlog: 2 lead(s) / 5 handoff(s) [1 absorbable, 1 saturated]"
));
assert!(rendered.contains("Auto-dispatch: on @ 4/lead"));
assert!(rendered.contains("Coordination mode: rebalance-first (chronic saturation)"));
assert!(rendered.contains("Chronic saturation streak: 2 cycle(s)"));
assert!(rendered.contains("Last daemon dispatch: 3 routed / 1 deferred across 2 lead(s)"));
assert!(rendered.contains("Last daemon recovery dispatch: 2 handoff(s) across 1 lead(s)"));
assert!(rendered.contains("Last daemon rebalance: 0 handoff(s) across 1 lead(s)"));
assert!(
rendered.contains(
"Last daemon auto-merge: 1 merged / 1 active / 0 conflicted / 0 dirty / 0 failed"
)
);
assert!(rendered.contains(
"Last daemon auto-merge: 1 merged / 1 active / 0 conflicted / 0 dirty / 0 failed"
));
}
#[test]
@@ -3174,7 +3221,10 @@ mod tests {
assert_eq!(status.backlog_messages, 3);
assert_eq!(status.absorbable_sessions, 2);
assert_eq!(status.saturated_sessions, 1);
assert_eq!(status.mode, CoordinationMode::RebalanceFirstChronicSaturation);
assert_eq!(
status.mode,
CoordinationMode::RebalanceFirstChronicSaturation
);
assert_eq!(status.health, CoordinationHealth::Saturated);
assert!(!status.operator_escalation_required);
assert_eq!(status.daemon_activity.last_dispatch_routed, 1);