feat: add ecc2 inbox drain routing

This commit is contained in:
Affaan Mustafa
2026-04-07 12:51:28 -07:00
parent 8ff5e736cd
commit 7622973452
3 changed files with 233 additions and 0 deletions

View File

@@ -64,6 +64,20 @@ enum Commands {
#[arg(short, long, default_value_t = true)]
worktree: bool,
},
/// Route unread task handoffs from a lead session inbox through the assignment policy
DrainInbox {
/// Lead session ID or alias
session_id: String,
/// Agent type for routed delegates
#[arg(short, long, default_value = "claude")]
agent: String,
/// Create a dedicated worktree if new delegates must be spawned
#[arg(short, long, default_value_t = true)]
worktree: bool,
/// Maximum unread task handoffs to route
#[arg(long, default_value_t = 5)]
limit: usize,
},
/// List active sessions
Sessions,
/// Show session details
@@ -226,6 +240,45 @@ async fn main() -> Result<()> {
}
);
}
Some(Commands::DrainInbox {
session_id,
agent,
worktree: use_worktree,
limit,
}) => {
let lead_id = resolve_session_id(&db, &session_id)?;
let outcomes = session::manager::drain_inbox(
&db,
&cfg,
&lead_id,
&agent,
use_worktree,
limit,
)
.await?;
if outcomes.is_empty() {
println!("No unread task handoffs for {}", short_session(&lead_id));
} else {
println!(
"Routed {} inbox task handoff(s) from {}",
outcomes.len(),
short_session(&lead_id)
);
for outcome in outcomes {
println!(
"- {} -> {} ({}) | {}",
outcome.message_id,
short_session(&outcome.session_id),
match outcome.action {
session::manager::AssignmentAction::Spawned => "spawned",
session::manager::AssignmentAction::ReusedIdle => "reused-idle",
session::manager::AssignmentAction::ReusedActive => "reused-active",
},
outcome.task
);
}
}
}
Some(Commands::Sessions) => {
let sessions = session::manager::list_sessions(&db)?;
for s in sessions {
@@ -542,4 +595,32 @@ mod tests {
_ => panic!("expected assign subcommand"),
}
}
#[test]
fn cli_parses_drain_inbox_command() {
let cli = Cli::try_parse_from([
"ecc",
"drain-inbox",
"lead",
"--agent",
"claude",
"--limit",
"3",
])
.expect("drain-inbox should parse");
match cli.command {
Some(Commands::DrainInbox {
session_id,
agent,
limit,
..
}) => {
assert_eq!(session_id, "lead");
assert_eq!(agent, "claude");
assert_eq!(limit, 3);
}
_ => panic!("expected drain-inbox subcommand"),
}
}
}

View File

