feat: add ecc2 merge queue reporting

This commit is contained in:
Affaan Mustafa
2026-04-09 20:04:04 -07:00
parent cf8b5473c7
commit d0dbb20805
4 changed files with 669 additions and 10 deletions

View File

@@ -964,6 +964,191 @@ pub async fn prune_inactive_worktrees(
})
}
#[derive(Debug, Clone, Serialize)]
pub struct MergeQueueBlocker {
pub session_id: String,
pub branch: String,
pub state: SessionState,
pub conflicts: Vec<String>,
pub summary: String,
pub conflicting_patch_preview: Option<String>,
pub blocker_patch_preview: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
pub struct MergeQueueEntry {
pub session_id: String,
pub task: String,
pub project: String,
pub task_group: String,
pub branch: String,
pub base_branch: String,
pub state: SessionState,
pub worktree_health: worktree::WorktreeHealth,
pub dirty: bool,
pub queue_position: Option<usize>,
pub ready_to_merge: bool,
pub blocked_by: Vec<MergeQueueBlocker>,
pub suggested_action: String,
}
#[derive(Debug, Clone, Serialize)]
pub struct MergeQueueReport {
pub ready_entries: Vec<MergeQueueEntry>,
pub blocked_entries: Vec<MergeQueueEntry>,
}
pub fn build_merge_queue(db: &StateStore) -> Result<MergeQueueReport> {
let mut sessions = db
.list_sessions()?
.into_iter()
.filter(|session| session.worktree.is_some())
.collect::<Vec<_>>();
sessions.sort_by(|left, right| {
merge_queue_priority(left)
.cmp(&merge_queue_priority(right))
.then_with(|| left.project.cmp(&right.project))
.then_with(|| left.task_group.cmp(&right.task_group))
.then_with(|| left.updated_at.cmp(&right.updated_at))
.then_with(|| left.id.cmp(&right.id))
});
let mut entries = Vec::new();
let mut mergeable_sessions = Vec::<Session>::new();
let mut next_position = 1usize;
for session in sessions {
let Some(worktree) = session.worktree.clone() else {
continue;
};
let worktree_health = worktree::health(&worktree)?;
let dirty = worktree::has_uncommitted_changes(&worktree)?;
let mut blocked_by = Vec::new();
if matches!(
session.state,
SessionState::Pending | SessionState::Running | SessionState::Idle | SessionState::Stale
) {
blocked_by.push(MergeQueueBlocker {
session_id: session.id.clone(),
branch: worktree.branch.clone(),
state: session.state.clone(),
conflicts: Vec::new(),
summary: format!("session is still {}", session_state_label(&session.state)),
conflicting_patch_preview: None,
blocker_patch_preview: None,
});
} else if worktree_health == worktree::WorktreeHealth::Conflicted {
let readiness = worktree::merge_readiness(&worktree)?;
blocked_by.push(MergeQueueBlocker {
session_id: session.id.clone(),
branch: worktree.branch.clone(),
state: session.state.clone(),
conflicts: readiness.conflicts,
summary: readiness.summary,
conflicting_patch_preview: worktree::diff_patch_preview(&worktree, 18)?,
blocker_patch_preview: None,
});
} else if dirty {
blocked_by.push(MergeQueueBlocker {
session_id: session.id.clone(),
branch: worktree.branch.clone(),
state: session.state.clone(),
conflicts: Vec::new(),
summary: "worktree has uncommitted changes".to_string(),
conflicting_patch_preview: worktree::diff_patch_preview(&worktree, 18)?,
blocker_patch_preview: None,
});
} else {
for blocker in &mergeable_sessions {
let Some(blocker_worktree) = blocker.worktree.as_ref() else {
continue;
};
let Some(conflict) =
worktree::branch_conflict_preview(&worktree, blocker_worktree, 12)?
else {
continue;
};
blocked_by.push(MergeQueueBlocker {
session_id: blocker.id.clone(),
branch: blocker_worktree.branch.clone(),
state: blocker.state.clone(),
conflicts: conflict.conflicts,
summary: format!(
"merge after {} to avoid branch conflicts",
blocker.id
),
conflicting_patch_preview: conflict.right_patch_preview,
blocker_patch_preview: conflict.left_patch_preview,
});
}
}
let ready_to_merge = blocked_by.is_empty();
let queue_position = if ready_to_merge {
let position = next_position;
next_position += 1;
mergeable_sessions.push(session.clone());
Some(position)
} else {
None
};
let suggested_action = if let Some(position) = queue_position {
format!("merge in queue order #{position}")
} else if blocked_by.iter().any(|blocker| blocker.session_id == session.id) {
blocked_by
.first()
.map(|blocker| blocker.summary.clone())
.unwrap_or_else(|| "resolve merge blockers".to_string())
} else {
format!(
"merge after {}",
blocked_by
.iter()
.map(|blocker| blocker.session_id.as_str())
.collect::<Vec<_>>()
.join(", ")
)
};
entries.push(MergeQueueEntry {
session_id: session.id,
task: session.task,
project: session.project,
task_group: session.task_group,
branch: worktree.branch,
base_branch: worktree.base_branch,
state: session.state,
worktree_health,
dirty,
queue_position,
ready_to_merge,
blocked_by,
suggested_action,
});
}
let mut ready_entries = entries
.iter()
.filter(|entry| entry.ready_to_merge)
.cloned()
.collect::<Vec<_>>();
ready_entries.sort_by_key(|entry| entry.queue_position.unwrap_or(usize::MAX));
let blocked_entries = entries
.into_iter()
.filter(|entry| !entry.ready_to_merge)
.collect::<Vec<_>>();
Ok(MergeQueueReport {
ready_entries,
blocked_entries,
})
}
pub async fn delete_session(db: &StateStore, id: &str) -> Result<()> {
let session = resolve_session(db, id)?;
@@ -1326,6 +1511,14 @@ fn attached_worktree_count(db: &StateStore) -> Result<usize> {
.count())
}
fn merge_queue_priority(session: &Session) -> (u8, chrono::DateTime<chrono::Utc>) {
let active_rank = match session.state {
SessionState::Completed | SessionState::Failed | SessionState::Stopped => 0,
SessionState::Pending | SessionState::Running | SessionState::Idle | SessionState::Stale => 1,
};
(active_rank, session.updated_at)
}
async fn spawn_session_runner(
task: &str,
session_id: &str,
@@ -3020,6 +3213,97 @@ mod tests {
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn build_merge_queue_orders_ready_sessions_and_blocks_conflicts() -> Result<()> {
let tempdir = TestDir::new("manager-merge-queue")?;
let repo_root = tempdir.path().join("repo");
init_git_repo(&repo_root)?;
let cfg = build_config(tempdir.path());
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
let alpha_worktree = worktree::create_for_session_in_repo("alpha", &cfg, &repo_root)?;
fs::write(alpha_worktree.path.join("README.md"), "alpha\n")?;
run_git(&alpha_worktree.path, ["add", "README.md"])?;
run_git(&alpha_worktree.path, ["commit", "-m", "alpha change"])?;
let beta_worktree = worktree::create_for_session_in_repo("beta", &cfg, &repo_root)?;
fs::write(beta_worktree.path.join("README.md"), "beta\n")?;
run_git(&beta_worktree.path, ["add", "README.md"])?;
run_git(&beta_worktree.path, ["commit", "-m", "beta change"])?;
let gamma_worktree = worktree::create_for_session_in_repo("gamma", &cfg, &repo_root)?;
fs::write(gamma_worktree.path.join("src.txt"), "gamma\n")?;
run_git(&gamma_worktree.path, ["add", "src.txt"])?;
run_git(&gamma_worktree.path, ["commit", "-m", "gamma change"])?;
db.insert_session(&Session {
id: "alpha".to_string(),
task: "alpha merge".to_string(),
project: "ecc".to_string(),
task_group: "merge".to_string(),
agent_type: "claude".to_string(),
working_dir: alpha_worktree.path.clone(),
state: SessionState::Stopped,
pid: None,
worktree: Some(alpha_worktree),
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
last_heartbeat_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
id: "beta".to_string(),
task: "beta merge".to_string(),
project: "ecc".to_string(),
task_group: "merge".to_string(),
agent_type: "claude".to_string(),
working_dir: beta_worktree.path.clone(),
state: SessionState::Stopped,
pid: None,
worktree: Some(beta_worktree),
created_at: now - Duration::minutes(2),
updated_at: now - Duration::minutes(2),
last_heartbeat_at: now - Duration::minutes(2),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
id: "gamma".to_string(),
task: "gamma merge".to_string(),
project: "ecc".to_string(),
task_group: "merge".to_string(),
agent_type: "claude".to_string(),
working_dir: gamma_worktree.path.clone(),
state: SessionState::Stopped,
pid: None,
worktree: Some(gamma_worktree),
created_at: now - Duration::minutes(1),
updated_at: now - Duration::minutes(1),
last_heartbeat_at: now - Duration::minutes(1),
metrics: SessionMetrics::default(),
})?;
let queue = build_merge_queue(&db)?;
assert_eq!(queue.ready_entries.len(), 2);
assert_eq!(queue.ready_entries[0].session_id, "alpha");
assert_eq!(queue.ready_entries[0].queue_position, Some(1));
assert_eq!(queue.ready_entries[1].session_id, "gamma");
assert_eq!(queue.ready_entries[1].queue_position, Some(2));
assert_eq!(queue.blocked_entries.len(), 1);
let blocked = &queue.blocked_entries[0];
assert_eq!(blocked.session_id, "beta");
assert_eq!(blocked.blocked_by.len(), 1);
assert_eq!(blocked.blocked_by[0].session_id, "alpha");
assert!(blocked.blocked_by[0]
.conflicts
.contains(&"README.md".to_string()));
assert!(blocked.suggested_action.contains("merge after alpha"));
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn delete_session_removes_inactive_session_and_worktree() -> Result<()> {
let tempdir = TestDir::new("manager-delete-session")?;