feat: add ecc2 auto-dispatch backlog sweep

This commit is contained in:
Affaan Mustafa
2026-04-07 12:57:12 -07:00
parent df3ac98ce3
commit 2d5d0e5c1d
4 changed files with 207 additions and 0 deletions

View File

@@ -29,6 +29,8 @@ pub struct Config {
pub session_timeout_secs: u64, pub session_timeout_secs: u64,
pub heartbeat_interval_secs: u64, pub heartbeat_interval_secs: u64,
pub default_agent: String, pub default_agent: String,
pub auto_dispatch_unread_handoffs: bool,
pub auto_dispatch_limit_per_session: usize,
pub cost_budget_usd: f64, pub cost_budget_usd: f64,
pub token_budget: u64, pub token_budget: u64,
pub theme: Theme, pub theme: Theme,
@@ -53,6 +55,8 @@ impl Default for Config {
session_timeout_secs: 3600, session_timeout_secs: 3600,
heartbeat_interval_secs: 30, heartbeat_interval_secs: 30,
default_agent: "claude".to_string(), default_agent: "claude".to_string(),
auto_dispatch_unread_handoffs: false,
auto_dispatch_limit_per_session: 5,
cost_budget_usd: 10.0, cost_budget_usd: 10.0,
token_budget: 500_000, token_budget: 500_000,
theme: Theme::Dark, theme: Theme::Dark,
@@ -123,6 +127,14 @@ theme = "Dark"
assert_eq!(config.token_budget, defaults.token_budget); assert_eq!(config.token_budget, defaults.token_budget);
assert_eq!(config.pane_layout, defaults.pane_layout); assert_eq!(config.pane_layout, defaults.pane_layout);
assert_eq!(config.risk_thresholds, defaults.risk_thresholds); assert_eq!(config.risk_thresholds, defaults.risk_thresholds);
assert_eq!(
config.auto_dispatch_unread_handoffs,
defaults.auto_dispatch_unread_handoffs
);
assert_eq!(
config.auto_dispatch_limit_per_session,
defaults.auto_dispatch_limit_per_session
);
} }
#[test] #[test]

View File

@@ -78,6 +78,18 @@ enum Commands {
#[arg(long, default_value_t = 5)] #[arg(long, default_value_t = 5)]
limit: usize, limit: usize,
}, },
/// Sweep unread task handoffs across lead sessions and route them through the assignment policy
AutoDispatch {
/// Agent type for routed delegates
#[arg(short, long, default_value = "claude")]
agent: String,
/// Create a dedicated worktree if new delegates must be spawned
#[arg(short, long, default_value_t = true)]
worktree: bool,
/// Maximum lead sessions to sweep in one pass
#[arg(long, default_value_t = 10)]
lead_limit: usize,
},
/// List active sessions /// List active sessions
Sessions, Sessions,
/// Show session details /// Show session details
@@ -279,6 +291,38 @@ async fn main() -> Result<()> {
} }
} }
} }
Some(Commands::AutoDispatch {
agent,
worktree: use_worktree,
lead_limit,
}) => {
let outcomes = session::manager::auto_dispatch_backlog(
&db,
&cfg,
&agent,
use_worktree,
lead_limit,
)
.await?;
if outcomes.is_empty() {
println!("No unread task handoff backlog found");
} else {
let total_routed: usize = outcomes.iter().map(|outcome| outcome.routed.len()).sum();
println!(
"Auto-dispatched {} task handoff(s) across {} lead session(s)",
total_routed,
outcomes.len()
);
for outcome in outcomes {
println!(
"- {} | unread {} | routed {}",
short_session(&outcome.lead_session_id),
outcome.unread_count,
outcome.routed.len()
);
}
}
}
Some(Commands::Sessions) => { Some(Commands::Sessions) => {
let sessions = session::manager::list_sessions(&db)?; let sessions = session::manager::list_sessions(&db)?;
for s in sessions { for s in sessions {
@@ -623,4 +667,29 @@ mod tests {
_ => panic!("expected drain-inbox subcommand"), _ => panic!("expected drain-inbox subcommand"),
} }
} }
#[test]
fn cli_parses_auto_dispatch_command() {
let cli = Cli::try_parse_from([
"ecc",
"auto-dispatch",
"--agent",
"claude",
"--lead-limit",
"4",
])
.expect("auto-dispatch should parse");
match cli.command {
Some(Commands::AutoDispatch {
agent,
lead_limit,
..
}) => {
assert_eq!(agent, "claude");
assert_eq!(lead_limit, 4);
}
_ => panic!("expected auto-dispatch subcommand"),
}
}
} }

View File