@@ -9,6 +9,7 @@ use super::output::SessionOutputStore;
use super::runtime::capture_command_output;
use super::store::StateStore;
use super::{Session, SessionMetrics, SessionState};
use crate::comms::{self, MessageType};
use crate::config::Config;
use crate::observability::{log_tool_call, ToolCallEvent, ToolLogEntry, ToolLogPage, ToolLogger};
use crate::worktree;
@@ -86,6 +87,52 @@ pub async fn assign_session(
.await
}
pub async fn drain_inbox(
db: &StateStore,
cfg: &Config,
lead_id: &str,
agent_type: &str,
use_worktree: bool,
limit: usize,
) -> Result<Vec<InboxDrainOutcome>> {
let repo_root =
std::env::current_dir().context("Failed to resolve current working directory")?;
let runner_program = std::env::current_exe().context("Failed to resolve ECC executable path")?;
let lead = resolve_session(db, lead_id)?;
let messages = db.unread_task_handoffs_for_session(&lead.id, limit)?;
let mut outcomes = Vec::new();
for message in messages {
let task = match comms::parse(&message.content) {
Some(MessageType::TaskHandoff { task, .. }) => task,
_ => extract_legacy_handoff_task(&message.content)
.unwrap_or_else(|| message.content.clone()),
};
let outcome = assign_session_in_dir_with_runner_program(
db,
cfg,
&lead.id,
&task,
agent_type,
use_worktree,
&repo_root,
&runner_program,
)
.await?;
let _ = db.mark_message_read(message.id)?;
outcomes.push(InboxDrainOutcome {
message_id: message.id,
task,
session_id: outcome.session_id,
action: outcome.action,
});
}
Ok(outcomes)
}
pub async fn stop_session(db: &StateStore, id: &str) -> Result<()> {
stop_session_with_options(db, id, true).await
}
@@ -599,6 +646,14 @@ fn send_task_handoff(
)
}
fn extract_legacy_handoff_task(content: &str) -> Option<String> {
let value: serde_json::Value = serde_json::from_str(content).ok()?;
value
.get("task")
.and_then(|task| task.as_str())
.map(ToOwned::to_owned)
}
async fn spawn_session_runner_for_program(
task: &str,
session_id: &str,
@@ -749,6 +804,13 @@ pub struct AssignmentOutcome {
pub action: AssignmentAction,
}
pub struct InboxDrainOutcome {
pub message_id: i64,
pub task: String,
pub session_id: String,
pub action: AssignmentAction,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AssignmentAction {
Spawned,
@@ -1530,4 +1592,51 @@ mod tests {
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn drain_inbox_routes_unread_task_handoffs_and_marks_them_read() -> Result<()> {
let tempdir = TestDir::new("manager-drain-inbox")?;
let repo_root = tempdir.path().join("repo");
init_git_repo(&repo_root)?;
let cfg = build_config(tempdir.path());
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
pid: Some(42),
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
db.send_message(
"planner",
"lead",
"{\"task\":\"Review auth changes\",\"context\":\"Inbound request\"}",
"task_handoff",
)?;
let outcomes = drain_inbox(&db, &cfg, "lead", "claude", true, 5).await?;
assert_eq!(outcomes.len(), 1);
assert_eq!(outcomes[0].task, "Review auth changes");
assert_eq!(outcomes[0].action, AssignmentAction::Spawned);
let unread = db.unread_message_counts()?;
assert_eq!(unread.get("lead"), None);
let messages = db.list_messages_for_session(&outcomes[0].session_id, 10)?;
assert!(messages.iter().any(|message| {
message.msg_type == "task_handoff"
&& message.content.contains("Review auth changes")
}));
Ok(())
}
}

View File

@@ -402,6 +402,40 @@ impl StateStore {
Ok(counts)
}
pub fn unread_task_handoffs_for_session(
&self,
session_id: &str,
limit: usize,
) -> Result<Vec<SessionMessage>> {
let mut stmt = self.conn.prepare(
"SELECT id, from_session, to_session, content, msg_type, read, timestamp
FROM messages
WHERE to_session = ?1 AND msg_type = 'task_handoff' AND read = 0
ORDER BY id ASC
LIMIT ?2",
)?;
let messages = stmt.query_map(rusqlite::params![session_id, limit as i64], |row| {
let timestamp: String = row.get(6)?;
Ok(SessionMessage {
id: row.get(0)?,
from_session: row.get(1)?,
to_session: row.get(2)?,
content: row.get(3)?,
msg_type: row.get(4)?,
read: row.get::<_, i64>(5)? != 0,
timestamp: chrono::DateTime::parse_from_rfc3339(&timestamp)
.unwrap_or_default()
.with_timezone(&chrono::Utc),
})
})?;
messages
.collect::<Result<Vec<_>, _>>()
.map_err(Into::into)
}
pub fn mark_messages_read(&self, session_id: &str) -> Result<usize> {
let updated = self.conn.execute(
"UPDATE messages SET read = 1 WHERE to_session = ?1 AND read = 0",
@@ -411,6 +445,15 @@ impl StateStore {
Ok(updated)
}
pub fn mark_message_read(&self, message_id: i64) -> Result<usize> {
let updated = self.conn.execute(
"UPDATE messages SET read = 1 WHERE id = ?1 AND read = 0",
rusqlite::params![message_id],
)?;
Ok(updated)
}
pub fn latest_task_handoff_source(&self, session_id: &str) -> Result<Option<String>> {
self.conn
.query_row(