feat: add ecc2 delegated assignment routing

This commit is contained in:
Affaan Mustafa
2026-04-07 12:31:02 -07:00
parent 3469773b32
commit 5bff920bf8
2 changed files with 402 additions and 1 deletions

View File

@@ -50,6 +50,20 @@ enum Commands {
#[arg(short, long, default_value_t = true)]
worktree: bool,
},
/// Route work to an existing delegate when possible, otherwise spawn a new one
Assign {
/// Lead session ID or alias
from_session: String,
/// Task description for the assignment
#[arg(short, long)]
task: String,
/// Agent type (claude, codex, custom)
#[arg(short, long, default_value = "claude")]
agent: String,
/// Create a dedicated worktree if a new delegate must be spawned
#[arg(short, long, default_value_t = true)]
worktree: bool,
},
/// List active sessions
Sessions,
/// Show session details
@@ -185,6 +199,33 @@ async fn main() -> Result<()> {
short_session(&source.id)
);
}
Some(Commands::Assign {
from_session,
task,
agent,
worktree: use_worktree,
}) => {
let lead_id = resolve_session_id(&db, &from_session)?;
let outcome = session::manager::assign_session(
&db,
&cfg,
&lead_id,
&task,
&agent,
use_worktree,
)
.await?;
println!(
"Assignment routed: {} -> {} ({})",
short_session(&lead_id),
short_session(&outcome.session_id),
match outcome.action {
session::manager::AssignmentAction::Spawned => "spawned",
session::manager::AssignmentAction::ReusedIdle => "reused-idle",
session::manager::AssignmentAction::ReusedActive => "reused-active",
}
);
}
Some(Commands::Sessions) => {
let sessions = session::manager::list_sessions(&db)?;
for s in sessions {
@@ -473,4 +514,32 @@ mod tests {
_ => panic!("expected team subcommand"),
}
}
#[test]
fn cli_parses_assign_command() {
let cli = Cli::try_parse_from([
"ecc",
"assign",
"lead",
"--task",
"Review auth changes",
"--agent",
"claude",
])
.expect("assign should parse");
match cli.command {
Some(Commands::Assign {
from_session,
task,
agent,
..
}) => {
assert_eq!(from_session, "lead");
assert_eq!(task, "Review auth changes");
assert_eq!(agent, "claude");
}
_ => panic!("expected assign subcommand"),
}
}
}

View File

