feat: auto-rebase blocked merge queue worktrees

This commit is contained in:
Affaan Mustafa
2026-04-09 21:28:33 -07:00
parent 5fb2e62216
commit 599a9d1e7b
5 changed files with 641 additions and 6 deletions

View File

@@ -1202,9 +1202,11 @@ mod tests {
invoked_flag.store(true, std::sync::atomic::Ordering::SeqCst);
Ok(manager::WorktreeBulkMergeOutcome {
merged: Vec::new(),
rebased: Vec::new(),
active_with_worktree_ids: Vec::new(),
conflicted_session_ids: Vec::new(),
dirty_worktree_ids: Vec::new(),
blocked_by_queue_session_ids: Vec::new(),
failures: Vec::new(),
})
}
@@ -1239,9 +1241,16 @@ mod tests {
cleaned_worktree: true,
},
],
rebased: vec![manager::WorktreeRebaseOutcome {
session_id: "worker-r".to_string(),
branch: "ecc/worker-r".to_string(),
base_branch: "main".to_string(),
already_up_to_date: false,
}],
active_with_worktree_ids: vec!["worker-c".to_string()],
conflicted_session_ids: vec!["worker-d".to_string()],
dirty_worktree_ids: vec!["worker-e".to_string()],
blocked_by_queue_session_ids: vec!["worker-f".to_string()],
failures: Vec::new(),
})
})

View File

