feat: enforce queued parallel worktree limits

This commit is contained in:
Affaan Mustafa
2026-04-09 08:23:01 -07:00
parent 941d4e6172
commit 491f213fbd
5 changed files with 473 additions and 20 deletions

View File

@@ -963,6 +963,114 @@ pub async fn run_session(
Ok(())
}
/// Activates queued worktree sessions using the production spawn path:
/// each activated session's runner is launched on a detached tokio task
/// via `run_session`, so failures are logged rather than propagated.
///
/// Returns the ids of the sessions that were started.
pub async fn activate_pending_worktree_sessions(
    db: &StateStore,
    cfg: &Config,
) -> Result<Vec<String>> {
    activate_pending_worktree_sessions_with(db, cfg, |config, id, task, agent, dir| async move {
        // Detach the runner: a startup failure must not abort activation of
        // the remaining queued sessions.
        tokio::spawn(async move {
            match run_session(&config, &id, &task, &agent, &dir).await {
                Ok(()) => {}
                Err(error) => {
                    tracing::error!(
                        "Failed to start queued worktree session {}: {error}",
                        id
                    );
                }
            }
        });
        Ok(())
    })
    .await
}
/// Drains the pending-worktree queue, activating as many queued sessions as
/// free worktree slots allow.
///
/// `spawn` receives (config, session id, task, agent type, working dir) and
/// launches the session runner; it is injected so tests can substitute a
/// recorder for the real `run_session` path.
///
/// Returns the ids of the sessions that were successfully started.
async fn activate_pending_worktree_sessions_with<F, Fut>(
    db: &StateStore,
    cfg: &Config,
    spawn: F,
) -> Result<Vec<String>>
where
    F: Fn(Config, String, String, String, PathBuf) -> Fut,
    Fut: std::future::Future<Output = Result<()>>,
{
    // Slots remaining under the configured cap. saturating_sub guards against
    // the attached count exceeding a (possibly lowered) configured maximum.
    let mut available_slots = cfg
        .max_parallel_worktrees
        .saturating_sub(attached_worktree_count(db)?);
    if available_slots == 0 {
        return Ok(Vec::new());
    }
    let mut started = Vec::new();
    // Fetch at most `available_slots` queue entries up front. Stale entries
    // skipped below do not trigger a re-fetch; a later activation pass picks
    // up anything still queued.
    for request in db.pending_worktree_queue(available_slots)? {
        // Drop queue entries whose session no longer exists.
        let Some(session) = db.get_session(&request.session_id)? else {
            db.dequeue_pending_worktree(&request.session_id)?;
            continue;
        };
        // Drop entries that are no longer eligible: already have a worktree,
        // already running (pid set), or left the Pending state.
        if session.worktree.is_some()
            || session.pid.is_some()
            || session.state != SessionState::Pending
        {
            db.dequeue_pending_worktree(&session.id)?;
            continue;
        }
        let worktree =
            match worktree::create_for_session_in_repo(&session.id, cfg, &request.repo_root) {
                Ok(worktree) => worktree,
                Err(error) => {
                    // Worktree creation failed: mark this session failed and
                    // keep activating the remaining queue entries.
                    db.dequeue_pending_worktree(&session.id)?;
                    db.update_state(&session.id, &SessionState::Failed)?;
                    tracing::warn!(
                        "Failed to create queued worktree for session {}: {error}",
                        session.id
                    );
                    continue;
                }
            };
        if let Err(error) = db.attach_worktree(&session.id, &worktree) {
            // A DB attach failure aborts the whole pass: roll back the on-disk
            // worktree (best effort), fail the session, propagate the error.
            let _ = worktree::remove(&worktree);
            db.dequeue_pending_worktree(&session.id)?;
            db.update_state(&session.id, &SessionState::Failed)?;
            return Err(error.context(format!(
                "Failed to attach queued worktree for session {}",
                session.id
            )));
        }
        if let Err(error) = spawn(
            cfg.clone(),
            session.id.clone(),
            session.task.clone(),
            session.agent_type.clone(),
            worktree.path.clone(),
        )
        .await
        {
            // Spawn failure: undo the worktree attach (best effort), fail the
            // session, and continue with the rest of the queue.
            let _ = worktree::remove(&worktree);
            let _ = db.clear_worktree_to_dir(&session.id, &request.repo_root);
            db.dequeue_pending_worktree(&session.id)?;
            db.update_state(&session.id, &SessionState::Failed)?;
            tracing::warn!(
                "Failed to start queued worktree session {}: {error}",
                session.id
            );
            continue;
        }
        // Success: the entry leaves the queue and consumes one slot.
        db.dequeue_pending_worktree(&session.id)?;
        started.push(session.id);
        available_slots = available_slots.saturating_sub(1);
        if available_slots == 0 {
            break;
        }
    }
    Ok(started)
}
async fn queue_session_in_dir(
db: &StateStore,
cfg: &Config,
@@ -992,9 +1100,14 @@ async fn queue_session_in_dir_with_runner_program(
repo_root: &Path,
runner_program: &Path,
) -> Result<String> {
let session = build_session_record(task, agent_type, use_worktree, cfg, repo_root)?;
let session = build_session_record(db, task, agent_type, use_worktree, cfg, repo_root)?;
db.insert_session(&session)?;
if use_worktree && session.worktree.is_none() {
db.enqueue_pending_worktree(&session.id, repo_root)?;
return Ok(session.id);
}
let working_dir = session
.worktree
.as_ref()
@@ -1024,6 +1137,7 @@ async fn queue_session_in_dir_with_runner_program(
}
fn build_session_record(
db: &StateStore,
task: &str,
agent_type: &str,
use_worktree: bool,
@@ -1033,7 +1147,7 @@ fn build_session_record(
let id = uuid::Uuid::new_v4().to_string()[..8].to_string();
let now = chrono::Utc::now();
let worktree = if use_worktree {
let worktree = if use_worktree && attached_worktree_count(db)? < cfg.max_parallel_worktrees {
Some(worktree::create_for_session_in_repo(&id, cfg, repo_root)?)
} else {
None
@@ -1067,10 +1181,15 @@ async fn create_session_in_dir(
repo_root: &Path,
agent_program: &Path,
) -> Result<String> {
let session = build_session_record(task, agent_type, use_worktree, cfg, repo_root)?;
let session = build_session_record(db, task, agent_type, use_worktree, cfg, repo_root)?;
db.insert_session(&session)?;
if use_worktree && session.worktree.is_none() {
db.enqueue_pending_worktree(&session.id, repo_root)?;
return Ok(session.id);
}
let working_dir = session
.worktree
.as_ref()
@@ -1095,6 +1214,14 @@ async fn create_session_in_dir(
}
}
/// Counts sessions that currently hold a worktree, i.e. how many parallel
/// worktree slots are occupied right now.
fn attached_worktree_count(db: &StateStore) -> Result<usize> {
    let mut occupied = 0;
    for session in db.list_sessions()? {
        if session.worktree.is_some() {
            occupied += 1;
        }
    }
    Ok(occupied)
}
async fn spawn_session_runner(
task: &str,
session_id: &str,
@@ -1296,6 +1423,7 @@ fn stop_session_recorded(db: &StateStore, session: &Session, cleanup_worktree: b
if cleanup_worktree {
if let Some(worktree) = session.worktree.as_ref() {
crate::worktree::remove(worktree)?;
db.clear_worktree_to_dir(&session.id, &session.working_dir)?;
}
}
@@ -2095,6 +2223,131 @@ mod tests {
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn create_session_with_worktree_limit_queues_without_starting_runner() -> Result<()> {
    // With a single worktree slot, the second session must be queued rather
    // than handed to the runner.
    let tempdir = TestDir::new("manager-worktree-limit-queue")?;
    let repo_root = tempdir.path().join("repo");
    init_git_repo(&repo_root)?;
    let mut cfg = build_config(tempdir.path());
    cfg.max_parallel_worktrees = 1;
    let db = StateStore::open(&cfg.db_path)?;
    let (fake_claude, log_path) = write_fake_claude(tempdir.path())?;
    let running_id = create_session_in_dir(
        &db,
        &cfg,
        "active worktree",
        "claude",
        true,
        &repo_root,
        &fake_claude,
    )
    .await?;
    let queued_id = create_session_in_dir(
        &db,
        &cfg,
        "queued worktree",
        "claude",
        true,
        &repo_root,
        &fake_claude,
    )
    .await?;
    // First session occupies the only slot: running, with a worktree attached.
    let running = db
        .get_session(&running_id)?
        .context("first session missing")?;
    assert_eq!(running.state, SessionState::Running);
    assert!(running.worktree.is_some());
    // Second session stays Pending with no pid or worktree, and is queued.
    let queued = db
        .get_session(&queued_id)?
        .context("second session missing")?;
    assert_eq!(queued.state, SessionState::Pending);
    assert!(queued.pid.is_none());
    assert!(queued.worktree.is_none());
    assert!(db.pending_worktree_queue_contains(&queued_id)?);
    // Only the active session's task ever reached the fake agent.
    let log = wait_for_file(&log_path)?;
    assert!(log.contains("active worktree"));
    assert!(!log.contains("queued worktree"));
    stop_session_with_options(&db, &running_id, true).await?;
    Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn activate_pending_worktree_sessions_starts_queued_session_when_slot_opens() -> Result<()>
{
    let tempdir = TestDir::new("manager-worktree-limit-activate")?;
    let repo_root = tempdir.path().join("repo");
    init_git_repo(&repo_root)?;
    let mut cfg = build_config(tempdir.path());
    // One slot only: the second create below must land in the pending queue.
    cfg.max_parallel_worktrees = 1;
    let db = StateStore::open(&cfg.db_path)?;
    let (fake_claude, _) = write_fake_claude(tempdir.path())?;
    let first_id = create_session_in_dir(
        &db,
        &cfg,
        "active worktree",
        "claude",
        true,
        &repo_root,
        &fake_claude,
    )
    .await?;
    let second_id = create_session_in_dir(
        &db,
        &cfg,
        "queued worktree",
        "claude",
        true,
        &repo_root,
        &fake_claude,
    )
    .await?;
    // Stopping the first session (with worktree cleanup) frees the only slot.
    stop_session_with_options(&db, &first_id, true).await?;
    // Inject a spawn fn that records its arguments to a file instead of
    // launching a real agent.
    let launch_log = tempdir.path().join("queued-launch.log");
    let started =
        activate_pending_worktree_sessions_with(&db, &cfg, |_, session_id, task, _, cwd| {
            let launch_log = launch_log.clone();
            async move {
                fs::write(
                    &launch_log,
                    format!("{session_id}\n{task}\n{}\n", cwd.display()),
                )?;
                Ok(())
            }
        })
        .await?;
    // Exactly the queued session was activated and removed from the queue.
    assert_eq!(started, vec![second_id.clone()]);
    assert!(!db.pending_worktree_queue_contains(&second_id)?);
    let second = db
        .get_session(&second_id)?
        .context("queued session missing")?;
    let worktree = second
        .worktree
        .context("queued session should gain worktree")?;
    // State stays Pending here: the helper only attaches the worktree and
    // calls `spawn`; the runner itself is what transitions a session to
    // Running.
    assert_eq!(second.state, SessionState::Pending);
    assert!(worktree.path.exists());
    // The injected spawn fn saw the right id, task, and working directory.
    let launch = fs::read_to_string(&launch_log)?;
    assert!(launch.contains(&second_id));
    assert!(launch.contains("queued worktree"));
    assert!(launch.contains(worktree.path.to_string_lossy().as_ref()));
    // Manual cleanup: this test bypassed the normal stop path for the second
    // session, so remove its worktree and reset its working dir explicitly.
    crate::worktree::remove(&worktree)?;
    db.clear_worktree_to_dir(&second_id, &repo_root)?;
    Ok(())
}
#[test]
fn enforce_budget_hard_limits_stops_active_sessions_without_cleaning_worktrees() -> Result<()> {
let tempdir = TestDir::new("manager-budget-pause")?;