From 491f213fbd2fb6fba52faa5854fd4d0e4bc093e1 Mon Sep 17 00:00:00 2001
From: Affaan Mustafa
Date: Thu, 9 Apr 2026 08:23:01 -0700
Subject: [PATCH] feat: enforce queued parallel worktree limits

---
 ecc2/src/config/mod.rs      |  63 ++++++++-
 ecc2/src/session/daemon.rs  |   4 +
 ecc2/src/session/manager.rs | 259 +++++++++++++++++++++++++++++++++++-
 ecc2/src/session/store.rs   | 124 ++++++++++++++++-
 ecc2/src/tui/dashboard.rs   |  43 +++++-
 5 files changed, 473 insertions(+), 20 deletions(-)

diff --git a/ecc2/src/config/mod.rs b/ecc2/src/config/mod.rs
index a60f07f1..bc692582 100644
--- a/ecc2/src/config/mod.rs
+++ b/ecc2/src/config/mod.rs
@@ -68,6 +68,11 @@ pub struct PaneNavigationConfig {
     pub move_right: String,
 }
 
+#[derive(Debug, Default, Deserialize)]
+struct ProjectWorktreeConfigOverride {
+    max_parallel_worktrees: Option<usize>,
+}
+
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum PaneNavigationAction {
     FocusSlot(usize),
@@ -155,14 +160,47 @@ impl Config {
     pub fn load() -> Result<Self> {
         let config_path = Self::config_path();
+        let project_path = std::env::current_dir()
+            .ok()
+            .and_then(|cwd| Self::project_config_path_from(&cwd));
+        Self::load_from_paths(&config_path, project_path.as_deref())
+    }
 
-        if config_path.exists() {
-            let content = std::fs::read_to_string(&config_path)?;
-            let config: Config = toml::from_str(&content)?;
-            Ok(config)
+    fn load_from_paths(
+        config_path: &std::path::Path,
+        project_override_path: Option<&std::path::Path>,
+    ) -> Result<Self> {
+        let mut config = if config_path.exists() {
+            let content = std::fs::read_to_string(config_path)?;
+            toml::from_str(&content)?
         } else {
-            Ok(Config::default())
+            Config::default()
+        };
+
+        if let Some(project_path) = project_override_path.filter(|path| path.exists()) {
+            let content = std::fs::read_to_string(project_path)?;
+            let overrides: ProjectWorktreeConfigOverride = toml::from_str(&content)?;
+            if let Some(limit) = overrides.max_parallel_worktrees {
+                config.max_parallel_worktrees = limit;
+            }
         }
+
+        Ok(config)
+    }
+
+    fn project_config_path_from(start: &std::path::Path) -> Option<std::path::PathBuf> {
+        let global = Self::config_path();
+        let mut current = Some(start);
+
+        while let Some(path) = current {
+            let candidate = path.join(".claude").join("ecc2.toml");
+            if candidate.exists() && candidate != global {
+                return Some(candidate);
+            }
+            current = path.parent();
+        }
+
+        None
     }
 
     pub fn save(&self) -> Result<()> {
@@ -419,6 +457,21 @@ theme = "Dark"
         assert_eq!(config.worktree_branch_prefix, "bots/ecc");
     }
 
+    #[test]
+    fn project_worktree_limit_override_replaces_global_limit() {
+        let tempdir = std::env::temp_dir().join(format!("ecc2-config-{}", Uuid::new_v4()));
+        let global_path = tempdir.join("global.toml");
+        let project_path = tempdir.join("project.toml");
+        std::fs::create_dir_all(&tempdir).unwrap();
+        std::fs::write(&global_path, "max_parallel_worktrees = 6\n").unwrap();
+        std::fs::write(&project_path, "max_parallel_worktrees = 2\n").unwrap();
+
+        let config = Config::load_from_paths(&global_path, Some(&project_path)).unwrap();
+        assert_eq!(config.max_parallel_worktrees, 2);
+
+        let _ = std::fs::remove_dir_all(tempdir);
+    }
+
     #[test]
     fn pane_navigation_deserializes_from_toml() {
         let config: Config = toml::from_str(
diff --git a/ecc2/src/session/daemon.rs b/ecc2/src/session/daemon.rs
index b8e4d7a3..e88a92bb 100644
--- a/ecc2/src/session/daemon.rs
+++ b/ecc2/src/session/daemon.rs
@@ -39,6 +39,10 @@ pub async fn run(db: StateStore, cfg: Config) -> Result<()> {
             tracing::error!("Worktree auto-prune pass failed: {e}");
         }
 
+        if let Err(e) = manager::activate_pending_worktree_sessions(&db, &cfg).await {
+            tracing::error!("Queued worktree activation pass failed: {e}");
+        }
+
         time::sleep(heartbeat_interval).await;
     }
 }
diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs
index 016abde9..964e16d2 100644
--- a/ecc2/src/session/manager.rs
+++ b/ecc2/src/session/manager.rs
@@ -963,6 +963,114 @@ pub async fn run_session(
     Ok(())
 }
 
+pub async fn activate_pending_worktree_sessions(
+    db: &StateStore,
+    cfg: &Config,
+) -> Result<Vec<String>> {
+    activate_pending_worktree_sessions_with(
+        db,
+        cfg,
+        |cfg, session_id, task, agent_type, cwd| async move {
+            tokio::spawn(async move {
+                if let Err(error) = run_session(&cfg, &session_id, &task, &agent_type, &cwd).await {
+                    tracing::error!(
+                        "Failed to start queued worktree session {}: {error}",
+                        session_id
+                    );
+                }
+            });
+            Ok(())
+        },
+    )
+    .await
+}
+
+async fn activate_pending_worktree_sessions_with<F, Fut>(
+    db: &StateStore,
+    cfg: &Config,
+    spawn: F,
+) -> Result<Vec<String>>
+where
+    F: Fn(Config, String, String, String, PathBuf) -> Fut,
+    Fut: std::future::Future<Output = Result<()>>,
+{
+    let mut available_slots = cfg
+        .max_parallel_worktrees
+        .saturating_sub(attached_worktree_count(db)?);
+    if available_slots == 0 {
+        return Ok(Vec::new());
+    }
+
+    let mut started = Vec::new();
+    for request in db.pending_worktree_queue(available_slots)? {
+        let Some(session) = db.get_session(&request.session_id)? else {
+            db.dequeue_pending_worktree(&request.session_id)?;
+            continue;
+        };
+
+        if session.worktree.is_some()
+            || session.pid.is_some()
+            || session.state != SessionState::Pending
+        {
+            db.dequeue_pending_worktree(&session.id)?;
+            continue;
+        }
+
+        let worktree =
+            match worktree::create_for_session_in_repo(&session.id, cfg, &request.repo_root) {
+                Ok(worktree) => worktree,
+                Err(error) => {
+                    db.dequeue_pending_worktree(&session.id)?;
+                    db.update_state(&session.id, &SessionState::Failed)?;
+                    tracing::warn!(
+                        "Failed to create queued worktree for session {}: {error}",
+                        session.id
+                    );
+                    continue;
+                }
+            };
+
+        if let Err(error) = db.attach_worktree(&session.id, &worktree) {
+            let _ = worktree::remove(&worktree);
+            db.dequeue_pending_worktree(&session.id)?;
+            db.update_state(&session.id, &SessionState::Failed)?;
+            return Err(error.context(format!(
+                "Failed to attach queued worktree for session {}",
+                session.id
+            )));
+        }
+
+        if let Err(error) = spawn(
+            cfg.clone(),
+            session.id.clone(),
+            session.task.clone(),
+            session.agent_type.clone(),
+            worktree.path.clone(),
+        )
+        .await
+        {
+            let _ = worktree::remove(&worktree);
+            let _ = db.clear_worktree_to_dir(&session.id, &request.repo_root);
+            db.dequeue_pending_worktree(&session.id)?;
+            db.update_state(&session.id, &SessionState::Failed)?;
+            tracing::warn!(
+                "Failed to start queued worktree session {}: {error}",
+                session.id
+            );
+            continue;
+        }
+
+        db.dequeue_pending_worktree(&session.id)?;
+        started.push(session.id);
+        available_slots = available_slots.saturating_sub(1);
+        if available_slots == 0 {
+            break;
+        }
+    }
+
+    Ok(started)
+}
+
 async fn queue_session_in_dir(
     db: &StateStore,
     cfg: &Config,
@@ -992,9 +1100,14 @@ async fn queue_session_in_dir_with_runner_program(
     repo_root: &Path,
     runner_program: &Path,
 ) -> Result<String> {
-    let session = build_session_record(task, agent_type, use_worktree, cfg, repo_root)?;
+    let session = build_session_record(db, task, agent_type, use_worktree, cfg, repo_root)?;
     db.insert_session(&session)?;
 
+    if use_worktree && session.worktree.is_none() {
+        db.enqueue_pending_worktree(&session.id, repo_root)?;
+        return Ok(session.id);
+    }
+
     let working_dir = session
         .worktree
         .as_ref()
@@ -1024,6 +1137,7 @@ async fn queue_session_in_dir_with_runner_program(
 }
 
 fn build_session_record(
+    db: &StateStore,
     task: &str,
     agent_type: &str,
     use_worktree: bool,
@@ -1033,7 +1147,7 @@
     let id = uuid::Uuid::new_v4().to_string()[..8].to_string();
     let now = chrono::Utc::now();
 
-    let worktree = if use_worktree {
+    let worktree = if use_worktree && attached_worktree_count(db)? < cfg.max_parallel_worktrees {
         Some(worktree::create_for_session_in_repo(&id, cfg, repo_root)?)
     } else {
         None
@@ -1067,10 +1181,15 @@ async fn create_session_in_dir(
     repo_root: &Path,
     agent_program: &Path,
 ) -> Result<String> {
-    let session = build_session_record(task, agent_type, use_worktree, cfg, repo_root)?;
+    let session = build_session_record(db, task, agent_type, use_worktree, cfg, repo_root)?;
     db.insert_session(&session)?;
 
+    if use_worktree && session.worktree.is_none() {
+        db.enqueue_pending_worktree(&session.id, repo_root)?;
+        return Ok(session.id);
+    }
+
     let working_dir = session
         .worktree
         .as_ref()
@@ -1095,6 +1214,14 @@ async fn create_session_in_dir(
     }
 }
 
+fn attached_worktree_count(db: &StateStore) -> Result<usize> {
+    Ok(db
+        .list_sessions()?
+ .into_iter() + .filter(|session| session.worktree.is_some()) + .count()) +} + async fn spawn_session_runner( task: &str, session_id: &str, @@ -1296,6 +1423,7 @@ fn stop_session_recorded(db: &StateStore, session: &Session, cleanup_worktree: b if cleanup_worktree { if let Some(worktree) = session.worktree.as_ref() { crate::worktree::remove(worktree)?; + db.clear_worktree_to_dir(&session.id, &session.working_dir)?; } } @@ -2095,6 +2223,131 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "current_thread")] + async fn create_session_with_worktree_limit_queues_without_starting_runner() -> Result<()> { + let tempdir = TestDir::new("manager-worktree-limit-queue")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let mut cfg = build_config(tempdir.path()); + cfg.max_parallel_worktrees = 1; + let db = StateStore::open(&cfg.db_path)?; + let (fake_claude, log_path) = write_fake_claude(tempdir.path())?; + + let first_id = create_session_in_dir( + &db, + &cfg, + "active worktree", + "claude", + true, + &repo_root, + &fake_claude, + ) + .await?; + let second_id = create_session_in_dir( + &db, + &cfg, + "queued worktree", + "claude", + true, + &repo_root, + &fake_claude, + ) + .await?; + + let first = db + .get_session(&first_id)? + .context("first session missing")?; + assert_eq!(first.state, SessionState::Running); + assert!(first.worktree.is_some()); + + let second = db + .get_session(&second_id)? 
+ .context("second session missing")?; + assert_eq!(second.state, SessionState::Pending); + assert!(second.pid.is_none()); + assert!(second.worktree.is_none()); + assert!(db.pending_worktree_queue_contains(&second_id)?); + + let log = wait_for_file(&log_path)?; + assert!(log.contains("active worktree")); + assert!(!log.contains("queued worktree")); + + stop_session_with_options(&db, &first_id, true).await?; + Ok(()) + } + + #[tokio::test(flavor = "current_thread")] + async fn activate_pending_worktree_sessions_starts_queued_session_when_slot_opens() -> Result<()> + { + let tempdir = TestDir::new("manager-worktree-limit-activate")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let mut cfg = build_config(tempdir.path()); + cfg.max_parallel_worktrees = 1; + let db = StateStore::open(&cfg.db_path)?; + let (fake_claude, _) = write_fake_claude(tempdir.path())?; + + let first_id = create_session_in_dir( + &db, + &cfg, + "active worktree", + "claude", + true, + &repo_root, + &fake_claude, + ) + .await?; + let second_id = create_session_in_dir( + &db, + &cfg, + "queued worktree", + "claude", + true, + &repo_root, + &fake_claude, + ) + .await?; + + stop_session_with_options(&db, &first_id, true).await?; + + let launch_log = tempdir.path().join("queued-launch.log"); + let started = + activate_pending_worktree_sessions_with(&db, &cfg, |_, session_id, task, _, cwd| { + let launch_log = launch_log.clone(); + async move { + fs::write( + &launch_log, + format!("{session_id}\n{task}\n{}\n", cwd.display()), + )?; + Ok(()) + } + }) + .await?; + + assert_eq!(started, vec![second_id.clone()]); + assert!(!db.pending_worktree_queue_contains(&second_id)?); + + let second = db + .get_session(&second_id)? 
+            .context("queued session missing")?;
+        let worktree = second
+            .worktree
+            .context("queued session should gain worktree")?;
+        assert_eq!(second.state, SessionState::Pending);
+        assert!(worktree.path.exists());
+
+        let launch = fs::read_to_string(&launch_log)?;
+        assert!(launch.contains(&second_id));
+        assert!(launch.contains("queued worktree"));
+        assert!(launch.contains(worktree.path.to_string_lossy().as_ref()));
+
+        crate::worktree::remove(&worktree)?;
+        db.clear_worktree_to_dir(&second_id, &repo_root)?;
+        Ok(())
+    }
+
     #[test]
     fn enforce_budget_hard_limits_stops_active_sessions_without_cleaning_worktrees() -> Result<()> {
         let tempdir = TestDir::new("manager-budget-pause")?;
diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs
index 18d6610c..3b3244ca 100644
--- a/ecc2/src/session/store.rs
+++ b/ecc2/src/session/store.rs
@@ -14,12 +14,20 @@ use crate::observability::{ToolCallEvent, ToolLogEntry, ToolLogPage};
 use super::output::{OutputLine, OutputStream, OUTPUT_BUFFER_LIMIT};
 use super::{
     FileActivityAction, FileActivityEntry, Session, SessionMessage, SessionMetrics, SessionState,
+    WorktreeInfo,
 };
 
 pub struct StateStore {
     conn: Connection,
 }
 
+#[derive(Debug, Clone)]
+pub struct PendingWorktreeRequest {
+    pub session_id: String,
+    pub repo_root: PathBuf,
+    pub _requested_at: chrono::DateTime<chrono::Utc>,
+}
+
 #[derive(Debug, Clone, PartialEq, Eq, Serialize)]
 pub struct FileActivityOverlap {
     pub path: String,
@@ -183,6 +191,12 @@ impl StateStore {
                 timestamp TEXT NOT NULL
             );
 
+            CREATE TABLE IF NOT EXISTS pending_worktree_queue (
+                session_id TEXT PRIMARY KEY REFERENCES sessions(id) ON DELETE CASCADE,
+                repo_root TEXT NOT NULL,
+                requested_at TEXT NOT NULL
+            );
+
             CREATE TABLE IF NOT EXISTS daemon_activity (
                 id INTEGER PRIMARY KEY CHECK(id = 1),
                 last_dispatch_at TEXT,
@@ -215,6 +229,8 @@ impl StateStore {
             CREATE INDEX IF NOT EXISTS idx_messages_to ON messages(to_session, read);
             CREATE INDEX IF NOT EXISTS idx_session_output_session ON session_output(session_id,
id); + CREATE INDEX IF NOT EXISTS idx_pending_worktree_queue_requested_at + ON pending_worktree_queue(requested_at, session_id); INSERT OR IGNORE INTO daemon_activity (id) VALUES (1); ", @@ -575,15 +591,29 @@ impl StateStore { } pub fn clear_worktree(&self, session_id: &str) -> Result<()> { + let working_dir: String = self.conn.query_row( + "SELECT working_dir FROM sessions WHERE id = ?1", + [session_id], + |row| row.get(0), + )?; + self.clear_worktree_to_dir(session_id, Path::new(&working_dir)) + } + + pub fn clear_worktree_to_dir(&self, session_id: &str, working_dir: &Path) -> Result<()> { let updated = self.conn.execute( "UPDATE sessions - SET worktree_path = NULL, + SET working_dir = ?1, + worktree_path = NULL, worktree_branch = NULL, worktree_base = NULL, - updated_at = ?1, - last_heartbeat_at = ?1 - WHERE id = ?2", - rusqlite::params![chrono::Utc::now().to_rfc3339(), session_id], + updated_at = ?2, + last_heartbeat_at = ?2 + WHERE id = ?3", + rusqlite::params![ + working_dir.to_string_lossy().to_string(), + chrono::Utc::now().to_rfc3339(), + session_id + ], )?; if updated == 0 { @@ -593,6 +623,90 @@ impl StateStore { Ok(()) } + pub fn attach_worktree(&self, session_id: &str, worktree: &WorktreeInfo) -> Result<()> { + let updated = self.conn.execute( + "UPDATE sessions + SET working_dir = ?1, + worktree_path = ?2, + worktree_branch = ?3, + worktree_base = ?4, + updated_at = ?5, + last_heartbeat_at = ?5 + WHERE id = ?6", + rusqlite::params![ + worktree.path.to_string_lossy().to_string(), + worktree.path.to_string_lossy().to_string(), + worktree.branch, + worktree.base_branch, + chrono::Utc::now().to_rfc3339(), + session_id + ], + )?; + + if updated == 0 { + anyhow::bail!("Session not found: {session_id}"); + } + + Ok(()) + } + + pub fn enqueue_pending_worktree(&self, session_id: &str, repo_root: &Path) -> Result<()> { + self.conn.execute( + "INSERT OR REPLACE INTO pending_worktree_queue (session_id, repo_root, requested_at) + VALUES (?1, ?2, ?3)", + 
            rusqlite::params![
+                session_id,
+                repo_root.to_string_lossy().to_string(),
+                chrono::Utc::now().to_rfc3339()
+            ],
+        )?;
+        Ok(())
+    }
+
+    pub fn dequeue_pending_worktree(&self, session_id: &str) -> Result<()> {
+        self.conn.execute(
+            "DELETE FROM pending_worktree_queue WHERE session_id = ?1",
+            [session_id],
+        )?;
+        Ok(())
+    }
+
+    pub fn pending_worktree_queue_contains(&self, session_id: &str) -> Result<bool> {
+        Ok(self
+            .conn
+            .query_row(
+                "SELECT 1 FROM pending_worktree_queue WHERE session_id = ?1",
+                [session_id],
+                |_| Ok(()),
+            )
+            .optional()?
+            .is_some())
+    }
+
+    pub fn pending_worktree_queue(&self, limit: usize) -> Result<Vec<PendingWorktreeRequest>> {
+        let mut stmt = self.conn.prepare(
+            "SELECT session_id, repo_root, requested_at
+             FROM pending_worktree_queue
+             ORDER BY requested_at ASC, session_id ASC
+             LIMIT ?1",
+        )?;
+
+        let rows = stmt
+            .query_map([limit as i64], |row| {
+                let requested_at: String = row.get(2)?;
+                Ok(PendingWorktreeRequest {
+                    session_id: row.get(0)?,
+                    repo_root: PathBuf::from(row.get::<_, String>(1)?),
+                    _requested_at: chrono::DateTime::parse_from_rfc3339(&requested_at)
+                        .unwrap_or_default()
+                        .with_timezone(&chrono::Utc),
+                })
+            })?
+            .collect::<Result<Vec<_>, _>>()?;
+
+        Ok(rows)
+    }
+
     pub fn update_metrics(&self, session_id: &str, metrics: &SessionMetrics) -> Result<()> {
         self.conn.execute(
             "UPDATE sessions
diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs
index 7b06afb0..eda90b22 100644
--- a/ecc2/src/tui/dashboard.rs
+++ b/ecc2/src/tui/dashboard.rs
@@ -1673,10 +1673,21 @@ impl Dashboard {
         self.refresh();
         self.sync_selection_by_id(Some(&session_id));
-        self.set_operator_note(format!(
-            "spawned session {}",
-            format_session_id(&session_id)
-        ));
+        let queued_for_worktree = self
+            .db
+            .pending_worktree_queue_contains(&session_id)
+            .unwrap_or(false);
+        if queued_for_worktree {
+            self.set_operator_note(format!(
+                "queued session {} pending worktree slot",
+                format_session_id(&session_id)
+            ));
+        } else {
+            self.set_operator_note(format!(
+                "spawned session {}",
+                format_session_id(&session_id)
+            ));
+        }
         self.reset_output_view();
         self.sync_selected_output();
         self.sync_selected_diff();
@@ -2565,7 +2576,15 @@ impl Dashboard {
         let preferred_selection =
             post_spawn_selection_id(source_session_id.as_deref(), &created_ids);
         self.refresh_after_spawn(preferred_selection.as_deref());
-        let mut note = build_spawn_note(&plan, created_ids.len());
+        let queued_count = created_ids
+            .iter()
+            .filter(|session_id| {
+                self.db
+                    .pending_worktree_queue_contains(session_id)
+                    .unwrap_or(false)
+            })
+            .count();
+        let mut note = build_spawn_note(&plan, created_ids.len(), queued_count);
         if let Some(layout_note) = self.auto_split_layout_after_spawn(created_ids.len()) {
             note.push_str(" | ");
             note.push_str(&layout_note);
@@ -2770,6 +2789,10 @@ impl Dashboard {
         }
     }
 
+        if let Err(error) = manager::activate_pending_worktree_sessions(&self.db, &self.cfg).await {
+            tracing::warn!("Failed to activate queued worktree sessions: {error}");
+        }
+
         self.sync_from_store();
     }
 
@@ -4768,16 +4791,22 @@ fn expand_spawn_tasks(task: &str, count: usize) -> Vec<String> {
         .collect()
 }
 
-fn build_spawn_note(plan: &SpawnPlan, created_count:
usize) -> String { +fn build_spawn_note(plan: &SpawnPlan, created_count: usize, queued_count: usize) -> String { let task = truncate_for_dashboard(&plan.task, 72); - if plan.spawn_count < plan.requested_count { + let mut note = if plan.spawn_count < plan.requested_count { format!( "spawned {created_count} session(s) for {task} (requested {}, capped at {})", plan.requested_count, plan.spawn_count ) } else { format!("spawned {created_count} session(s) for {task}") + }; + + if queued_count > 0 { + note.push_str(&format!(" | {queued_count} pending worktree slot")); } + + note } fn post_spawn_selection_id(