@@ -133,6 +133,39 @@ pub async fn drain_inbox(
Ok(outcomes) Ok(outcomes)
} }
pub async fn auto_dispatch_backlog(
db: &StateStore,
cfg: &Config,
agent_type: &str,
use_worktree: bool,
lead_limit: usize,
) -> Result<Vec<LeadDispatchOutcome>> {
let targets = db.unread_task_handoff_targets(lead_limit)?;
let mut outcomes = Vec::new();
for (lead_id, unread_count) in targets {
let routed = drain_inbox(
db,
cfg,
&lead_id,
agent_type,
use_worktree,
cfg.auto_dispatch_limit_per_session,
)
.await?;
if !routed.is_empty() {
outcomes.push(LeadDispatchOutcome {
lead_session_id: lead_id,
unread_count,
routed,
});
}
}
Ok(outcomes)
}
pub async fn stop_session(db: &StateStore, id: &str) -> Result<()> { pub async fn stop_session(db: &StateStore, id: &str) -> Result<()> {
stop_session_with_options(db, id, true).await stop_session_with_options(db, id, true).await
} }
@@ -811,6 +844,12 @@ pub struct InboxDrainOutcome {
pub action: AssignmentAction, pub action: AssignmentAction,
} }
pub struct LeadDispatchOutcome {
pub lead_session_id: String,
pub unread_count: usize,
pub routed: Vec<InboxDrainOutcome>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AssignmentAction { pub enum AssignmentAction {
Spawned, Spawned,
@@ -964,6 +1003,8 @@ mod tests {
session_timeout_secs: 60, session_timeout_secs: 60,
heartbeat_interval_secs: 5, heartbeat_interval_secs: 5,
default_agent: "claude".to_string(), default_agent: "claude".to_string(),
auto_dispatch_unread_handoffs: false,
auto_dispatch_limit_per_session: 5,
cost_budget_usd: 10.0, cost_budget_usd: 10.0,
token_budget: 500_000, token_budget: 500_000,
theme: Theme::Dark, theme: Theme::Dark,
@@ -1639,4 +1680,63 @@ mod tests {
Ok(()) Ok(())
} }
#[tokio::test(flavor = "current_thread")]
async fn auto_dispatch_backlog_routes_multiple_lead_inboxes() -> Result<()> {
let tempdir = TestDir::new("manager-auto-dispatch")?;
let repo_root = tempdir.path().join("repo");
init_git_repo(&repo_root)?;
let mut cfg = build_config(tempdir.path());
cfg.auto_dispatch_limit_per_session = 5;
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
for lead_id in ["lead-a", "lead-b"] {
db.insert_session(&Session {
id: lead_id.to_string(),
task: format!("{lead_id} task"),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
pid: Some(42),
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
}
db.send_message(
"planner",
"lead-a",
"{\"task\":\"Review auth\",\"context\":\"Inbound\"}",
"task_handoff",
)?;
db.send_message(
"planner",
"lead-b",
"{\"task\":\"Review billing\",\"context\":\"Inbound\"}",
"task_handoff",
)?;
let outcomes = auto_dispatch_backlog(&db, &cfg, "claude", true, 10).await?;
assert_eq!(outcomes.len(), 2);
assert!(outcomes.iter().any(|outcome| {
outcome.lead_session_id == "lead-a"
&& outcome.unread_count == 1
&& outcome.routed.len() == 1
}));
assert!(outcomes.iter().any(|outcome| {
outcome.lead_session_id == "lead-b"
&& outcome.unread_count == 1
&& outcome.routed.len() == 1
}));
let unread = db.unread_task_handoff_targets(10)?;
assert!(!unread.iter().any(|(session_id, _)| session_id == "lead-a"));
assert!(!unread.iter().any(|(session_id, _)| session_id == "lead-b"));
Ok(())
}
} }

View File

@@ -436,6 +436,25 @@ impl StateStore {
.map_err(Into::into) .map_err(Into::into)
} }
pub fn unread_task_handoff_targets(&self, limit: usize) -> Result<Vec<(String, usize)>> {
let mut stmt = self.conn.prepare(
"SELECT to_session, COUNT(*) as unread_count
FROM messages
WHERE msg_type = 'task_handoff' AND read = 0
GROUP BY to_session
ORDER BY unread_count DESC, MAX(id) ASC
LIMIT ?1",
)?;
let targets = stmt.query_map(rusqlite::params![limit as i64], |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)? as usize))
})?;
targets
.collect::<Result<Vec<_>, _>>()
.map_err(Into::into)
}
pub fn mark_messages_read(&self, session_id: &str) -> Result<usize> { pub fn mark_messages_read(&self, session_id: &str) -> Result<usize> {
let updated = self.conn.execute( let updated = self.conn.execute(
"UPDATE messages SET read = 1 WHERE to_session = ?1 AND read = 0", "UPDATE messages SET read = 1 WHERE to_session = ?1 AND read = 0",
@@ -826,6 +845,13 @@ mod tests {
"worker-2".to_string(), "worker-2".to_string(),
] ]
); );
assert_eq!(
db.unread_task_handoff_targets(10)?,
vec![
("worker-2".to_string(), 1),
("worker-3".to_string(), 1),
]
);
Ok(()) Ok(())
} }