From 780951861270057cb3a17dcae76bbae3b6f929c0 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Fri, 10 Apr 2026 09:21:30 -0700 Subject: [PATCH] feat: add ecc2 remote dispatch intake --- ecc2/src/comms/mod.rs | 13 ++ ecc2/src/main.rs | 427 +++++++++++++++++++++++++++++++++++- ecc2/src/session/daemon.rs | 23 ++ ecc2/src/session/manager.rs | 366 ++++++++++++++++++++++++++++++- ecc2/src/session/mod.rs | 51 +++++ ecc2/src/session/store.rs | 224 ++++++++++++++++++- 6 files changed, 1098 insertions(+), 6 deletions(-) diff --git a/ecc2/src/comms/mod.rs b/ecc2/src/comms/mod.rs index 376dfd57..f7838f29 100644 --- a/ecc2/src/comms/mod.rs +++ b/ecc2/src/comms/mod.rs @@ -1,5 +1,6 @@ use anyhow::Result; use serde::{Deserialize, Serialize}; +use std::fmt; use crate::session::store::StateStore; @@ -13,6 +14,18 @@ pub enum TaskPriority { Critical, } +impl fmt::Display for TaskPriority { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let label = match self { + Self::Low => "low", + Self::Normal => "normal", + Self::High => "high", + Self::Critical => "critical", + }; + write!(f, "{label}") + } +} + /// Message types for inter-agent communication. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum MessageType { diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 355a279c..33580245 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -11,7 +11,8 @@ use clap::Parser; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::fs::File; -use std::io::{BufRead, BufReader}; +use std::io::{BufRead, BufReader, Read, Write}; +use std::net::{TcpListener, TcpStream}; use std::path::{Path, PathBuf}; use tracing_subscriber::EnvFilter; @@ -327,6 +328,11 @@ enum Commands { #[command(subcommand)] command: ScheduleCommands, }, + /// Manage remote task intake and dispatch + Remote { + #[command(subcommand)] + command: RemoteCommands, + }, /// Export sessions, tool spans, and metrics in OTLP-compatible JSON ExportOtel { /// Session ID or alias. Omit to export all sessions. @@ -442,6 +448,69 @@ enum ScheduleCommands { }, } +#[derive(clap::Subcommand, Debug)] +enum RemoteCommands { + /// Queue a remote task request + Add { + /// Task description to dispatch + #[arg(short, long)] + task: String, + /// Optional lead session ID or alias to route through + #[arg(long)] + to_session: Option, + /// Task priority + #[arg(long, value_enum, default_value_t = TaskPriorityArg::Normal)] + priority: TaskPriorityArg, + /// Agent type (defaults to ECC default agent) + #[arg(short, long)] + agent: Option, + /// Agent profile defined in ecc2.toml + #[arg(long)] + profile: Option, + #[command(flatten)] + worktree: WorktreePolicyArgs, + /// Optional project grouping override + #[arg(long)] + project: Option, + /// Optional task-group grouping override + #[arg(long)] + task_group: Option, + /// Emit machine-readable JSON instead of the human summary + #[arg(long)] + json: bool, + }, + /// List queued remote task requests + List { + /// Include already dispatched or failed requests + #[arg(long)] + all: bool, + /// Maximum requests to return + #[arg(long, default_value_t = 20)] + limit: usize, + /// Emit machine-readable JSON instead of the human summary + #[arg(long)] + json: bool, + }, + /// Dispatch queued remote task requests now + Run { + /// Maximum queued requests to process + #[arg(long, default_value_t = 20)] + limit: usize, + /// Emit machine-readable JSON instead of the human summary + #[arg(long)] + json: bool, + }, + /// Serve a token-authenticated remote dispatch intake endpoint + Serve { + /// Address to bind, for example 127.0.0.1:8787 + #[arg(long, default_value = "127.0.0.1:8787")] + bind: String, + /// Bearer token required for POST /dispatch + #[arg(long)] + token: String, + }, +} + #[derive(clap::Subcommand, Debug)] enum GraphCommands { /// Create or update a graph entity @@ -656,7 +725,8 @@ enum MessageKindArg { Conflict, } -#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq)] +#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] enum TaskPriorityArg { Low, Normal, @@ -734,6 +804,18 @@ struct GraphConnectorStatusReport { connectors: Vec, } +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] +struct RemoteDispatchHttpRequest { + task: String, + to_session: Option, + priority: Option, + agent: Option, + profile: Option, + use_worktree: Option, + project: Option, + task_group: Option, +} + #[derive(Debug, Clone, Default, Deserialize)] #[serde(default)] struct JsonlMemoryConnectorRecord { @@ -1870,6 +1952,106 @@ async fn main() -> Result<()> { } } }, + Some(Commands::Remote { command }) => match command { + RemoteCommands::Add { + task, + to_session, + priority, + agent, + profile, + worktree, + project, + task_group, + json, + } => { + let target_session_id = to_session + .as_deref() + .map(|value| resolve_session_id(&db, value)) + .transpose()?; + let request = session::manager::create_remote_dispatch_request( + &db, + &cfg, + &task, + target_session_id.as_deref(), + priority.into(), + agent.as_deref().unwrap_or(&cfg.default_agent), + profile.as_deref(), + worktree.resolve(&cfg), + session::SessionGrouping { + project, + task_group, + }, + "cli", + None, + )?; + if json { + println!("{}", serde_json::to_string_pretty(&request)?); + } else { + println!( + "Queued remote request #{} [{}] {}", + request.id, request.priority, request.task + ); + if let Some(target_session_id) = request.target_session_id.as_deref() { + println!("- target {}", short_session(target_session_id)); + } + } + } + RemoteCommands::List { all, limit, json } => { + let requests = session::manager::list_remote_dispatch_requests(&db, all, limit)?; + if json { + println!("{}", serde_json::to_string_pretty(&requests)?); + } else if requests.is_empty() { + println!("No remote dispatch requests"); + } else { + println!("Remote dispatch requests"); + for request in requests { + let target = request + .target_session_id + .as_deref() + .map(short_session) + .unwrap_or_else(|| "new-session".to_string()); + println!( + "#{} [{}] {} -> {} | {}", + request.id, request.priority, request.status, target, request.task + ); + } + } + } + RemoteCommands::Run { limit, json } => { + let outcomes = + session::manager::run_remote_dispatch_requests(&db, &cfg, limit).await?; + if json { + println!("{}", serde_json::to_string_pretty(&outcomes)?); + } else if outcomes.is_empty() { + println!("No pending remote dispatch requests"); + } else { + println!("Processed {} remote request(s)", outcomes.len()); + for outcome in outcomes { + let target = outcome + .target_session_id + .as_deref() + .map(short_session) + .unwrap_or_else(|| "new-session".to_string()); + let result = outcome + .session_id + .as_deref() + .map(short_session) + .unwrap_or_else(|| "-".to_string()); + println!( + "#{} [{}] {} -> {} | {}", + outcome.request_id, + outcome.priority, + target, + result, + format_remote_dispatch_action(&outcome.action) + ); + } + } + } + RemoteCommands::Serve { bind, token } => { + run_remote_dispatch_server(&db, &cfg, &bind, &token)?; + } + }, Some(Commands::Daemon) => { println!("Starting ECC daemon..."); session::daemon::run(db, cfg).await?; @@ -2894,10 +3076,251 @@ fn build_message( }) } +fn format_remote_dispatch_action(action: &session::manager::RemoteDispatchAction) -> String { + match action { + session::manager::RemoteDispatchAction::SpawnedTopLevel => "spawned top-level".to_string(), + session::manager::RemoteDispatchAction::Assigned(action) => match action { + session::manager::AssignmentAction::Spawned => "spawned delegate".to_string(), + session::manager::AssignmentAction::ReusedIdle => "reused idle delegate".to_string(), + session::manager::AssignmentAction::ReusedActive => { + "reused active delegate".to_string() + } + session::manager::AssignmentAction::DeferredSaturated => { + "deferred (saturated)".to_string() + } + }, + session::manager::RemoteDispatchAction::DeferredSaturated => { + "deferred (saturated)".to_string() + } + session::manager::RemoteDispatchAction::Failed(error) => format!("failed: {error}"), + } +} + fn short_session(session_id: &str) -> String { session_id.chars().take(8).collect() } +fn run_remote_dispatch_server( + db: &session::store::StateStore, + cfg: &config::Config, + bind_addr: &str, + bearer_token: &str, +) -> Result<()> { + let listener = TcpListener::bind(bind_addr) + .with_context(|| format!("Failed to bind remote dispatch server on {bind_addr}"))?; + println!("Remote dispatch server listening on http://{bind_addr}"); + + for stream in listener.incoming() { + match stream { + Ok(mut stream) => { + if let Err(error) = + handle_remote_dispatch_connection(&mut stream, db, cfg, bearer_token) + { + let _ = write_http_response( + &mut stream, + 500, + "application/json", + &serde_json::json!({ + "error": error.to_string(), + }) + .to_string(), + ); + } + } + Err(error) => tracing::warn!("Remote dispatch accept failed: {error}"), + } + } + + Ok(()) +} + +fn handle_remote_dispatch_connection( + stream: &mut TcpStream, + db: &session::store::StateStore, + cfg: &config::Config, + bearer_token: &str, +) -> Result<()> { + let (method, path, headers, body) = read_http_request(stream)?; + match (method.as_str(), path.as_str()) { + ("GET", "/health") => write_http_response( + stream, + 200, + "application/json", + &serde_json::json!({"ok": true}).to_string(), + ), + ("POST", "/dispatch") => { + let auth = headers + .get("authorization") + .map(String::as_str) + .unwrap_or_default(); + let expected = format!("Bearer {bearer_token}"); + if auth != expected { + return write_http_response( + stream, + 401, + "application/json", + &serde_json::json!({"error": "unauthorized"}).to_string(), + ); + } + + let payload: RemoteDispatchHttpRequest = + serde_json::from_slice(&body).context("Invalid remote dispatch JSON body")?; + if payload.task.trim().is_empty() { + return write_http_response( + stream, + 400, + "application/json", + &serde_json::json!({"error": "task is required"}).to_string(), + ); + } + + let target_session_id = match payload + .to_session + .as_deref() + .map(|value| resolve_session_id(db, value)) + .transpose() + { + Ok(value) => value, + Err(error) => { + return write_http_response( + stream, + 400, + "application/json", + &serde_json::json!({"error": error.to_string()}).to_string(), + ); + } + }; + let requester = stream.peer_addr().ok().map(|addr| addr.ip().to_string()); + let request = match session::manager::create_remote_dispatch_request( + db, + cfg, + &payload.task, + target_session_id.as_deref(), + payload.priority.unwrap_or(TaskPriorityArg::Normal).into(), + payload.agent.as_deref().unwrap_or(&cfg.default_agent), + payload.profile.as_deref(), + payload.use_worktree.unwrap_or(cfg.auto_create_worktrees), + session::SessionGrouping { + project: payload.project, + task_group: payload.task_group, + }, + "http", + requester.as_deref(), + ) { + Ok(request) => request, + Err(error) => { + return write_http_response( + stream, + 400, + "application/json", + &serde_json::json!({"error": error.to_string()}).to_string(), + ); + } + }; + + write_http_response( + stream, + 202, + "application/json", + &serde_json::to_string(&request)?, + ) + } + _ => write_http_response( + stream, + 404, + "application/json", + &serde_json::json!({"error": "not found"}).to_string(), + ), + } +} + +fn read_http_request( + stream: &mut TcpStream, +) -> Result<(String, String, BTreeMap, Vec)> { + let mut buffer = Vec::new(); + let mut temp = [0_u8; 1024]; + let header_end = loop { + let read = stream.read(&mut temp)?; + if read == 0 { + anyhow::bail!("Unexpected EOF while reading HTTP request"); + } + buffer.extend_from_slice(&temp[..read]); + if let Some(index) = buffer.windows(4).position(|window| window == b"\r\n\r\n") { + break index + 4; + } + if buffer.len() > 64 * 1024 { + anyhow::bail!("HTTP request headers too large"); + } + }; + + let header_text = String::from_utf8(buffer[..header_end].to_vec()) + .context("HTTP request headers were not valid UTF-8")?; + let mut lines = header_text.split("\r\n"); + let request_line = lines + .next() + .filter(|line| !line.trim().is_empty()) + .ok_or_else(|| anyhow::anyhow!("Missing HTTP request line"))?; + let mut request_parts = request_line.split_whitespace(); + let method = request_parts + .next() + .ok_or_else(|| anyhow::anyhow!("Missing HTTP method"))? + .to_string(); + let path = request_parts + .next() + .ok_or_else(|| anyhow::anyhow!("Missing HTTP path"))? + .to_string(); + + let mut headers = BTreeMap::new(); + for line in lines { + if line.is_empty() { + break; + } + if let Some((key, value)) = line.split_once(':') { + headers.insert(key.trim().to_ascii_lowercase(), value.trim().to_string()); + } + } + + let content_length = headers + .get("content-length") + .and_then(|value| value.parse::().ok()) + .unwrap_or(0); + let mut body = buffer[header_end..].to_vec(); + while body.len() < content_length { + let read = stream.read(&mut temp)?; + if read == 0 { + anyhow::bail!("Unexpected EOF while reading HTTP request body"); + } + body.extend_from_slice(&temp[..read]); + } + body.truncate(content_length); + + Ok((method, path, headers, body)) +} + +fn write_http_response( + stream: &mut TcpStream, + status: u16, + content_type: &str, + body: &str, +) -> Result<()> { + let status_text = match status { + 200 => "OK", + 202 => "Accepted", + 400 => "Bad Request", + 401 => "Unauthorized", + 404 => "Not Found", + _ => "Internal Server Error", + }; + write!( + stream, + "HTTP/1.1 {status} {status_text}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + body.len(), + body + )?; + stream.flush()?; + Ok(()) +} + fn format_coordination_status( status: &session::manager::CoordinationStatus, json: bool, diff --git a/ecc2/src/session/daemon.rs b/ecc2/src/session/daemon.rs index 9f55df04..e62ea0ae 100644 --- a/ecc2/src/session/daemon.rs +++ b/ecc2/src/session/daemon.rs @@ -31,6 +31,10 @@ pub async fn run(db: StateStore, cfg: Config) -> Result<()> { tracing::error!("Scheduled task dispatch pass failed: {e}"); } + if let Err(e) = maybe_run_remote_dispatch(&db, &cfg).await { + tracing::error!("Remote dispatch pass failed: {e}"); + } + if let Err(e) = coordinate_backlog_cycle(&db, &cfg).await { tracing::error!("Backlog coordination pass failed: {e}"); } @@ -101,6 +105,25 @@ async fn maybe_run_due_schedules(db: &StateStore, cfg: &Config) -> Result Ok(outcomes.len()) } +async fn maybe_run_remote_dispatch(db: &StateStore, cfg: &Config) -> Result { + let outcomes = + manager::run_remote_dispatch_requests(db, cfg, cfg.max_parallel_sessions).await?; + let routed = outcomes + .iter() + .filter(|outcome| { + matches!( + outcome.action, + manager::RemoteDispatchAction::SpawnedTopLevel + | manager::RemoteDispatchAction::Assigned(_) + ) + }) + .count(); + if routed > 0 { + tracing::info!("Dispatched {} remote request(s)", routed); + } + Ok(routed) +} + async fn maybe_auto_dispatch(db: &StateStore, cfg: &Config) -> Result { let summary = maybe_auto_dispatch_with_recorder( cfg, diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index d5996dc7..e2dccfe5 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -17,7 +17,7 @@ use super::{ ScheduledTask, Session, SessionAgentProfile, SessionGrouping, SessionHarnessInfo, SessionMetrics, SessionState, }; -use crate::comms::{self, MessageType}; +use crate::comms::{self, MessageType, TaskPriority}; use crate::config::Config; use crate::observability::{log_tool_call, ToolCallEvent, ToolLogEntry, ToolLogPage, ToolLogger}; use crate::worktree; @@ -252,6 +252,64 @@ pub fn delete_scheduled_task(db: &StateStore, schedule_id: i64) -> Result Ok(db.delete_scheduled_task(schedule_id)? > 0) } +#[allow(clippy::too_many_arguments)] +pub fn create_remote_dispatch_request( + db: &StateStore, + cfg: &Config, + task: &str, + target_session_id: Option<&str>, + priority: TaskPriority, + agent_type: &str, + profile_name: Option<&str>, + use_worktree: bool, + grouping: SessionGrouping, + source: &str, + requester: Option<&str>, +) -> Result { + let working_dir = + std::env::current_dir().context("Failed to resolve current working directory")?; + let project = grouping + .project + .as_deref() + .and_then(normalize_group_label) + .unwrap_or_else(|| default_project_label(&working_dir)); + let task_group = grouping + .task_group + .as_deref() + .and_then(normalize_group_label) + .unwrap_or_else(|| default_task_group_label(task)); + let agent_type = HarnessKind::canonical_agent_type(agent_type); + + if let Some(profile_name) = profile_name { + cfg.resolve_agent_profile(profile_name)?; + } + if let Some(target_session_id) = target_session_id { + let _ = resolve_session(db, target_session_id)?; + } + + db.insert_remote_dispatch_request( + target_session_id, + task, + priority, + &agent_type, + profile_name, + &working_dir, + &project, + &task_group, + use_worktree, + source, + requester, + ) +} + +pub fn list_remote_dispatch_requests( + db: &StateStore, + include_processed: bool, + limit: usize, +) -> Result> { + db.list_remote_dispatch_requests(include_processed, limit) +} + pub async fn run_due_schedules( db: &StateStore, cfg: &Config, @@ -262,6 +320,133 @@ pub async fn run_due_schedules( run_due_schedules_with_runner_program(db, cfg, limit, &runner_program).await } +pub async fn run_remote_dispatch_requests( + db: &StateStore, + cfg: &Config, + limit: usize, +) -> Result> { + let requests = db.list_pending_remote_dispatch_requests(limit)?; + let runner_program = + std::env::current_exe().context("Failed to resolve ECC executable path")?; + run_remote_dispatch_requests_with_runner_program(db, cfg, requests, &runner_program).await +} + +async fn run_remote_dispatch_requests_with_runner_program( + db: &StateStore, + cfg: &Config, + requests: Vec, + runner_program: &Path, +) -> Result> { + let mut outcomes = Vec::new(); + + for request in requests { + let grouping = SessionGrouping { + project: normalize_group_label(&request.project), + task_group: normalize_group_label(&request.task_group), + }; + + let outcome = if let Some(target_session_id) = request.target_session_id.as_deref() { + match assign_session_in_dir_with_runner_program( + db, + cfg, + target_session_id, + &request.task, + &request.agent_type, + request.use_worktree, + &request.working_dir, + &runner_program, + request.profile_name.as_deref(), + grouping, + ) + .await + { + Ok(assignment) if assignment.action == AssignmentAction::DeferredSaturated => { + RemoteDispatchOutcome { + request_id: request.id, + task: request.task.clone(), + priority: request.priority, + target_session_id: request.target_session_id.clone(), + session_id: None, + action: RemoteDispatchAction::DeferredSaturated, + } + } + Ok(assignment) => { + db.record_remote_dispatch_success( + request.id, + Some(&assignment.session_id), + Some(assignment.action.label()), + )?; + RemoteDispatchOutcome { + request_id: request.id, + task: request.task.clone(), + priority: request.priority, + target_session_id: request.target_session_id.clone(), + session_id: Some(assignment.session_id), + action: RemoteDispatchAction::Assigned(assignment.action), + } + } + Err(error) => { + db.record_remote_dispatch_failure(request.id, &error.to_string())?; + RemoteDispatchOutcome { + request_id: request.id, + task: request.task.clone(), + priority: request.priority, + target_session_id: request.target_session_id.clone(), + session_id: None, + action: RemoteDispatchAction::Failed(error.to_string()), + } + } + } + } else { + match queue_session_in_dir_with_runner_program( + db, + cfg, + &request.task, + &request.agent_type, + request.use_worktree, + &request.working_dir, + &runner_program, + request.profile_name.as_deref(), + None, + grouping, + ) + .await + { + Ok(session_id) => { + db.record_remote_dispatch_success( + request.id, + Some(&session_id), + Some("spawned_top_level"), + )?; + RemoteDispatchOutcome { + request_id: request.id, + task: request.task.clone(), + priority: request.priority, + target_session_id: None, + session_id: Some(session_id), + action: RemoteDispatchAction::SpawnedTopLevel, + } + } + Err(error) => { + db.record_remote_dispatch_failure(request.id, &error.to_string())?; + RemoteDispatchOutcome { + request_id: request.id, + task: request.task.clone(), + priority: request.priority, + target_session_id: None, + session_id: None, + action: RemoteDispatchAction::Failed(error.to_string()), + } + } + } + }; + + outcomes.push(outcome); + } + + Ok(outcomes) +} + #[derive(Debug, Clone, Serialize, PartialEq, Eq)] pub struct TemplateLaunchStepOutcome { pub step_name: String, @@ -3076,6 +3261,25 @@ pub struct ScheduledRunOutcome { pub next_run_at: chrono::DateTime, } +#[derive(Debug, Clone, Serialize)] +pub struct RemoteDispatchOutcome { + pub request_id: i64, + pub task: String, + pub priority: TaskPriority, + pub target_session_id: Option, + pub session_id: Option, + pub action: RemoteDispatchAction, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "snake_case", tag = "type", content = "details")] +pub enum RemoteDispatchAction { + SpawnedTopLevel, + Assigned(AssignmentAction), + DeferredSaturated, + Failed(String), +} + pub struct RebalanceOutcome { pub from_session_id: String, pub message_id: i64, @@ -3130,7 +3334,8 @@ pub enum CoordinationHealth { EscalationRequired, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "snake_case")] pub enum AssignmentAction { Spawned, ReusedIdle, @@ -3138,6 +3343,17 @@ pub enum AssignmentAction { DeferredSaturated, } +impl AssignmentAction { + fn label(self) -> &'static str { + match self { + Self::Spawned => "spawned", + Self::ReusedIdle => "reused_idle", + Self::ReusedActive => "reused_active", + Self::DeferredSaturated => "deferred_saturated", + } + } +} + pub fn preview_assignment_for_task( db: &StateStore, cfg: &Config, @@ -4341,6 +4557,152 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "current_thread")] + async fn run_remote_dispatch_requests_prioritizes_critical_targeted_work() -> Result<()> { + let tempdir = TestDir::new("manager-run-remote-dispatch-priority")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let cfg = build_config(tempdir.path()); + let db = StateStore::open(&cfg.db_path)?; + let (fake_runner, _log_path) = write_fake_claude(tempdir.path())?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "lead".to_string(), + task: "Lead orchestration".to_string(), + project: "repo".to_string(), + task_group: "Lead orchestration".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Running, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + last_heartbeat_at: now, + metrics: SessionMetrics::default(), + })?; + + let low = create_remote_dispatch_request( + &db, + &cfg, + "Low priority cleanup", + Some("lead"), + TaskPriority::Low, + "claude", + None, + true, + SessionGrouping::default(), + "cli", + None, + )?; + let critical = create_remote_dispatch_request( + &db, + &cfg, + "Critical production incident", + Some("lead"), + TaskPriority::Critical, + "claude", + None, + true, + SessionGrouping::default(), + "cli", + None, + )?; + + let outcomes = run_remote_dispatch_requests_with_runner_program( + &db, + &cfg, + db.list_pending_remote_dispatch_requests(1)?, + &fake_runner, + ) + .await?; + assert_eq!(outcomes.len(), 1); + assert_eq!(outcomes[0].request_id, critical.id); + assert!(matches!( + outcomes[0].action, + RemoteDispatchAction::Assigned(AssignmentAction::Spawned) + )); + + let low_request = db + .get_remote_dispatch_request(low.id)? + .context("low priority request should still exist")?; + assert_eq!( + low_request.status, + crate::session::RemoteDispatchStatus::Pending + ); + + let critical_request = db + .get_remote_dispatch_request(critical.id)? + .context("critical request should still exist")?; + assert_eq!( + critical_request.status, + crate::session::RemoteDispatchStatus::Dispatched + ); + assert!(critical_request.result_session_id.is_some()); + + Ok(()) + } + + #[tokio::test(flavor = "current_thread")] + async fn run_remote_dispatch_requests_spawns_top_level_session_when_untargeted() -> Result<()> { + let tempdir = TestDir::new("manager-run-remote-dispatch-top-level")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let cfg = build_config(tempdir.path()); + let db = StateStore::open(&cfg.db_path)?; + let (fake_runner, _log_path) = write_fake_claude(tempdir.path())?; + + let request = db.insert_remote_dispatch_request( + None, + "Remote phone triage", + TaskPriority::High, + "claude", + None, + &repo_root, + "ecc-core", + "phone dispatch", + true, + "http", + Some("127.0.0.1"), + )?; + + let outcomes = run_remote_dispatch_requests_with_runner_program( + &db, + &cfg, + db.list_pending_remote_dispatch_requests(10)?, + &fake_runner, + ) + .await?; + assert_eq!(outcomes.len(), 1); + assert_eq!(outcomes[0].request_id, request.id); + assert!(matches!( + outcomes[0].action, + RemoteDispatchAction::SpawnedTopLevel + )); + + let request = db + .get_remote_dispatch_request(request.id)? + .context("remote request should still exist")?; + assert_eq!( + request.status, + crate::session::RemoteDispatchStatus::Dispatched + ); + let session_id = request + .result_session_id + .clone() + .context("spawned top-level request should record a session id")?; + let session = db + .get_session(&session_id)? + .context("spawned session should exist")?; + assert_eq!(session.project, "ecc-core"); + assert_eq!(session.task_group, "phone dispatch"); + + Ok(()) + } + #[tokio::test(flavor = "current_thread")] async fn stop_session_kills_process_and_optionally_cleans_worktree() -> Result<()> { let tempdir = TestDir::new("manager-stop-session")?; diff --git a/ecc2/src/session/mod.rs b/ecc2/src/session/mod.rs index 3b774a70..5175d9d9 100644 --- a/ecc2/src/session/mod.rs +++ b/ecc2/src/session/mod.rs @@ -395,6 +395,57 @@ pub struct ScheduledTask { pub updated_at: DateTime, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct RemoteDispatchRequest { + pub id: i64, + pub target_session_id: Option, + pub task: String, + pub priority: crate::comms::TaskPriority, + pub agent_type: String, + pub profile_name: Option, + pub working_dir: PathBuf, + pub project: String, + pub task_group: String, + pub use_worktree: bool, + pub source: String, + pub requester: Option, + pub status: RemoteDispatchStatus, + pub result_session_id: Option, + pub result_action: Option, + pub error: Option, + pub created_at: DateTime, + pub updated_at: DateTime, + pub dispatched_at: Option>, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum RemoteDispatchStatus { + Pending, + Dispatched, + Failed, +} + +impl fmt::Display for RemoteDispatchStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Pending => write!(f, "pending"), + Self::Dispatched => write!(f, "dispatched"), + Self::Failed => write!(f, "failed"), + } + } +} + +impl RemoteDispatchStatus { + pub fn from_db_value(value: &str) -> Self { + match value { + "dispatched" => Self::Dispatched, + "failed" => Self::Failed, + _ => Self::Pending, + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct FileActivityEntry { pub session_id: String, diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index 7414f99a..d23187c9 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -18,8 +18,9 @@ use super::{ ContextGraphCompactionStats, ContextGraphEntity, ContextGraphEntityDetail, ContextGraphObservation, ContextGraphRecallEntry, ContextGraphRelation, ContextGraphSyncStats, ContextObservationPriority, DecisionLogEntry, FileActivityAction, FileActivityEntry, - HarnessKind, ScheduledTask, Session, SessionAgentProfile, SessionHarnessInfo, SessionMessage, - SessionMetrics, SessionState, WorktreeInfo, + HarnessKind, RemoteDispatchRequest, RemoteDispatchStatus, ScheduledTask, Session, + SessionAgentProfile, SessionHarnessInfo, SessionMessage, SessionMetrics, SessionState, + WorktreeInfo, }; pub struct StateStore { @@ -315,6 +316,28 @@ impl StateStore { updated_at TEXT NOT NULL ); + CREATE TABLE IF NOT EXISTS remote_dispatch_requests ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + target_session_id TEXT REFERENCES sessions(id) ON DELETE SET NULL, + task TEXT NOT NULL, + priority INTEGER NOT NULL DEFAULT 1, + agent_type TEXT NOT NULL, + profile_name TEXT, + working_dir TEXT NOT NULL, + project TEXT NOT NULL DEFAULT '', + task_group TEXT NOT NULL DEFAULT '', + use_worktree INTEGER NOT NULL DEFAULT 1, + source TEXT NOT NULL DEFAULT '', + requester TEXT, + status TEXT NOT NULL DEFAULT 'pending', + result_session_id TEXT, + result_action TEXT, + error TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + dispatched_at TEXT + ); + CREATE TABLE IF NOT EXISTS conflict_incidents ( id INTEGER PRIMARY KEY AUTOINCREMENT, conflict_key TEXT NOT NULL UNIQUE, @@ -377,6 +400,8 @@ impl StateStore { ON conflict_incidents(first_session_id, second_session_id, resolved_at, updated_at); CREATE INDEX IF NOT EXISTS idx_pending_worktree_queue_requested_at ON pending_worktree_queue(requested_at, session_id); + CREATE INDEX IF NOT EXISTS idx_remote_dispatch_requests_status_priority + ON remote_dispatch_requests(status, priority DESC, created_at, id); INSERT OR IGNORE INTO daemon_activity (id) VALUES (1); ", @@ -1164,6 +1189,153 @@ impl StateStore { Ok(()) } + #[allow(clippy::too_many_arguments)] + pub fn insert_remote_dispatch_request( + &self, + target_session_id: Option<&str>, + task: &str, + priority: crate::comms::TaskPriority, + agent_type: &str, + profile_name: Option<&str>, + working_dir: &Path, + project: &str, + task_group: &str, + use_worktree: bool, + source: &str, + requester: Option<&str>, + ) -> Result { + let now = chrono::Utc::now(); + self.conn.execute( + "INSERT INTO remote_dispatch_requests ( + target_session_id, + task, + priority, + agent_type, + profile_name, + working_dir, + project, + task_group, + use_worktree, + source, + requester, + status, + created_at, + updated_at + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, 'pending', ?12, ?13)", + rusqlite::params![ + target_session_id, + task, + task_priority_db_value(priority), + agent_type, + profile_name, + working_dir.display().to_string(), + project, + task_group, + if use_worktree { 1_i64 } else { 0_i64 }, + source, + requester, + now.to_rfc3339(), + now.to_rfc3339(), + ], + )?; + let id = self.conn.last_insert_rowid(); + self.get_remote_dispatch_request(id)?.ok_or_else(|| { + anyhow::anyhow!("Remote dispatch request {id} was not found after insert") + }) + } + + pub fn list_remote_dispatch_requests( + &self, + include_processed: bool, + limit: usize, + ) -> Result> { + let sql = if include_processed { + "SELECT id, target_session_id, task, priority, agent_type, profile_name, working_dir, + project, task_group, use_worktree, source, requester, status, + result_session_id, result_action, error, created_at, updated_at, dispatched_at + FROM remote_dispatch_requests + ORDER BY CASE status WHEN 'pending' THEN 0 WHEN 'failed' THEN 1 ELSE 2 END ASC, + priority DESC, created_at ASC, id ASC + LIMIT ?1" + } else { + "SELECT id, target_session_id, task, priority, agent_type, profile_name, working_dir, + project, task_group, use_worktree, source, requester, status, + result_session_id, result_action, error, created_at, updated_at, dispatched_at + FROM remote_dispatch_requests + WHERE status = 'pending' + ORDER BY priority DESC, created_at ASC, id ASC + LIMIT ?1" + }; + + let mut stmt = self.conn.prepare(sql)?; + let rows = stmt.query_map([limit as i64], map_remote_dispatch_request)?; + rows.collect::, _>>().map_err(Into::into) + } + + pub fn list_pending_remote_dispatch_requests( + &self, + limit: usize, + ) -> Result> { + self.list_remote_dispatch_requests(false, limit) + } + + pub fn get_remote_dispatch_request( + &self, + request_id: i64, + ) -> Result> { + self.conn + .query_row( + "SELECT id, target_session_id, task, priority, agent_type, profile_name, working_dir, + project, task_group, use_worktree, source, requester, status, + result_session_id, result_action, error, created_at, updated_at, dispatched_at + FROM remote_dispatch_requests + WHERE id = ?1", + [request_id], + map_remote_dispatch_request, + ) + .optional() + .map_err(Into::into) + } + + pub fn record_remote_dispatch_success( + &self, + request_id: i64, + result_session_id: Option<&str>, + result_action: Option<&str>, + ) -> Result<()> { + let now = chrono::Utc::now(); + self.conn.execute( + "UPDATE remote_dispatch_requests + SET status = 'dispatched', + result_session_id = ?2, + result_action = ?3, + error = NULL, + dispatched_at = ?4, + updated_at = ?4 + WHERE id = ?1", + rusqlite::params![ + request_id, + result_session_id, + result_action, + now.to_rfc3339() + ], + )?; + Ok(()) + } + + pub fn record_remote_dispatch_failure(&self, request_id: i64, error: &str) -> Result<()> { + let now = chrono::Utc::now(); + self.conn.execute( + "UPDATE remote_dispatch_requests + SET status = 'failed', + error = ?2, + updated_at = ?3 + WHERE id = ?1", + rusqlite::params![request_id, error, now.to_rfc3339()], + )?; + Ok(()) + } + pub fn update_metrics(&self, session_id: &str, metrics: &SessionMetrics) -> Result<()> { self.conn.execute( "UPDATE sessions @@ -3727,6 +3899,36 @@ fn map_scheduled_task(row: &rusqlite::Row<'_>) -> rusqlite::Result) -> rusqlite::Result { + let created_at = parse_store_timestamp(row.get::<_, String>(16)?, 16)?; + let updated_at = parse_store_timestamp(row.get::<_, String>(17)?, 17)?; + let dispatched_at = row + .get::<_, Option>(18)? + .map(|value| parse_store_timestamp(value, 18)) + .transpose()?; + Ok(RemoteDispatchRequest { + id: row.get(0)?, + target_session_id: normalize_optional_string(row.get(1)?), + task: row.get(2)?, + priority: task_priority_from_db_value(row.get::<_, i64>(3)?), + agent_type: row.get(4)?, + profile_name: normalize_optional_string(row.get(5)?), + working_dir: PathBuf::from(row.get::<_, String>(6)?), + project: row.get(7)?, + task_group: row.get(8)?, + use_worktree: row.get::<_, i64>(9)? != 0, + source: row.get(10)?, + requester: normalize_optional_string(row.get(11)?), + status: RemoteDispatchStatus::from_db_value(&row.get::<_, String>(12)?), + result_session_id: normalize_optional_string(row.get(13)?), + result_action: normalize_optional_string(row.get(14)?), + error: normalize_optional_string(row.get(15)?), + created_at, + updated_at, + dispatched_at, + }) +} + fn parse_timestamp_column( value: String, index: usize, @@ -3769,6 +3971,24 @@ fn default_input_params_json() -> String { "{}".to_string() } +fn task_priority_db_value(priority: crate::comms::TaskPriority) -> i64 { + match priority { + crate::comms::TaskPriority::Low => 0, + crate::comms::TaskPriority::Normal => 1, + crate::comms::TaskPriority::High => 2, + crate::comms::TaskPriority::Critical => 3, + } +} + +fn task_priority_from_db_value(value: i64) -> crate::comms::TaskPriority { + match value { + 0 => crate::comms::TaskPriority::Low, + 2 => crate::comms::TaskPriority::High, + 3 => crate::comms::TaskPriority::Critical, + _ => crate::comms::TaskPriority::Normal, + } +} + fn infer_file_activity_action(tool_name: &str) -> FileActivityAction { let tool_name = tool_name.trim().to_ascii_lowercase(); if tool_name.contains("read") {