From 2d5d0e5c1d5aef026179eccb7ab564794c81fd0d Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Tue, 7 Apr 2026 12:57:12 -0700 Subject: [PATCH] feat: add ecc2 auto-dispatch backlog sweep --- ecc2/src/config/mod.rs | 12 +++++ ecc2/src/main.rs | 69 +++++++++++++++++++++++++ ecc2/src/session/manager.rs | 100 ++++++++++++++++++++++++++++++++++++ ecc2/src/session/store.rs | 26 ++++++++++ 4 files changed, 207 insertions(+) diff --git a/ecc2/src/config/mod.rs b/ecc2/src/config/mod.rs index ec510fd9..1a0b6cea 100644 --- a/ecc2/src/config/mod.rs +++ b/ecc2/src/config/mod.rs @@ -29,6 +29,8 @@ pub struct Config { pub session_timeout_secs: u64, pub heartbeat_interval_secs: u64, pub default_agent: String, + pub auto_dispatch_unread_handoffs: bool, + pub auto_dispatch_limit_per_session: usize, pub cost_budget_usd: f64, pub token_budget: u64, pub theme: Theme, @@ -53,6 +55,8 @@ impl Default for Config { session_timeout_secs: 3600, heartbeat_interval_secs: 30, default_agent: "claude".to_string(), + auto_dispatch_unread_handoffs: false, + auto_dispatch_limit_per_session: 5, cost_budget_usd: 10.0, token_budget: 500_000, theme: Theme::Dark, @@ -123,6 +127,14 @@ theme = "Dark" assert_eq!(config.token_budget, defaults.token_budget); assert_eq!(config.pane_layout, defaults.pane_layout); assert_eq!(config.risk_thresholds, defaults.risk_thresholds); + assert_eq!( + config.auto_dispatch_unread_handoffs, + defaults.auto_dispatch_unread_handoffs + ); + assert_eq!( + config.auto_dispatch_limit_per_session, + defaults.auto_dispatch_limit_per_session + ); } #[test] diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 9be6cc8d..f645c7d5 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -78,6 +78,18 @@ enum Commands { #[arg(long, default_value_t = 5)] limit: usize, }, + /// Sweep unread task handoffs across lead sessions and route them through the assignment policy + AutoDispatch { + /// Agent type for routed delegates + #[arg(short, long, default_value = "claude")] + agent: String, + /// Create a dedicated worktree if new delegates must be spawned + #[arg(short, long, default_value_t = true)] + worktree: bool, + /// Maximum lead sessions to sweep in one pass + #[arg(long, default_value_t = 10)] + lead_limit: usize, + }, /// List active sessions Sessions, /// Show session details @@ -279,6 +291,38 @@ async fn main() -> Result<()> { } } } + Some(Commands::AutoDispatch { + agent, + worktree: use_worktree, + lead_limit, + }) => { + let outcomes = session::manager::auto_dispatch_backlog( + &db, + &cfg, + &agent, + use_worktree, + lead_limit, + ) + .await?; + if outcomes.is_empty() { + println!("No unread task handoff backlog found"); + } else { + let total_routed: usize = outcomes.iter().map(|outcome| outcome.routed.len()).sum(); + println!( + "Auto-dispatched {} task handoff(s) across {} lead session(s)", + total_routed, + outcomes.len() + ); + for outcome in outcomes { + println!( + "- {} | unread {} | routed {}", + short_session(&outcome.lead_session_id), + outcome.unread_count, + outcome.routed.len() + ); + } + } + } Some(Commands::Sessions) => { let sessions = session::manager::list_sessions(&db)?; for s in sessions { @@ -623,4 +667,29 @@ mod tests { _ => panic!("expected drain-inbox subcommand"), } } + + #[test] + fn cli_parses_auto_dispatch_command() { + let cli = Cli::try_parse_from([ + "ecc", + "auto-dispatch", + "--agent", + "claude", + "--lead-limit", + "4", + ]) + .expect("auto-dispatch should parse"); + + match cli.command { + Some(Commands::AutoDispatch { + agent, + lead_limit, + .. + }) => { + assert_eq!(agent, "claude"); + assert_eq!(lead_limit, 4); + } + _ => panic!("expected auto-dispatch subcommand"), + } + } } diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index 3502c402..ab2dfd91 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -133,6 +133,39 @@ pub async fn drain_inbox( Ok(outcomes) } +pub async fn auto_dispatch_backlog( + db: &StateStore, + cfg: &Config, + agent_type: &str, + use_worktree: bool, + lead_limit: usize, +) -> Result> { + let targets = db.unread_task_handoff_targets(lead_limit)?; + let mut outcomes = Vec::new(); + + for (lead_id, unread_count) in targets { + let routed = drain_inbox( + db, + cfg, + &lead_id, + agent_type, + use_worktree, + cfg.auto_dispatch_limit_per_session, + ) + .await?; + + if !routed.is_empty() { + outcomes.push(LeadDispatchOutcome { + lead_session_id: lead_id, + unread_count, + routed, + }); + } + } + + Ok(outcomes) +} + pub async fn stop_session(db: &StateStore, id: &str) -> Result<()> { stop_session_with_options(db, id, true).await } @@ -811,6 +844,12 @@ pub struct InboxDrainOutcome { pub action: AssignmentAction, } +pub struct LeadDispatchOutcome { + pub lead_session_id: String, + pub unread_count: usize, + pub routed: Vec, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum AssignmentAction { Spawned, @@ -964,6 +1003,8 @@ mod tests { session_timeout_secs: 60, heartbeat_interval_secs: 5, default_agent: "claude".to_string(), + auto_dispatch_unread_handoffs: false, + auto_dispatch_limit_per_session: 5, cost_budget_usd: 10.0, token_budget: 500_000, theme: Theme::Dark, @@ -1639,4 +1680,63 @@ mod tests { Ok(()) } + + #[tokio::test(flavor = "current_thread")] + async fn auto_dispatch_backlog_routes_multiple_lead_inboxes() -> Result<()> { + let tempdir = TestDir::new("manager-auto-dispatch")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let mut cfg = build_config(tempdir.path()); + cfg.auto_dispatch_limit_per_session = 5; + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + for lead_id in ["lead-a", "lead-b"] { + db.insert_session(&Session { + id: lead_id.to_string(), + task: format!("{lead_id} task"), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Running, + pid: Some(42), + worktree: None, + created_at: now - Duration::minutes(3), + updated_at: now - Duration::minutes(3), + metrics: SessionMetrics::default(), + })?; + } + + db.send_message( + "planner", + "lead-a", + "{\"task\":\"Review auth\",\"context\":\"Inbound\"}", + "task_handoff", + )?; + db.send_message( + "planner", + "lead-b", + "{\"task\":\"Review billing\",\"context\":\"Inbound\"}", + "task_handoff", + )?; + + let outcomes = auto_dispatch_backlog(&db, &cfg, "claude", true, 10).await?; + assert_eq!(outcomes.len(), 2); + assert!(outcomes.iter().any(|outcome| { + outcome.lead_session_id == "lead-a" + && outcome.unread_count == 1 + && outcome.routed.len() == 1 + })); + assert!(outcomes.iter().any(|outcome| { + outcome.lead_session_id == "lead-b" + && outcome.unread_count == 1 + && outcome.routed.len() == 1 + })); + + let unread = db.unread_task_handoff_targets(10)?; + assert!(!unread.iter().any(|(session_id, _)| session_id == "lead-a")); + assert!(!unread.iter().any(|(session_id, _)| session_id == "lead-b")); + + Ok(()) + } } diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index e9519264..d8e187e1 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -436,6 +436,25 @@ impl StateStore { .map_err(Into::into) } + pub fn unread_task_handoff_targets(&self, limit: usize) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT to_session, COUNT(*) as unread_count + FROM messages + WHERE msg_type = 'task_handoff' AND read = 0 + GROUP BY to_session + ORDER BY unread_count DESC, MAX(id) ASC + LIMIT ?1", + )?; + + let targets = stmt.query_map(rusqlite::params![limit as i64], |row| { + Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)? as usize)) + })?; + + targets + .collect::, _>>() + .map_err(Into::into) + } + pub fn mark_messages_read(&self, session_id: &str) -> Result { let updated = self.conn.execute( "UPDATE messages SET read = 1 WHERE to_session = ?1 AND read = 0", @@ -826,6 +845,13 @@ mod tests { "worker-2".to_string(), ] ); + assert_eq!( + db.unread_task_handoff_targets(10)?, + vec![ + ("worker-2".to_string(), 1), + ("worker-3".to_string(), 1), + ] + ); Ok(()) }