feat: group ecc2 sessions by project and task

This commit is contained in:
Affaan Mustafa
2026-04-09 19:54:28 -07:00
parent 181bc26b29
commit cf8b5473c7
8 changed files with 540 additions and 38 deletions

View File

@@ -9,7 +9,10 @@ use tokio::process::Command;
use super::output::SessionOutputStore;
use super::runtime::capture_command_output;
use super::store::StateStore;
use super::{Session, SessionMetrics, SessionState};
use super::{
default_project_label, default_task_group_label, normalize_group_label, Session,
SessionGrouping, SessionMetrics, SessionState,
};
use crate::comms::{self, MessageType};
use crate::config::Config;
use crate::observability::{log_tool_call, ToolCallEvent, ToolLogEntry, ToolLogPage, ToolLogger};
@@ -21,10 +24,29 @@ pub async fn create_session(
task: &str,
agent_type: &str,
use_worktree: bool,
) -> Result<String> {
create_session_with_grouping(
db,
cfg,
task,
agent_type,
use_worktree,
SessionGrouping::default(),
)
.await
}
pub async fn create_session_with_grouping(
db: &StateStore,
cfg: &Config,
task: &str,
agent_type: &str,
use_worktree: bool,
grouping: SessionGrouping,
) -> Result<String> {
let repo_root =
std::env::current_dir().context("Failed to resolve current working directory")?;
queue_session_in_dir(db, cfg, task, agent_type, use_worktree, &repo_root).await
queue_session_in_dir(db, cfg, task, agent_type, use_worktree, &repo_root, grouping).await
}
pub fn list_sessions(db: &StateStore) -> Result<Vec<Session>> {
@@ -127,6 +149,27 @@ pub async fn assign_session(
task: &str,
agent_type: &str,
use_worktree: bool,
) -> Result<AssignmentOutcome> {
assign_session_with_grouping(
db,
cfg,
lead_id,
task,
agent_type,
use_worktree,
SessionGrouping::default(),
)
.await
}
pub async fn assign_session_with_grouping(
db: &StateStore,
cfg: &Config,
lead_id: &str,
task: &str,
agent_type: &str,
use_worktree: bool,
grouping: SessionGrouping,
) -> Result<AssignmentOutcome> {
let repo_root =
std::env::current_dir().context("Failed to resolve current working directory")?;
@@ -139,6 +182,7 @@ pub async fn assign_session(
use_worktree,
&repo_root,
&std::env::current_exe().context("Failed to resolve ECC executable path")?,
grouping,
)
.await
}
@@ -175,6 +219,7 @@ pub async fn drain_inbox(
use_worktree,
&repo_root,
&runner_program,
SessionGrouping::default(),
)
.await?;
@@ -380,6 +425,7 @@ pub async fn rebalance_team_backlog(
use_worktree,
&repo_root,
&runner_program,
SessionGrouping::default(),
)
.await?;
@@ -538,8 +584,17 @@ async fn assign_session_in_dir_with_runner_program(
use_worktree: bool,
repo_root: &Path,
runner_program: &Path,
grouping: SessionGrouping,
) -> Result<AssignmentOutcome> {
let lead = resolve_session(db, lead_id)?;
let inherited_grouping = SessionGrouping {
project: grouping
.project
.or_else(|| normalize_group_label(&lead.project)),
task_group: grouping
.task_group
.or_else(|| normalize_group_label(&lead.task_group)),
};
let delegates = direct_delegate_sessions(db, &lead.id, agent_type)?;
let delegate_handoff_backlog = delegates
.iter()
@@ -577,6 +632,7 @@ async fn assign_session_in_dir_with_runner_program(
use_worktree,
repo_root,
runner_program,
inherited_grouping.clone(),
)
.await?;
send_task_handoff(db, &lead, &session_id, task, "spawned new delegate")?;
@@ -651,6 +707,7 @@ async fn assign_session_in_dir_with_runner_program(
use_worktree,
repo_root,
runner_program,
inherited_grouping,
)
.await?;
send_task_handoff(db, &lead, &session_id, task, "spawned fallback delegate")?;
@@ -1093,6 +1150,7 @@ async fn queue_session_in_dir(
agent_type: &str,
use_worktree: bool,
repo_root: &Path,
grouping: SessionGrouping,
) -> Result<String> {
queue_session_in_dir_with_runner_program(
db,
@@ -1102,6 +1160,7 @@ async fn queue_session_in_dir(
use_worktree,
repo_root,
&std::env::current_exe().context("Failed to resolve ECC executable path")?,
grouping,
)
.await
}
@@ -1114,8 +1173,17 @@ async fn queue_session_in_dir_with_runner_program(
use_worktree: bool,
repo_root: &Path,
runner_program: &Path,
grouping: SessionGrouping,
) -> Result<String> {
let session = build_session_record(db, task, agent_type, use_worktree, cfg, repo_root)?;
let session = build_session_record(
db,
task,
agent_type,
use_worktree,
cfg,
repo_root,
grouping,
)?;
db.insert_session(&session)?;
if use_worktree && session.worktree.is_none() {
@@ -1158,6 +1226,7 @@ fn build_session_record(
use_worktree: bool,
cfg: &Config,
repo_root: &Path,
grouping: SessionGrouping,
) -> Result<Session> {
let id = uuid::Uuid::new_v4().to_string()[..8].to_string();
let now = chrono::Utc::now();
@@ -1171,10 +1240,22 @@ fn build_session_record(
.as_ref()
.map(|worktree| worktree.path.clone())
.unwrap_or_else(|| repo_root.to_path_buf());
let project = grouping
.project
.as_deref()
.and_then(normalize_group_label)
.unwrap_or_else(|| default_project_label(repo_root));
let task_group = grouping
.task_group
.as_deref()
.and_then(normalize_group_label)
.unwrap_or_else(|| default_task_group_label(task));
Ok(Session {
id,
task: task.to_string(),
project,
task_group,
agent_type: agent_type.to_string(),
working_dir,
state: SessionState::Pending,
@@ -1196,7 +1277,15 @@ async fn create_session_in_dir(
repo_root: &Path,
agent_program: &Path,
) -> Result<String> {
let session = build_session_record(db, task, agent_type, use_worktree, cfg, repo_root)?;
let session = build_session_record(
db,
task,
agent_type,
use_worktree,
cfg,
repo_root,
SessionGrouping::default(),
)?;
db.insert_session(&session)?;
@@ -1962,6 +2051,8 @@ mod tests {
Session {
id: id.to_string(),
task: format!("task-{id}"),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state,
@@ -1984,6 +2075,8 @@ mod tests {
db.insert_session(&Session {
id: "stale-1".to_string(),
task: "heartbeat overdue".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: SessionState::Running,
@@ -2019,6 +2112,8 @@ mod tests {
db.insert_session(&Session {
id: "stale-2".to_string(),
task: "terminate overdue".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: SessionState::Running,
@@ -2171,6 +2266,37 @@ mod tests {
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn create_session_derives_project_and_task_group_defaults() -> Result<()> {
let tempdir = TestDir::new("manager-create-session-grouping-defaults")?;
let repo_root = tempdir.path().join("checkout-api");
init_git_repo(&repo_root)?;
let cfg = build_config(tempdir.path());
let db = StateStore::open(&cfg.db_path)?;
let (fake_claude, _) = write_fake_claude(tempdir.path())?;
let session_id = create_session_in_dir(
&db,
&cfg,
"stabilize auth callback",
"claude",
false,
&repo_root,
&fake_claude,
)
.await?;
let session = db
.get_session(&session_id)?
.context("session should exist")?;
assert_eq!(session.project, "checkout-api");
assert_eq!(session.task_group, "stabilize auth callback");
stop_session_with_options(&db, &session_id, false).await?;
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn stop_session_kills_process_and_optionally_cleans_worktree() -> Result<()> {
let tempdir = TestDir::new("manager-stop-session")?;
@@ -2379,6 +2505,8 @@ mod tests {
db.insert_session(&Session {
id: "active-over-budget".to_string(),
task: "pause on hard limit".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: tempdir.path().to_path_buf(),
state: SessionState::Running,
@@ -2440,6 +2568,8 @@ mod tests {
db.insert_session(&Session {
id: "completed-over-budget".to_string(),
task: "already done".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: tempdir.path().to_path_buf(),
state: SessionState::Completed,
@@ -2485,6 +2615,8 @@ mod tests {
db.insert_session(&Session {
id: "deadbeef".to_string(),
task: "resume previous task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: tempdir.path().join("resume-working-dir"),
state: SessionState::Failed,
@@ -2797,6 +2929,8 @@ mod tests {
db.insert_session(&Session {
id: "merge-ready".to_string(),
task: "merge me".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: merged_worktree.path.clone(),
state: SessionState::Completed,
@@ -2813,6 +2947,8 @@ mod tests {
db.insert_session(&Session {
id: "active-worktree".to_string(),
task: "still running".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: active_worktree.path.clone(),
state: SessionState::Running,
@@ -2830,6 +2966,8 @@ mod tests {
db.insert_session(&Session {
id: "dirty-worktree".to_string(),
task: "needs commit".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: dirty_worktree.path.clone(),
state: SessionState::Stopped,
@@ -3056,6 +3194,8 @@ mod tests {
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
@@ -3069,6 +3209,8 @@ mod tests {
db.insert_session(&Session {
id: "idle-worker".to_string(),
task: "old worker task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Idle,
@@ -3097,6 +3239,7 @@ mod tests {
true,
&repo_root,
&fake_runner,
SessionGrouping::default(),
)
.await?;
@@ -3125,6 +3268,8 @@ mod tests {
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
@@ -3138,6 +3283,8 @@ mod tests {
db.insert_session(&Session {
id: "idle-worker".to_string(),
task: "old worker task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Idle,
@@ -3165,6 +3312,7 @@ mod tests {
true,
&repo_root,
&fake_runner,
SessionGrouping::default(),
)
.await?;
@@ -3203,6 +3351,8 @@ mod tests {
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
@@ -3216,6 +3366,8 @@ mod tests {
db.insert_session(&Session {
id: "idle-worker".to_string(),
task: "old worker task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Idle,
@@ -3245,6 +3397,7 @@ mod tests {
true,
&repo_root,
&fake_runner,
SessionGrouping::default(),
)
.await?;
@@ -3272,6 +3425,8 @@ mod tests {
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
@@ -3285,6 +3440,8 @@ mod tests {
db.insert_session(&Session {
id: "busy-worker".to_string(),
task: "existing work".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
@@ -3312,6 +3469,7 @@ mod tests {
true,
&repo_root,
&fake_runner,
SessionGrouping::default(),
)
.await?;
@@ -3331,6 +3489,57 @@ mod tests {
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn assign_session_inherits_lead_grouping_for_spawned_delegate() -> Result<()> {
let tempdir = TestDir::new("manager-assign-grouping-inheritance")?;
let repo_root = tempdir.path().join("repo");
init_git_repo(&repo_root)?;
let cfg = build_config(tempdir.path());
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
project: "ecc-platform".to_string(),
task_group: "checkout recovery".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
pid: Some(42),
worktree: None,
created_at: now - Duration::minutes(3),
updated_at: now - Duration::minutes(3),
last_heartbeat_at: now - Duration::minutes(3),
metrics: SessionMetrics::default(),
})?;
let (fake_runner, _) = write_fake_claude(tempdir.path())?;
let outcome = assign_session_in_dir_with_runner_program(
&db,
&cfg,
"lead",
"investigate webhook retry edge cases",
"claude",
true,
&repo_root,
&fake_runner,
SessionGrouping::default(),
)
.await?;
assert_eq!(outcome.action, AssignmentAction::Spawned);
let spawned = db
.get_session(&outcome.session_id)?
.context("spawned delegated session missing")?;
assert_eq!(spawned.project, "ecc-platform");
assert_eq!(spawned.task_group, "checkout recovery");
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn assign_session_defers_when_team_is_saturated() -> Result<()> {
let tempdir = TestDir::new("manager-assign-defer-saturated")?;
@@ -3345,6 +3554,8 @@ mod tests {
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
@@ -3358,6 +3569,8 @@ mod tests {
db.insert_session(&Session {
id: "busy-worker".to_string(),
task: "existing work".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
@@ -3385,6 +3598,7 @@ mod tests {
true,
&repo_root,
&fake_runner,
SessionGrouping::default(),
)
.await?;
@@ -3412,6 +3626,8 @@ mod tests {
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
@@ -3460,6 +3676,8 @@ mod tests {
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
@@ -3473,6 +3691,8 @@ mod tests {
db.insert_session(&Session {
id: "busy-worker".to_string(),
task: "existing work".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
@@ -3529,6 +3749,8 @@ mod tests {
db.insert_session(&Session {
id: lead_id.to_string(),
task: format!("{lead_id} task"),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
@@ -3589,6 +3811,8 @@ mod tests {
db.insert_session(&Session {
id: lead_id.to_string(),
task: format!("{lead_id} task"),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
@@ -3641,6 +3865,8 @@ mod tests {
db.insert_session(&Session {
id: "worker".to_string(),
task: "worker task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
@@ -3655,6 +3881,8 @@ mod tests {
db.insert_session(&Session {
id: "worker-child".to_string(),
task: "delegate task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
@@ -3711,6 +3939,8 @@ mod tests {
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
@@ -3724,6 +3954,8 @@ mod tests {
db.insert_session(&Session {
id: "worker-a".to_string(),
task: "auth lane".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Idle,
@@ -3737,6 +3969,8 @@ mod tests {
db.insert_session(&Session {
id: "worker-b".to_string(),
task: "billing lane".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Idle,
@@ -3799,6 +4033,8 @@ mod tests {
db.insert_session(&Session {
id: "lead".to_string(),
task: "lead task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root.clone(),
state: SessionState::Running,
@@ -3812,6 +4048,8 @@ mod tests {
db.insert_session(&Session {
id: "worker".to_string(),
task: "delegate task".to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: repo_root,
state: SessionState::Idle,