From 762297345230e23a250cc31b298a88e7346082e2 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Tue, 7 Apr 2026 12:51:28 -0700 Subject: [PATCH] feat: add ecc2 inbox drain routing --- ecc2/src/main.rs | 81 +++++++++++++++++++++++++++ ecc2/src/session/manager.rs | 109 ++++++++++++++++++++++++++++++++++++ ecc2/src/session/store.rs | 43 ++++++++++++++ 3 files changed, 233 insertions(+) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index be6094b0..9be6cc8d 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -64,6 +64,20 @@ enum Commands { #[arg(short, long, default_value_t = true)] worktree: bool, }, + /// Route unread task handoffs from a lead session inbox through the assignment policy + DrainInbox { + /// Lead session ID or alias + session_id: String, + /// Agent type for routed delegates + #[arg(short, long, default_value = "claude")] + agent: String, + /// Create a dedicated worktree if new delegates must be spawned + #[arg(short, long, default_value_t = true)] + worktree: bool, + /// Maximum unread task handoffs to route + #[arg(long, default_value_t = 5)] + limit: usize, + }, /// List active sessions Sessions, /// Show session details @@ -226,6 +240,45 @@ async fn main() -> Result<()> { } ); } + Some(Commands::DrainInbox { + session_id, + agent, + worktree: use_worktree, + limit, + }) => { + let lead_id = resolve_session_id(&db, &session_id)?; + let outcomes = session::manager::drain_inbox( + &db, + &cfg, + &lead_id, + &agent, + use_worktree, + limit, + ) + .await?; + if outcomes.is_empty() { + println!("No unread task handoffs for {}", short_session(&lead_id)); + } else { + println!( + "Routed {} inbox task handoff(s) from {}", + outcomes.len(), + short_session(&lead_id) + ); + for outcome in outcomes { + println!( + "- {} -> {} ({}) | {}", + outcome.message_id, + short_session(&outcome.session_id), + match outcome.action { + session::manager::AssignmentAction::Spawned => "spawned", + session::manager::AssignmentAction::ReusedIdle => "reused-idle", + session::manager::AssignmentAction::ReusedActive => "reused-active", + }, + outcome.task + ); + } + } + } Some(Commands::Sessions) => { let sessions = session::manager::list_sessions(&db)?; for s in sessions { @@ -542,4 +595,32 @@ mod tests { _ => panic!("expected assign subcommand"), } } + + #[test] + fn cli_parses_drain_inbox_command() { + let cli = Cli::try_parse_from([ + "ecc", + "drain-inbox", + "lead", + "--agent", + "claude", + "--limit", + "3", + ]) + .expect("drain-inbox should parse"); + + match cli.command { + Some(Commands::DrainInbox { + session_id, + agent, + limit, + .. + }) => { + assert_eq!(session_id, "lead"); + assert_eq!(agent, "claude"); + assert_eq!(limit, 3); + } + _ => panic!("expected drain-inbox subcommand"), + } + } } diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index ae0f2360..3502c402 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -9,6 +9,7 @@ use super::output::SessionOutputStore; use super::runtime::capture_command_output; use super::store::StateStore; use super::{Session, SessionMetrics, SessionState}; +use crate::comms::{self, MessageType}; use crate::config::Config; use crate::observability::{log_tool_call, ToolCallEvent, ToolLogEntry, ToolLogPage, ToolLogger}; use crate::worktree; @@ -86,6 +87,52 @@ pub async fn assign_session( .await } +pub async fn drain_inbox( + db: &StateStore, + cfg: &Config, + lead_id: &str, + agent_type: &str, + use_worktree: bool, + limit: usize, +) -> Result> { + let repo_root = + std::env::current_dir().context("Failed to resolve current working directory")?; + let runner_program = std::env::current_exe().context("Failed to resolve ECC executable path")?; + let lead = resolve_session(db, lead_id)?; + let messages = db.unread_task_handoffs_for_session(&lead.id, limit)?; + let mut outcomes = Vec::new(); + + for message in messages { + let task = match comms::parse(&message.content) { + Some(MessageType::TaskHandoff { task, .. }) => task, + _ => extract_legacy_handoff_task(&message.content) + .unwrap_or_else(|| message.content.clone()), + }; + + let outcome = assign_session_in_dir_with_runner_program( + db, + cfg, + &lead.id, + &task, + agent_type, + use_worktree, + &repo_root, + &runner_program, + ) + .await?; + + let _ = db.mark_message_read(message.id)?; + outcomes.push(InboxDrainOutcome { + message_id: message.id, + task, + session_id: outcome.session_id, + action: outcome.action, + }); + } + + Ok(outcomes) +} + pub async fn stop_session(db: &StateStore, id: &str) -> Result<()> { stop_session_with_options(db, id, true).await } @@ -599,6 +646,14 @@ fn send_task_handoff( ) } +fn extract_legacy_handoff_task(content: &str) -> Option { + let value: serde_json::Value = serde_json::from_str(content).ok()?; + value + .get("task") + .and_then(|task| task.as_str()) + .map(ToOwned::to_owned) +} + async fn spawn_session_runner_for_program( task: &str, session_id: &str, @@ -749,6 +804,13 @@ pub struct AssignmentOutcome { pub action: AssignmentAction, } +pub struct InboxDrainOutcome { + pub message_id: i64, + pub task: String, + pub session_id: String, + pub action: AssignmentAction, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum AssignmentAction { Spawned, @@ -1530,4 +1592,51 @@ mod tests { Ok(()) } + + #[tokio::test(flavor = "current_thread")] + async fn drain_inbox_routes_unread_task_handoffs_and_marks_them_read() -> Result<()> { + let tempdir = TestDir::new("manager-drain-inbox")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let cfg = build_config(tempdir.path()); + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "lead".to_string(), + task: "lead task".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Running, + pid: Some(42), + worktree: None, + created_at: now - Duration::minutes(3), + updated_at: now - Duration::minutes(3), + metrics: SessionMetrics::default(), + })?; + + db.send_message( + "planner", + "lead", + "{\"task\":\"Review auth changes\",\"context\":\"Inbound request\"}", + "task_handoff", + )?; + + let outcomes = drain_inbox(&db, &cfg, "lead", "claude", true, 5).await?; + assert_eq!(outcomes.len(), 1); + assert_eq!(outcomes[0].task, "Review auth changes"); + assert_eq!(outcomes[0].action, AssignmentAction::Spawned); + + let unread = db.unread_message_counts()?; + assert_eq!(unread.get("lead"), None); + + let messages = db.list_messages_for_session(&outcomes[0].session_id, 10)?; + assert!(messages.iter().any(|message| { + message.msg_type == "task_handoff" + && message.content.contains("Review auth changes") + })); + + Ok(()) + } } diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index e7688a5b..e9519264 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -402,6 +402,40 @@ impl StateStore { Ok(counts) } + pub fn unread_task_handoffs_for_session( + &self, + session_id: &str, + limit: usize, + ) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT id, from_session, to_session, content, msg_type, read, timestamp + FROM messages + WHERE to_session = ?1 AND msg_type = 'task_handoff' AND read = 0 + ORDER BY id ASC + LIMIT ?2", + )?; + + let messages = stmt.query_map(rusqlite::params![session_id, limit as i64], |row| { + let timestamp: String = row.get(6)?; + + Ok(SessionMessage { + id: row.get(0)?, + from_session: row.get(1)?, + to_session: row.get(2)?, + content: row.get(3)?, + msg_type: row.get(4)?, + read: row.get::<_, i64>(5)? != 0, + timestamp: chrono::DateTime::parse_from_rfc3339(×tamp) + .unwrap_or_default() + .with_timezone(&chrono::Utc), + }) + })?; + + messages + .collect::, _>>() + .map_err(Into::into) + } + pub fn mark_messages_read(&self, session_id: &str) -> Result { let updated = self.conn.execute( "UPDATE messages SET read = 1 WHERE to_session = ?1 AND read = 0", @@ -411,6 +445,15 @@ impl StateStore { Ok(updated) } + pub fn mark_message_read(&self, message_id: i64) -> Result { + let updated = self.conn.execute( + "UPDATE messages SET read = 1 WHERE id = ?1 AND read = 0", + rusqlite::params![message_id], + )?; + + Ok(updated) + } + pub fn latest_task_handoff_source(&self, session_id: &str) -> Result> { self.conn .query_row(