From e567dc39c8de1efcd04bbc07ddf308d260d43526 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Tue, 7 Apr 2026 13:00:20 -0700 Subject: [PATCH] feat: add ecc2 daemon auto-dispatch pass --- ecc2/src/session/daemon.rs | 123 +++++++++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/ecc2/src/session/daemon.rs b/ecc2/src/session/daemon.rs index b2fb0372..842d23d1 100644 --- a/ecc2/src/session/daemon.rs +++ b/ecc2/src/session/daemon.rs @@ -1,7 +1,9 @@ use anyhow::Result; +use std::future::Future; use std::time::Duration; use tokio::time; +use super::manager; use super::store::StateStore; use super::SessionState; use crate::config::Config; @@ -20,6 +22,10 @@ pub async fn run(db: StateStore, cfg: Config) -> Result<()> { tracing::error!("Session check failed: {e}"); } + if let Err(e) = maybe_auto_dispatch(&db, &cfg).await { + tracing::error!("Auto-dispatch pass failed: {e}"); + } + time::sleep(heartbeat_interval).await; } } @@ -83,6 +89,53 @@ fn check_sessions(db: &StateStore, timeout: Duration) -> Result<()> { Ok(()) } +async fn maybe_auto_dispatch(db: &StateStore, cfg: &Config) -> Result { + if !cfg.auto_dispatch_unread_handoffs { + return Ok(0); + } + + let outcomes = manager::auto_dispatch_backlog( + db, + cfg, + &cfg.default_agent, + true, + cfg.max_parallel_sessions, + ) + .await?; + let routed: usize = outcomes.iter().map(|outcome| outcome.routed.len()).sum(); + + if routed > 0 { + tracing::info!( + "Auto-dispatched {routed} task handoff(s) across {} lead session(s)", + outcomes.len() + ); + } + + Ok(routed) +} + +async fn maybe_auto_dispatch_with(cfg: &Config, dispatch: F) -> Result +where + F: Fn() -> Fut, + Fut: Future>>, +{ + if !cfg.auto_dispatch_unread_handoffs { + return Ok(0); + } + + let outcomes = dispatch().await?; + let routed: usize = outcomes.iter().map(|outcome| outcome.routed.len()).sum(); + + if routed > 0 { + tracing::info!( + "Auto-dispatched {routed} task handoff(s) across {} lead session(s)", + outcomes.len() + ); + } + + Ok(routed) +} + #[cfg(unix)] fn pid_is_alive(pid: u32) -> bool { if pid == 0 { @@ -109,6 +162,7 @@ fn pid_is_alive(_pid: u32) -> bool { #[cfg(test)] mod tests { use super::*; + use crate::session::manager::{AssignmentAction, InboxDrainOutcome, LeadDispatchOutcome}; use crate::session::{Session, SessionMetrics, SessionState}; use std::path::PathBuf; @@ -175,4 +229,73 @@ mod tests { let _ = std::fs::remove_file(path); Ok(()) } + + #[tokio::test] + async fn maybe_auto_dispatch_noops_when_disabled() -> Result<()> { + let path = temp_db_path(); + let _store = StateStore::open(&path)?; + let cfg = Config::default(); + let invoked = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + let invoked_flag = invoked.clone(); + + let routed = maybe_auto_dispatch_with(&cfg, move || { + let invoked_flag = invoked_flag.clone(); + async move { + invoked_flag.store(true, std::sync::atomic::Ordering::SeqCst); + Ok(Vec::new()) + } + }) + .await?; + + assert_eq!(routed, 0); + assert!(!invoked.load(std::sync::atomic::Ordering::SeqCst)); + let _ = std::fs::remove_file(path); + Ok(()) + } + + #[tokio::test] + async fn maybe_auto_dispatch_reports_total_routed_work() -> Result<()> { + let path = temp_db_path(); + let _store = StateStore::open(&path)?; + let mut cfg = Config::default(); + cfg.auto_dispatch_unread_handoffs = true; + + let routed = maybe_auto_dispatch_with(&cfg, || async move { + Ok(vec![ + LeadDispatchOutcome { + lead_session_id: "lead-a".to_string(), + unread_count: 2, + routed: vec![ + InboxDrainOutcome { + message_id: 1, + task: "Task A".to_string(), + session_id: "worker-a".to_string(), + action: AssignmentAction::Spawned, + }, + InboxDrainOutcome { + message_id: 2, + task: "Task B".to_string(), + session_id: "worker-b".to_string(), + action: AssignmentAction::ReusedIdle, + }, + ], + }, + LeadDispatchOutcome { + lead_session_id: "lead-b".to_string(), + unread_count: 1, + routed: vec![InboxDrainOutcome { + message_id: 3, + task: "Task C".to_string(), + session_id: "worker-c".to_string(), + action: AssignmentAction::ReusedActive, + }], + }, + ]) + }) + .await?; + + assert_eq!(routed, 3); + let _ = std::fs::remove_file(path); + Ok(()) + } }