feat: add ecc2 remote dispatch intake

This commit is contained in:
Affaan Mustafa
2026-04-10 09:21:30 -07:00
parent bbed46d3eb
commit 7809518612
6 changed files with 1098 additions and 6 deletions

View File

@@ -11,7 +11,8 @@ use clap::Parser;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::io::{BufRead, BufReader, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::path::{Path, PathBuf};
use tracing_subscriber::EnvFilter;
@@ -327,6 +328,11 @@ enum Commands {
#[command(subcommand)]
command: ScheduleCommands,
},
/// Manage remote task intake and dispatch
Remote {
#[command(subcommand)]
command: RemoteCommands,
},
/// Export sessions, tool spans, and metrics in OTLP-compatible JSON
ExportOtel {
/// Session ID or alias. Omit to export all sessions.
@@ -442,6 +448,69 @@ enum ScheduleCommands {
},
}
#[derive(clap::Subcommand, Debug)]
enum RemoteCommands {
/// Queue a remote task request
Add {
/// Task description to dispatch
#[arg(short, long)]
task: String,
/// Optional lead session ID or alias to route through
#[arg(long)]
to_session: Option<String>,
/// Task priority
#[arg(long, value_enum, default_value_t = TaskPriorityArg::Normal)]
priority: TaskPriorityArg,
/// Agent type (defaults to ECC default agent)
#[arg(short, long)]
agent: Option<String>,
/// Agent profile defined in ecc2.toml
#[arg(long)]
profile: Option<String>,
#[command(flatten)]
worktree: WorktreePolicyArgs,
/// Optional project grouping override
#[arg(long)]
project: Option<String>,
/// Optional task-group grouping override
#[arg(long)]
task_group: Option<String>,
/// Emit machine-readable JSON instead of the human summary
#[arg(long)]
json: bool,
},
/// List queued remote task requests
List {
/// Include already dispatched or failed requests
#[arg(long)]
all: bool,
/// Maximum requests to return
#[arg(long, default_value_t = 20)]
limit: usize,
/// Emit machine-readable JSON instead of the human summary
#[arg(long)]
json: bool,
},
/// Dispatch queued remote task requests now
Run {
/// Maximum queued requests to process
#[arg(long, default_value_t = 20)]
limit: usize,
/// Emit machine-readable JSON instead of the human summary
#[arg(long)]
json: bool,
},
/// Serve a token-authenticated remote dispatch intake endpoint
Serve {
/// Address to bind, for example 127.0.0.1:8787
#[arg(long, default_value = "127.0.0.1:8787")]
bind: String,
/// Bearer token required for POST /dispatch
#[arg(long)]
token: String,
},
}
#[derive(clap::Subcommand, Debug)]
enum GraphCommands {
/// Create or update a graph entity
@@ -656,7 +725,8 @@ enum MessageKindArg {
Conflict,
}
#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq)]
#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum TaskPriorityArg {
Low,
Normal,
@@ -734,6 +804,18 @@ struct GraphConnectorStatusReport {
connectors: Vec<GraphConnectorStatus>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
struct RemoteDispatchHttpRequest {
task: String,
to_session: Option<String>,
priority: Option<TaskPriorityArg>,
agent: Option<String>,
profile: Option<String>,
use_worktree: Option<bool>,
project: Option<String>,
task_group: Option<String>,
}
#[derive(Debug, Clone, Default, Deserialize)]
#[serde(default)]
struct JsonlMemoryConnectorRecord {
@@ -1870,6 +1952,106 @@ async fn main() -> Result<()> {
}
}
},
Some(Commands::Remote { command }) => match command {
RemoteCommands::Add {
task,
to_session,
priority,
agent,
profile,
worktree,
project,
task_group,
json,
} => {
let target_session_id = to_session
.as_deref()
.map(|value| resolve_session_id(&db, value))
.transpose()?;
let request = session::manager::create_remote_dispatch_request(
&db,
&cfg,
&task,
target_session_id.as_deref(),
priority.into(),
agent.as_deref().unwrap_or(&cfg.default_agent),
profile.as_deref(),
worktree.resolve(&cfg),
session::SessionGrouping {
project,
task_group,
},
"cli",
None,
)?;
if json {
println!("{}", serde_json::to_string_pretty(&request)?);
} else {
println!(
"Queued remote request #{} [{}] {}",
request.id, request.priority, request.task
);
if let Some(target_session_id) = request.target_session_id.as_deref() {
println!("- target {}", short_session(target_session_id));
}
}
}
RemoteCommands::List { all, limit, json } => {
let requests = session::manager::list_remote_dispatch_requests(&db, all, limit)?;
if json {
println!("{}", serde_json::to_string_pretty(&requests)?);
} else if requests.is_empty() {
println!("No remote dispatch requests");
} else {
println!("Remote dispatch requests");
for request in requests {
let target = request
.target_session_id
.as_deref()
.map(short_session)
.unwrap_or_else(|| "new-session".to_string());
println!(
"#{} [{}] {} -> {} | {}",
request.id, request.priority, request.status, target, request.task
);
}
}
}
RemoteCommands::Run { limit, json } => {
let outcomes =
session::manager::run_remote_dispatch_requests(&db, &cfg, limit).await?;
if json {
println!("{}", serde_json::to_string_pretty(&outcomes)?);
} else if outcomes.is_empty() {
println!("No pending remote dispatch requests");
} else {
println!("Processed {} remote request(s)", outcomes.len());
for outcome in outcomes {
let target = outcome
.target_session_id
.as_deref()
.map(short_session)
.unwrap_or_else(|| "new-session".to_string());
let result = outcome
.session_id
.as_deref()
.map(short_session)
.unwrap_or_else(|| "-".to_string());
println!(
"#{} [{}] {} -> {} | {}",
outcome.request_id,
outcome.priority,
target,
result,
format_remote_dispatch_action(&outcome.action)
);
}
}
}
RemoteCommands::Serve { bind, token } => {
run_remote_dispatch_server(&db, &cfg, &bind, &token)?;
}
},
Some(Commands::Daemon) => {
println!("Starting ECC daemon...");
session::daemon::run(db, cfg).await?;
@@ -2894,10 +3076,251 @@ fn build_message(
})
}
fn format_remote_dispatch_action(action: &session::manager::RemoteDispatchAction) -> String {
match action {
session::manager::RemoteDispatchAction::SpawnedTopLevel => "spawned top-level".to_string(),
session::manager::RemoteDispatchAction::Assigned(action) => match action {
session::manager::AssignmentAction::Spawned => "spawned delegate".to_string(),
session::manager::AssignmentAction::ReusedIdle => "reused idle delegate".to_string(),
session::manager::AssignmentAction::ReusedActive => {
"reused active delegate".to_string()
}
session::manager::AssignmentAction::DeferredSaturated => {
"deferred (saturated)".to_string()
}
},
session::manager::RemoteDispatchAction::DeferredSaturated => {
"deferred (saturated)".to_string()
}
session::manager::RemoteDispatchAction::Failed(error) => format!("failed: {error}"),
}
}
fn short_session(session_id: &str) -> String {
session_id.chars().take(8).collect()
}
fn run_remote_dispatch_server(
db: &session::store::StateStore,
cfg: &config::Config,
bind_addr: &str,
bearer_token: &str,
) -> Result<()> {
let listener = TcpListener::bind(bind_addr)
.with_context(|| format!("Failed to bind remote dispatch server on {bind_addr}"))?;
println!("Remote dispatch server listening on http://{bind_addr}");
for stream in listener.incoming() {
match stream {
Ok(mut stream) => {
if let Err(error) =
handle_remote_dispatch_connection(&mut stream, db, cfg, bearer_token)
{
let _ = write_http_response(
&mut stream,
500,
"application/json",
&serde_json::json!({
"error": error.to_string(),
})
.to_string(),
);
}
}
Err(error) => tracing::warn!("Remote dispatch accept failed: {error}"),
}
}
Ok(())
}
fn handle_remote_dispatch_connection(
stream: &mut TcpStream,
db: &session::store::StateStore,
cfg: &config::Config,
bearer_token: &str,
) -> Result<()> {
let (method, path, headers, body) = read_http_request(stream)?;
match (method.as_str(), path.as_str()) {
("GET", "/health") => write_http_response(
stream,
200,
"application/json",
&serde_json::json!({"ok": true}).to_string(),
),
("POST", "/dispatch") => {
let auth = headers
.get("authorization")
.map(String::as_str)
.unwrap_or_default();
let expected = format!("Bearer {bearer_token}");
if auth != expected {
return write_http_response(
stream,
401,
"application/json",
&serde_json::json!({"error": "unauthorized"}).to_string(),
);
}
let payload: RemoteDispatchHttpRequest =
serde_json::from_slice(&body).context("Invalid remote dispatch JSON body")?;
if payload.task.trim().is_empty() {
return write_http_response(
stream,
400,
"application/json",
&serde_json::json!({"error": "task is required"}).to_string(),
);
}
let target_session_id = match payload
.to_session
.as_deref()
.map(|value| resolve_session_id(db, value))
.transpose()
{
Ok(value) => value,
Err(error) => {
return write_http_response(
stream,
400,
"application/json",
&serde_json::json!({"error": error.to_string()}).to_string(),
);
}
};
let requester = stream.peer_addr().ok().map(|addr| addr.ip().to_string());
let request = match session::manager::create_remote_dispatch_request(
db,
cfg,
&payload.task,
target_session_id.as_deref(),
payload.priority.unwrap_or(TaskPriorityArg::Normal).into(),
payload.agent.as_deref().unwrap_or(&cfg.default_agent),
payload.profile.as_deref(),
payload.use_worktree.unwrap_or(cfg.auto_create_worktrees),
session::SessionGrouping {
project: payload.project,
task_group: payload.task_group,
},
"http",
requester.as_deref(),
) {
Ok(request) => request,
Err(error) => {
return write_http_response(
stream,
400,
"application/json",
&serde_json::json!({"error": error.to_string()}).to_string(),
);
}
};
write_http_response(
stream,
202,
"application/json",
&serde_json::to_string(&request)?,
)
}
_ => write_http_response(
stream,
404,
"application/json",
&serde_json::json!({"error": "not found"}).to_string(),
),
}
}
fn read_http_request(
stream: &mut TcpStream,
) -> Result<(String, String, BTreeMap<String, String>, Vec<u8>)> {
let mut buffer = Vec::new();
let mut temp = [0_u8; 1024];
let header_end = loop {
let read = stream.read(&mut temp)?;
if read == 0 {
anyhow::bail!("Unexpected EOF while reading HTTP request");
}
buffer.extend_from_slice(&temp[..read]);
if let Some(index) = buffer.windows(4).position(|window| window == b"\r\n\r\n") {
break index + 4;
}
if buffer.len() > 64 * 1024 {
anyhow::bail!("HTTP request headers too large");
}
};
let header_text = String::from_utf8(buffer[..header_end].to_vec())
.context("HTTP request headers were not valid UTF-8")?;
let mut lines = header_text.split("\r\n");
let request_line = lines
.next()
.filter(|line| !line.trim().is_empty())
.ok_or_else(|| anyhow::anyhow!("Missing HTTP request line"))?;
let mut request_parts = request_line.split_whitespace();
let method = request_parts
.next()
.ok_or_else(|| anyhow::anyhow!("Missing HTTP method"))?
.to_string();
let path = request_parts
.next()
.ok_or_else(|| anyhow::anyhow!("Missing HTTP path"))?
.to_string();
let mut headers = BTreeMap::new();
for line in lines {
if line.is_empty() {
break;
}
if let Some((key, value)) = line.split_once(':') {
headers.insert(key.trim().to_ascii_lowercase(), value.trim().to_string());
}
}
let content_length = headers
.get("content-length")
.and_then(|value| value.parse::<usize>().ok())
.unwrap_or(0);
let mut body = buffer[header_end..].to_vec();
while body.len() < content_length {
let read = stream.read(&mut temp)?;
if read == 0 {
anyhow::bail!("Unexpected EOF while reading HTTP request body");
}
body.extend_from_slice(&temp[..read]);
}
body.truncate(content_length);
Ok((method, path, headers, body))
}
fn write_http_response(
stream: &mut TcpStream,
status: u16,
content_type: &str,
body: &str,
) -> Result<()> {
let status_text = match status {
200 => "OK",
202 => "Accepted",
400 => "Bad Request",
401 => "Unauthorized",
404 => "Not Found",
_ => "Internal Server Error",
};
write!(
stream,
"HTTP/1.1 {status} {status_text}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
body.len(),
body
)?;
stream.flush()?;
Ok(())
}
fn format_coordination_status(
status: &session::manager::CoordinationStatus,
json: bool,