@@ -63,6 +63,29 @@ pub fn get_team_status(db: &StateStore, id: &str, depth: usize) -> Result<TeamSt
})
}
pub async fn assign_session(
db: &StateStore,
cfg: &Config,
lead_id: &str,
task: &str,
agent_type: &str,
use_worktree: bool,
) -> Result<AssignmentOutcome> {
let repo_root =
std::env::current_dir().context("Failed to resolve current working directory")?;
assign_session_in_dir_with_runner_program(
db,
cfg,
lead_id,
task,
agent_type,
use_worktree,
&repo_root,
&std::env::current_exe().context("Failed to resolve ECC executable path")?,
)
.await
}
pub async fn stop_session(db: &StateStore, id: &str) -> Result<()> {
stop_session_with_options(db, id, true).await
}
@@ -141,6 +164,84 @@ async fn resume_session_with_program(
Ok(session.id)
}
async fn assign_session_in_dir_with_runner_program(
db: &StateStore,
cfg: &Config,
lead_id: &str,
task: &str,
agent_type: &str,
use_worktree: bool,
repo_root: &Path,
runner_program: &Path,
) -> Result<AssignmentOutcome> {
let lead = resolve_session(db, lead_id)?;
let delegates = direct_delegate_sessions(db, &lead.id, agent_type)?;
if let Some(idle_delegate) = delegates
.iter()
.filter(|session| session.state == SessionState::Idle)
.min_by_key(|session| session.updated_at)
{
send_task_handoff(db, &lead, &idle_delegate.id, task, "reused idle delegate")?;
return Ok(AssignmentOutcome {
session_id: idle_delegate.id.clone(),
action: AssignmentAction::ReusedIdle,
});
}
if delegates.len() < cfg.max_parallel_sessions {
let session_id = queue_session_in_dir_with_runner_program(
db,
cfg,
task,
agent_type,
use_worktree,
repo_root,
runner_program,
)
.await?;
send_task_handoff(db, &lead, &session_id, task, "spawned new delegate")?;
return Ok(AssignmentOutcome {
session_id,
action: AssignmentAction::Spawned,
});
}
if let Some(active_delegate) = delegates
.iter()
.filter(|session| matches!(session.state, SessionState::Running | SessionState::Pending))
.min_by_key(|session| session.updated_at)
{
send_task_handoff(
db,
&lead,
&active_delegate.id,
task,
"reused active delegate at capacity",
)?;
return Ok(AssignmentOutcome {
session_id: active_delegate.id.clone(),
action: AssignmentAction::ReusedActive,
});
}
let session_id = queue_session_in_dir_with_runner_program(
db,
cfg,
task,
agent_type,
use_worktree,
repo_root,
runner_program,
)
.await?;
send_task_handoff(db, &lead, &session_id, task, "spawned fallback delegate")?;
Ok(AssignmentOutcome {
session_id,
action: AssignmentAction::Spawned,
})
}
fn collect_delegation_descendants(
db: &StateStore,
session_id: &str,
@@ -277,6 +378,27 @@ async fn queue_session_in_dir(
agent_type: &str,
use_worktree: bool,
repo_root: &Path,
) -> Result<String> {
queue_session_in_dir_with_runner_program(
db,
cfg,
task,
agent_type,
use_worktree,
repo_root,
&std::env::current_exe().context("Failed to resolve ECC executable path")?,
)
.await
}
async fn queue_session_in_dir_with_runner_program(
db: &StateStore,
cfg: &Config,
task: &str,
agent_type: &str,
use_worktree: bool,
repo_root: &Path,
runner_program: &Path,
) -> Result<String> {
let session = build_session_record(task, agent_type, use_worktree, cfg, repo_root)?;
db.insert_session(&session)?;
@@ -287,7 +409,7 @@ async fn queue_session_in_dir(
.map(|worktree| worktree.path.as_path())
.unwrap_or(repo_root);
match spawn_session_runner(task, &session.id, agent_type, working_dir).await {
match spawn_session_runner_for_program(task, &session.id, agent_type, working_dir, runner_program).await {
Ok(()) => Ok(session.id),
Err(error) => {
db.update_state(&session.id, &SessionState::Failed)?;
@@ -388,6 +510,63 @@ async fn spawn_session_runner(
.await
}
fn direct_delegate_sessions(db: &StateStore, lead_id: &str, agent_type: &str) -> Result<Vec<Session>> {
let mut sessions = Vec::new();
for child_id in db.delegated_children(lead_id, 50)? {
let Some(session) = db.get_session(&child_id)? else {
continue;
};
if session.agent_type != agent_type {
continue;
}
if matches!(
session.state,
SessionState::Pending | SessionState::Running | SessionState::Idle
) {
sessions.push(session);
}
}
Ok(sessions)
}
fn send_task_handoff(
db: &StateStore,
from_session: &Session,
to_session_id: &str,
task: &str,
routing_reason: &str,
) -> Result<()> {
let context = format!(
"Assigned by {} [{}] | cwd {}{} | {}",
from_session.id,
from_session.agent_type,
from_session.working_dir.display(),
from_session
.worktree
.as_ref()
.map(|worktree| format!(
" | worktree {} ({})",
worktree.branch,
worktree.path.display()
))
.unwrap_or_default(),
routing_reason
);
crate::comms::send(
db,
&from_session.id,
to_session_id,
&crate::comms::MessageType::TaskHandoff {
task: task.to_string(),
context,
},
)
}
async fn spawn_session_runner_for_program(
task: &str,
session_id: &str,
@@ -533,6 +712,18 @@ pub struct TeamStatus {
descendants: Vec<DelegatedSessionSummary>,
}
pub struct AssignmentOutcome {
pub session_id: String,
pub action: AssignmentAction,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AssignmentAction {
Spawned,
ReusedIdle,
ReusedActive,
}
struct DelegatedSessionSummary {
depth: usize,
unread_messages: usize,
@@ -1093,4 +1284,145 @@ mod tests {
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn assign_session_reuses_idle_delegate_when_available() -> Result<()> {
let tempdir = TestDir::new("manager-assign-reuse-idle")?;
let repo_root = tempdir.path().join("repo");
init_git_repo(&repo_root)?;
let cfg = build_config(tempdir.path());
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
pid: Some(42),
worktree: None,
created_at: now - Duration::minutes(2),
updated_at: now - Duration::minutes(2),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
id: "idle-worker".to_string(),
task: "old worker task".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Idle,
pid: Some(99),
worktree: None,
created_at: now - Duration::minutes(1),
updated_at: now - Duration::minutes(1),
metrics: SessionMetrics::default(),
})?;
db.send_message(
"lead",
"idle-worker",
"{\"task\":\"old worker task\",\"context\":\"Delegated from lead\"}",
"task_handoff",
)?;
let (fake_runner, _) = write_fake_claude(tempdir.path())?;
let outcome = assign_session_in_dir_with_runner_program(
&db,
&cfg,
"lead",
"Review billing edge cases",
"claude",
true,
&repo_root,
&fake_runner,
)
.await?;
assert_eq!(outcome.session_id, "idle-worker");
assert_eq!(outcome.action, AssignmentAction::ReusedIdle);
let messages = db.list_messages_for_session("idle-worker", 10)?;
assert!(messages.iter().any(|message| {
message.msg_type == "task_handoff"
&& message.content.contains("Review billing edge cases")
}));
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn assign_session_spawns_when_team_has_capacity() -> Result<()> {
let tempdir = TestDir::new("manager-assign-spawn")?;
let repo_root = tempdir.path().join("repo");
init_git_repo(&repo_root)?;
let cfg = build_config(tempdir.path());
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
pid: Some(42),
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
id: "busy-worker".to_string(),
task: "existing work".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
pid: Some(55),
worktree: None,
created_at: now - Duration::minutes(2),
updated_at: now - Duration::minutes(2),
metrics: SessionMetrics::default(),
})?;
db.send_message(
"lead",
"busy-worker",
"{\"task\":\"existing work\",\"context\":\"Delegated from lead\"}",
"task_handoff",
)?;
let (fake_runner, log_path) = write_fake_claude(tempdir.path())?;
let outcome = assign_session_in_dir_with_runner_program(
&db,
&cfg,
"lead",
"New delegated task",
"claude",
true,
&repo_root,
&fake_runner,
)
.await?;
assert_eq!(outcome.action, AssignmentAction::Spawned);
assert_ne!(outcome.session_id, "busy-worker");
let spawned = db
.get_session(&outcome.session_id)?
.context("spawned delegated session missing")?;
assert_eq!(spawned.state, SessionState::Pending);
let messages = db.list_messages_for_session(&outcome.session_id, 10)?;
assert!(messages.iter().any(|message| {
message.msg_type == "task_handoff"
&& message.content.contains("New delegated task")
}));
let log = wait_for_file(&log_path)?;
assert!(log.contains("run-session"));
assert!(log.contains("New delegated task"));
Ok(())
}
}