From 5bff920bf8bc2b06534e7e3618010d5572a8bd79 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Tue, 7 Apr 2026 12:31:02 -0700 Subject: [PATCH] feat: add ecc2 delegated assignment routing --- ecc2/src/main.rs | 69 ++++++++ ecc2/src/session/manager.rs | 334 +++++++++++++++++++++++++++++++++++- 2 files changed, 402 insertions(+), 1 deletion(-) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 6af0f2f6..be6094b0 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -50,6 +50,20 @@ enum Commands { #[arg(short, long, default_value_t = true)] worktree: bool, }, + /// Route work to an existing delegate when possible, otherwise spawn a new one + Assign { + /// Lead session ID or alias + from_session: String, + /// Task description for the assignment + #[arg(short, long)] + task: String, + /// Agent type (claude, codex, custom) + #[arg(short, long, default_value = "claude")] + agent: String, + /// Create a dedicated worktree if a new delegate must be spawned + #[arg(short, long, default_value_t = true)] + worktree: bool, + }, /// List active sessions Sessions, /// Show session details @@ -185,6 +199,33 @@ async fn main() -> Result<()> { short_session(&source.id) ); } + Some(Commands::Assign { + from_session, + task, + agent, + worktree: use_worktree, + }) => { + let lead_id = resolve_session_id(&db, &from_session)?; + let outcome = session::manager::assign_session( + &db, + &cfg, + &lead_id, + &task, + &agent, + use_worktree, + ) + .await?; + println!( + "Assignment routed: {} -> {} ({})", + short_session(&lead_id), + short_session(&outcome.session_id), + match outcome.action { + session::manager::AssignmentAction::Spawned => "spawned", + session::manager::AssignmentAction::ReusedIdle => "reused-idle", + session::manager::AssignmentAction::ReusedActive => "reused-active", + } + ); + } Some(Commands::Sessions) => { let sessions = session::manager::list_sessions(&db)?; for s in sessions { @@ -473,4 +514,32 @@ mod tests { _ => panic!("expected team subcommand"), } } + + #[test] + fn cli_parses_assign_command() { + let cli = Cli::try_parse_from([ + "ecc", + "assign", + "lead", + "--task", + "Review auth changes", + "--agent", + "claude", + ]) + .expect("assign should parse"); + + match cli.command { + Some(Commands::Assign { + from_session, + task, + agent, + .. + }) => { + assert_eq!(from_session, "lead"); + assert_eq!(task, "Review auth changes"); + assert_eq!(agent, "claude"); + } + _ => panic!("expected assign subcommand"), + } + } } diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index 5f36cbd4..e1a6bf64 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -63,6 +63,29 @@ pub fn get_team_status(db: &StateStore, id: &str, depth: usize) -> Result Result { + let repo_root = + std::env::current_dir().context("Failed to resolve current working directory")?; + assign_session_in_dir_with_runner_program( + db, + cfg, + lead_id, + task, + agent_type, + use_worktree, + &repo_root, + &std::env::current_exe().context("Failed to resolve ECC executable path")?, + ) + .await +} + pub async fn stop_session(db: &StateStore, id: &str) -> Result<()> { stop_session_with_options(db, id, true).await } @@ -141,6 +164,84 @@ async fn resume_session_with_program( Ok(session.id) } +async fn assign_session_in_dir_with_runner_program( + db: &StateStore, + cfg: &Config, + lead_id: &str, + task: &str, + agent_type: &str, + use_worktree: bool, + repo_root: &Path, + runner_program: &Path, +) -> Result { + let lead = resolve_session(db, lead_id)?; + let delegates = direct_delegate_sessions(db, &lead.id, agent_type)?; + + if let Some(idle_delegate) = delegates + .iter() + .filter(|session| session.state == SessionState::Idle) + .min_by_key(|session| session.updated_at) + { + send_task_handoff(db, &lead, &idle_delegate.id, task, "reused idle delegate")?; + return Ok(AssignmentOutcome { + session_id: idle_delegate.id.clone(), + action: AssignmentAction::ReusedIdle, + }); + } + + if delegates.len() < cfg.max_parallel_sessions { + let session_id = queue_session_in_dir_with_runner_program( + db, + cfg, + task, + agent_type, + use_worktree, + repo_root, + runner_program, + ) + .await?; + send_task_handoff(db, &lead, &session_id, task, "spawned new delegate")?; + return Ok(AssignmentOutcome { + session_id, + action: AssignmentAction::Spawned, + }); + } + + if let Some(active_delegate) = delegates + .iter() + .filter(|session| matches!(session.state, SessionState::Running | SessionState::Pending)) + .min_by_key(|session| session.updated_at) + { + send_task_handoff( + db, + &lead, + &active_delegate.id, + task, + "reused active delegate at capacity", + )?; + return Ok(AssignmentOutcome { + session_id: active_delegate.id.clone(), + action: AssignmentAction::ReusedActive, + }); + } + + let session_id = queue_session_in_dir_with_runner_program( + db, + cfg, + task, + agent_type, + use_worktree, + repo_root, + runner_program, + ) + .await?; + send_task_handoff(db, &lead, &session_id, task, "spawned fallback delegate")?; + Ok(AssignmentOutcome { + session_id, + action: AssignmentAction::Spawned, + }) +} + fn collect_delegation_descendants( db: &StateStore, session_id: &str, @@ -277,6 +378,27 @@ async fn queue_session_in_dir( agent_type: &str, use_worktree: bool, repo_root: &Path, +) -> Result { + queue_session_in_dir_with_runner_program( + db, + cfg, + task, + agent_type, + use_worktree, + repo_root, + &std::env::current_exe().context("Failed to resolve ECC executable path")?, + ) + .await +} + +async fn queue_session_in_dir_with_runner_program( + db: &StateStore, + cfg: &Config, + task: &str, + agent_type: &str, + use_worktree: bool, + repo_root: &Path, + runner_program: &Path, ) -> Result { let session = build_session_record(task, agent_type, use_worktree, cfg, repo_root)?; db.insert_session(&session)?; @@ -287,7 +409,7 @@ async fn queue_session_in_dir( .map(|worktree| worktree.path.as_path()) .unwrap_or(repo_root); - match spawn_session_runner(task, &session.id, agent_type, working_dir).await { + match spawn_session_runner_for_program(task, &session.id, agent_type, working_dir, runner_program).await { Ok(()) => Ok(session.id), Err(error) => { db.update_state(&session.id, &SessionState::Failed)?; @@ -388,6 +510,63 @@ async fn spawn_session_runner( .await } +fn direct_delegate_sessions(db: &StateStore, lead_id: &str, agent_type: &str) -> Result> { + let mut sessions = Vec::new(); + for child_id in db.delegated_children(lead_id, 50)? { + let Some(session) = db.get_session(&child_id)? else { + continue; + }; + + if session.agent_type != agent_type { + continue; + } + + if matches!( + session.state, + SessionState::Pending | SessionState::Running | SessionState::Idle + ) { + sessions.push(session); + } + } + + Ok(sessions) +} + +fn send_task_handoff( + db: &StateStore, + from_session: &Session, + to_session_id: &str, + task: &str, + routing_reason: &str, +) -> Result<()> { + let context = format!( + "Assigned by {} [{}] | cwd {}{} | {}", + from_session.id, + from_session.agent_type, + from_session.working_dir.display(), + from_session + .worktree + .as_ref() + .map(|worktree| format!( + " | worktree {} ({})", + worktree.branch, + worktree.path.display() + )) + .unwrap_or_default(), + routing_reason + ); + + crate::comms::send( + db, + &from_session.id, + to_session_id, + &crate::comms::MessageType::TaskHandoff { + task: task.to_string(), + context, + }, + ) +} + async fn spawn_session_runner_for_program( task: &str, session_id: &str, @@ -533,6 +712,18 @@ pub struct TeamStatus { descendants: Vec, } +pub struct AssignmentOutcome { + pub session_id: String, + pub action: AssignmentAction, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AssignmentAction { + Spawned, + ReusedIdle, + ReusedActive, +} + struct DelegatedSessionSummary { depth: usize, unread_messages: usize, @@ -1093,4 +1284,145 @@ mod tests { Ok(()) } + + #[tokio::test(flavor = "current_thread")] + async fn assign_session_reuses_idle_delegate_when_available() -> Result<()> { + let tempdir = TestDir::new("manager-assign-reuse-idle")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let cfg = build_config(tempdir.path()); + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "lead".to_string(), + task: "lead task".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Running, + pid: Some(42), + worktree: None, + created_at: now - Duration::minutes(2), + updated_at: now - Duration::minutes(2), + metrics: SessionMetrics::default(), + })?; + db.insert_session(&Session { + id: "idle-worker".to_string(), + task: "old worker task".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Idle, + pid: Some(99), + worktree: None, + created_at: now - Duration::minutes(1), + updated_at: now - Duration::minutes(1), + metrics: SessionMetrics::default(), + })?; + db.send_message( + "lead", + "idle-worker", + "{\"task\":\"old worker task\",\"context\":\"Delegated from lead\"}", + "task_handoff", + )?; + + let (fake_runner, _) = write_fake_claude(tempdir.path())?; + let outcome = assign_session_in_dir_with_runner_program( + &db, + &cfg, + "lead", + "Review billing edge cases", + "claude", + true, + &repo_root, + &fake_runner, + ) + .await?; + + assert_eq!(outcome.session_id, "idle-worker"); + assert_eq!(outcome.action, AssignmentAction::ReusedIdle); + + let messages = db.list_messages_for_session("idle-worker", 10)?; + assert!(messages.iter().any(|message| { + message.msg_type == "task_handoff" + && message.content.contains("Review billing edge cases") + })); + + Ok(()) + } + + #[tokio::test(flavor = "current_thread")] + async fn assign_session_spawns_when_team_has_capacity() -> Result<()> { + let tempdir = TestDir::new("manager-assign-spawn")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let cfg = build_config(tempdir.path()); + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "lead".to_string(), + task: "lead task".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Running, + pid: Some(42), + worktree: None, + created_at: now - Duration::minutes(3), + updated_at: now - Duration::minutes(3), + metrics: SessionMetrics::default(), + })?; + db.insert_session(&Session { + id: "busy-worker".to_string(), + task: "existing work".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Running, + pid: Some(55), + worktree: None, + created_at: now - Duration::minutes(2), + updated_at: now - Duration::minutes(2), + metrics: SessionMetrics::default(), + })?; + db.send_message( + "lead", + "busy-worker", + "{\"task\":\"existing work\",\"context\":\"Delegated from lead\"}", + "task_handoff", + )?; + + let (fake_runner, log_path) = write_fake_claude(tempdir.path())?; + let outcome = assign_session_in_dir_with_runner_program( + &db, + &cfg, + "lead", + "New delegated task", + "claude", + true, + &repo_root, + &fake_runner, + ) + .await?; + + assert_eq!(outcome.action, AssignmentAction::Spawned); + assert_ne!(outcome.session_id, "busy-worker"); + + let spawned = db + .get_session(&outcome.session_id)? + .context("spawned delegated session missing")?; + assert_eq!(spawned.state, SessionState::Pending); + + let messages = db.list_messages_for_session(&outcome.session_id, 10)?; + assert!(messages.iter().any(|message| { + message.msg_type == "task_handoff" + && message.content.contains("New delegated task") + })); + + let log = wait_for_file(&log_path)?; + assert!(log.contains("run-session")); + assert!(log.contains("New delegated task")); + + Ok(()) + } }