@@ -803,6 +803,14 @@ pub struct WorktreeMergeOutcome {
pub cleaned_worktree: bool,
}
#[derive(Debug, Clone, Serialize)]
pub struct WorktreeRebaseOutcome {
pub session_id: String,
pub branch: String,
pub base_branch: String,
pub already_up_to_date: bool,
}
pub async fn merge_session_worktree(
db: &StateStore,
id: &str,
@@ -841,6 +849,34 @@ pub async fn merge_session_worktree(
})
}
pub async fn rebase_session_worktree(db: &StateStore, id: &str) -> Result<WorktreeRebaseOutcome> {
let session = resolve_session(db, id)?;
if matches!(
session.state,
SessionState::Pending | SessionState::Running | SessionState::Idle | SessionState::Stale
) {
anyhow::bail!(
"Cannot rebase active session {} while it is {}",
session.id,
session.state
);
}
let worktree = session
.worktree
.clone()
.ok_or_else(|| anyhow::anyhow!("Session {} has no attached worktree", session.id))?;
let outcome = crate::worktree::rebase_onto_base(&worktree)?;
Ok(WorktreeRebaseOutcome {
session_id: session.id,
branch: outcome.branch,
base_branch: outcome.base_branch,
already_up_to_date: outcome.already_up_to_date,
})
}
#[derive(Debug, Clone, Serialize)]
pub struct WorktreeMergeFailure {
pub session_id: String,
@@ -850,15 +886,110 @@ pub struct WorktreeMergeFailure {
#[derive(Debug, Clone, Serialize)]
pub struct WorktreeBulkMergeOutcome {
pub merged: Vec<WorktreeMergeOutcome>,
pub rebased: Vec<WorktreeRebaseOutcome>,
pub active_with_worktree_ids: Vec<String>,
pub conflicted_session_ids: Vec<String>,
pub dirty_worktree_ids: Vec<String>,
pub blocked_by_queue_session_ids: Vec<String>,
pub failures: Vec<WorktreeMergeFailure>,
}
pub async fn merge_ready_worktrees(
db: &StateStore,
cleanup_worktree: bool,
) -> Result<WorktreeBulkMergeOutcome> {
if cleanup_worktree {
return process_merge_queue(db).await;
}
merge_ready_worktrees_one_pass(db, cleanup_worktree).await
}
pub async fn process_merge_queue(db: &StateStore) -> Result<WorktreeBulkMergeOutcome> {
let mut merged = Vec::new();
let mut rebased = Vec::new();
let mut failures = Vec::new();
let mut attempted_rebase_heads = BTreeMap::<String, String>::new();
loop {
let report = build_merge_queue(db)?;
let mut merged_any = false;
for entry in &report.ready_entries {
match merge_session_worktree(db, &entry.session_id, true).await {
Ok(outcome) => {
merged.push(outcome);
merged_any = true;
}
Err(error) => failures.push(WorktreeMergeFailure {
session_id: entry.session_id.clone(),
reason: error.to_string(),
}),
}
}
if merged_any {
continue;
}
let mut rebased_any = false;
for entry in &report.blocked_entries {
if !can_auto_rebase_merge_queue_entry(entry) {
continue;
}
let session = resolve_session(db, &entry.session_id)?;
let Some(worktree) = session.worktree.clone() else {
continue;
};
let base_head = crate::worktree::branch_head_oid(&worktree, &worktree.base_branch)?;
if attempted_rebase_heads
.get(&entry.session_id)
.is_some_and(|last_head| last_head == &base_head)
{
continue;
}
attempted_rebase_heads.insert(entry.session_id.clone(), base_head);
match rebase_session_worktree(db, &entry.session_id).await {
Ok(outcome) => {
rebased.push(outcome);
rebased_any = true;
break;
}
Err(error) => failures.push(WorktreeMergeFailure {
session_id: entry.session_id.clone(),
reason: error.to_string(),
}),
}
}
if rebased_any {
continue;
}
let (
active_with_worktree_ids,
conflicted_session_ids,
dirty_worktree_ids,
blocked_by_queue_session_ids,
) = classify_merge_queue_report(&report);
return Ok(WorktreeBulkMergeOutcome {
merged,
rebased,
active_with_worktree_ids,
conflicted_session_ids,
dirty_worktree_ids,
blocked_by_queue_session_ids,
failures,
});
}
}
async fn merge_ready_worktrees_one_pass(
db: &StateStore,
cleanup_worktree: bool,
) -> Result<WorktreeBulkMergeOutcome> {
let sessions = db.list_sessions()?;
let mut merged = Vec::new();
@@ -926,9 +1057,11 @@ pub async fn merge_ready_worktrees(
Ok(WorktreeBulkMergeOutcome {
merged,
rebased: Vec::new(),
active_with_worktree_ids,
conflicted_session_ids,
dirty_worktree_ids,
blocked_by_queue_session_ids: Vec::new(),
failures,
})
}
@@ -1170,6 +1303,49 @@ pub fn build_merge_queue(db: &StateStore) -> Result<MergeQueueReport> {
})
}
fn can_auto_rebase_merge_queue_entry(entry: &MergeQueueEntry) -> bool {
!entry.ready_to_merge
&& !entry.dirty
&& entry.worktree_health == worktree::WorktreeHealth::Conflicted
&& !entry.blocked_by.is_empty()
&& entry
.blocked_by
.iter()
.all(|blocker| blocker.session_id == entry.session_id)
}
fn classify_merge_queue_report(
report: &MergeQueueReport,
) -> (Vec<String>, Vec<String>, Vec<String>, Vec<String>) {
let mut active = Vec::new();
let mut conflicted = Vec::new();
let mut dirty = Vec::new();
let mut queue_blocked = Vec::new();
for entry in &report.blocked_entries {
if entry.blocked_by.iter().any(|blocker| {
blocker.session_id == entry.session_id
&& matches!(
blocker.state,
SessionState::Pending
| SessionState::Running
| SessionState::Idle
| SessionState::Stale
)
}) {
active.push(entry.session_id.clone());
} else if entry.dirty {
dirty.push(entry.session_id.clone());
} else if entry.worktree_health == worktree::WorktreeHealth::Conflicted {
conflicted.push(entry.session_id.clone());
} else {
queue_blocked.push(entry.session_id.clone());
}
}
(active, conflicted, dirty, queue_blocked)
}
pub async fn delete_session(db: &StateStore, id: &str) -> Result<()> {
let session = resolve_session(db, id)?;
@@ -3235,6 +3411,174 @@ mod tests {
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn process_merge_queue_rebases_blocked_session_and_merges_it() -> Result<()> {
let tempdir = TestDir::new("manager-process-merge-queue-success")?;
let repo_root = tempdir.path().join("repo");
init_git_repo(&repo_root)?;
let cfg = build_config(tempdir.path());
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
let alpha_worktree = worktree::create_for_session_in_repo("alpha", &cfg, &repo_root)?;
fs::write(alpha_worktree.path.join("README.md"), "hello\nalpha\n")?;
run_git(&alpha_worktree.path, ["commit", "-am", "alpha change"])?;
let beta_worktree = worktree::create_for_session_in_repo("beta", &cfg, &repo_root)?;
fs::write(beta_worktree.path.join("README.md"), "hello\nalpha\n")?;
run_git(&beta_worktree.path, ["commit", "-am", "beta shared change"])?;
fs::write(beta_worktree.path.join("README.md"), "hello\nalpha\nbeta\n")?;
run_git(&beta_worktree.path, ["commit", "-am", "beta follow-up"])?;
db.insert_session(&Session {
id: "alpha".to_string(),
task: "alpha merge".to_string(),
project: "ecc".to_string(),
task_group: "merge".to_string(),
agent_type: "claude".to_string(),
working_dir: alpha_worktree.path.clone(),
state: SessionState::Completed,
pid: None,
worktree: Some(alpha_worktree.clone()),
created_at: now - Duration::minutes(2),
updated_at: now - Duration::minutes(2),
last_heartbeat_at: now - Duration::minutes(2),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
id: "beta".to_string(),
task: "beta merge".to_string(),
project: "ecc".to_string(),
task_group: "merge".to_string(),
agent_type: "claude".to_string(),
working_dir: beta_worktree.path.clone(),
state: SessionState::Completed,
pid: None,
worktree: Some(beta_worktree.clone()),
created_at: now - Duration::minutes(1),
updated_at: now - Duration::minutes(1),
last_heartbeat_at: now - Duration::minutes(1),
metrics: SessionMetrics::default(),
})?;
let queue_before = build_merge_queue(&db)?;
assert_eq!(queue_before.ready_entries.len(), 1);
assert_eq!(queue_before.ready_entries[0].session_id, "alpha");
assert_eq!(queue_before.blocked_entries.len(), 1);
assert_eq!(queue_before.blocked_entries[0].session_id, "beta");
let outcome = process_merge_queue(&db).await?;
assert_eq!(
outcome
.merged
.iter()
.map(|entry| entry.session_id.as_str())
.collect::<Vec<_>>(),
vec!["alpha", "beta"]
);
assert_eq!(outcome.rebased.len(), 1);
assert_eq!(outcome.rebased[0].session_id, "beta");
assert!(outcome.active_with_worktree_ids.is_empty());
assert!(outcome.conflicted_session_ids.is_empty());
assert!(outcome.dirty_worktree_ids.is_empty());
assert!(outcome.blocked_by_queue_session_ids.is_empty());
assert!(outcome.failures.is_empty());
assert_eq!(
fs::read_to_string(repo_root.join("README.md"))?,
"hello\nalpha\nbeta\n"
);
assert!(db
.get_session("alpha")?
.context("alpha should still exist")?
.worktree
.is_none());
assert!(db
.get_session("beta")?
.context("beta should still exist")?
.worktree
.is_none());
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn process_merge_queue_records_failed_rebase_and_leaves_blocked_session() -> Result<()> {
let tempdir = TestDir::new("manager-process-merge-queue-fail")?;
let repo_root = tempdir.path().join("repo");
init_git_repo(&repo_root)?;
let cfg = build_config(tempdir.path());
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
let alpha_worktree = worktree::create_for_session_in_repo("alpha", &cfg, &repo_root)?;
fs::write(alpha_worktree.path.join("README.md"), "hello\nalpha\n")?;
run_git(&alpha_worktree.path, ["commit", "-am", "alpha change"])?;
let beta_worktree = worktree::create_for_session_in_repo("beta", &cfg, &repo_root)?;
fs::write(beta_worktree.path.join("README.md"), "hello\nbeta\n")?;
run_git(&beta_worktree.path, ["commit", "-am", "beta change"])?;
db.insert_session(&Session {
id: "alpha".to_string(),
task: "alpha merge".to_string(),
project: "ecc".to_string(),
task_group: "merge".to_string(),
agent_type: "claude".to_string(),
working_dir: alpha_worktree.path.clone(),
state: SessionState::Completed,
pid: None,
worktree: Some(alpha_worktree.clone()),
created_at: now - Duration::minutes(2),
updated_at: now - Duration::minutes(2),
last_heartbeat_at: now - Duration::minutes(2),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
id: "beta".to_string(),
task: "beta merge".to_string(),
project: "ecc".to_string(),
task_group: "merge".to_string(),
agent_type: "claude".to_string(),
working_dir: beta_worktree.path.clone(),
state: SessionState::Completed,
pid: None,
worktree: Some(beta_worktree.clone()),
created_at: now - Duration::minutes(1),
updated_at: now - Duration::minutes(1),
last_heartbeat_at: now - Duration::minutes(1),
metrics: SessionMetrics::default(),
})?;
let outcome = process_merge_queue(&db).await?;
assert_eq!(
outcome
.merged
.iter()
.map(|entry| entry.session_id.as_str())
.collect::<Vec<_>>(),
vec!["alpha"]
);
assert!(outcome.rebased.is_empty());
assert_eq!(outcome.conflicted_session_ids, vec!["beta".to_string()]);
assert!(outcome.active_with_worktree_ids.is_empty());
assert!(outcome.dirty_worktree_ids.is_empty());
assert!(outcome.blocked_by_queue_session_ids.is_empty());
assert_eq!(outcome.failures.len(), 1);
assert_eq!(outcome.failures[0].session_id, "beta");
assert!(outcome.failures[0].reason.contains("git rebase failed"));
assert!(db
.get_session("beta")?
.context("beta should still exist")?
.worktree
.is_some());
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn build_merge_queue_orders_ready_sessions_and_blocks_conflicts() -> Result<()> {
let tempdir = TestDir::new("manager-merge-queue")?;