mirror of
https://github.com/affaan-m/everything-claude-code.git
synced 2026-04-12 04:33:29 +08:00
feat: report ecc2 remaining coordination backlog
This commit is contained in:
@@ -366,7 +366,7 @@ async fn main() -> Result<()> {
|
|||||||
worktree: use_worktree,
|
worktree: use_worktree,
|
||||||
lead_limit,
|
lead_limit,
|
||||||
}) => {
|
}) => {
|
||||||
let dispatch_outcomes = session::manager::auto_dispatch_backlog(
|
let outcome = session::manager::coordinate_backlog(
|
||||||
&db,
|
&db,
|
||||||
&cfg,
|
&cfg,
|
||||||
&agent,
|
&agent,
|
||||||
@@ -374,31 +374,31 @@ async fn main() -> Result<()> {
|
|||||||
lead_limit,
|
lead_limit,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
let total_routed: usize =
|
let total_routed: usize = outcome
|
||||||
dispatch_outcomes.iter().map(|outcome| outcome.routed.len()).sum();
|
.dispatched
|
||||||
|
|
||||||
let rebalance_outcomes = session::manager::rebalance_all_teams(
|
|
||||||
&db,
|
|
||||||
&cfg,
|
|
||||||
&agent,
|
|
||||||
use_worktree,
|
|
||||||
lead_limit,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
let total_rerouted: usize = rebalance_outcomes
|
|
||||||
.iter()
|
.iter()
|
||||||
.map(|outcome| outcome.rerouted.len())
|
.map(|dispatch| dispatch.routed.len())
|
||||||
|
.sum();
|
||||||
|
let total_rerouted: usize = outcome
|
||||||
|
.rebalanced
|
||||||
|
.iter()
|
||||||
|
.map(|rebalance| rebalance.rerouted.len())
|
||||||
.sum();
|
.sum();
|
||||||
|
|
||||||
if total_routed == 0 && total_rerouted == 0 {
|
if total_routed == 0
|
||||||
|
&& total_rerouted == 0
|
||||||
|
&& outcome.remaining_backlog_sessions == 0
|
||||||
|
{
|
||||||
println!("Backlog already clear");
|
println!("Backlog already clear");
|
||||||
} else {
|
} else {
|
||||||
println!(
|
println!(
|
||||||
"Coordinated backlog: dispatched {} handoff(s) across {} lead(s); rebalanced {} handoff(s) across {} lead(s)",
|
"Coordinated backlog: dispatched {} handoff(s) across {} lead(s); rebalanced {} handoff(s) across {} lead(s); remaining {} handoff(s) across {} session(s)",
|
||||||
total_routed,
|
total_routed,
|
||||||
dispatch_outcomes.len(),
|
outcome.dispatched.len(),
|
||||||
total_rerouted,
|
total_rerouted,
|
||||||
rebalance_outcomes.len()
|
outcome.rebalanced.len(),
|
||||||
|
outcome.remaining_backlog_messages,
|
||||||
|
outcome.remaining_backlog_sessions
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -202,6 +202,30 @@ pub async fn rebalance_all_teams(
|
|||||||
Ok(outcomes)
|
Ok(outcomes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn coordinate_backlog(
|
||||||
|
db: &StateStore,
|
||||||
|
cfg: &Config,
|
||||||
|
agent_type: &str,
|
||||||
|
use_worktree: bool,
|
||||||
|
lead_limit: usize,
|
||||||
|
) -> Result<CoordinateBacklogOutcome> {
|
||||||
|
let dispatched = auto_dispatch_backlog(db, cfg, agent_type, use_worktree, lead_limit).await?;
|
||||||
|
let rebalanced = rebalance_all_teams(db, cfg, agent_type, use_worktree, lead_limit).await?;
|
||||||
|
let remaining_targets = db.unread_task_handoff_targets(db.list_sessions()?.len().max(1))?;
|
||||||
|
let remaining_backlog_sessions = remaining_targets.len();
|
||||||
|
let remaining_backlog_messages = remaining_targets
|
||||||
|
.iter()
|
||||||
|
.map(|(_, unread_count)| *unread_count)
|
||||||
|
.sum();
|
||||||
|
|
||||||
|
Ok(CoordinateBacklogOutcome {
|
||||||
|
dispatched,
|
||||||
|
rebalanced,
|
||||||
|
remaining_backlog_sessions,
|
||||||
|
remaining_backlog_messages,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn rebalance_team_backlog(
|
pub async fn rebalance_team_backlog(
|
||||||
db: &StateStore,
|
db: &StateStore,
|
||||||
cfg: &Config,
|
cfg: &Config,
|
||||||
@@ -1006,6 +1030,13 @@ pub struct LeadRebalanceOutcome {
|
|||||||
pub rerouted: Vec<RebalanceOutcome>,
|
pub rerouted: Vec<RebalanceOutcome>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct CoordinateBacklogOutcome {
|
||||||
|
pub dispatched: Vec<LeadDispatchOutcome>,
|
||||||
|
pub rebalanced: Vec<LeadRebalanceOutcome>,
|
||||||
|
pub remaining_backlog_sessions: usize,
|
||||||
|
pub remaining_backlog_messages: usize,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
pub enum AssignmentAction {
|
pub enum AssignmentAction {
|
||||||
Spawned,
|
Spawned,
|
||||||
@@ -1899,6 +1930,55 @@ mod tests {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "current_thread")]
|
||||||
|
async fn coordinate_backlog_reports_remaining_backlog_after_limited_pass() -> Result<()> {
|
||||||
|
let tempdir = TestDir::new("manager-coordinate-backlog")?;
|
||||||
|
let repo_root = tempdir.path().join("repo");
|
||||||
|
init_git_repo(&repo_root)?;
|
||||||
|
|
||||||
|
let mut cfg = build_config(tempdir.path());
|
||||||
|
cfg.auto_dispatch_limit_per_session = 5;
|
||||||
|
let db = StateStore::open(&cfg.db_path)?;
|
||||||
|
let now = Utc::now();
|
||||||
|
|
||||||
|
for lead_id in ["lead-a", "lead-b"] {
|
||||||
|
db.insert_session(&Session {
|
||||||
|
id: lead_id.to_string(),
|
||||||
|
task: format!("{lead_id} task"),
|
||||||
|
agent_type: "claude".to_string(),
|
||||||
|
working_dir: repo_root.clone(),
|
||||||
|
state: SessionState::Running,
|
||||||
|
pid: Some(42),
|
||||||
|
worktree: None,
|
||||||
|
created_at: now - Duration::minutes(3),
|
||||||
|
updated_at: now - Duration::minutes(3),
|
||||||
|
metrics: SessionMetrics::default(),
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
|
||||||
|
db.send_message(
|
||||||
|
"planner",
|
||||||
|
"lead-a",
|
||||||
|
"{\"task\":\"Review auth\",\"context\":\"Inbound\"}",
|
||||||
|
"task_handoff",
|
||||||
|
)?;
|
||||||
|
db.send_message(
|
||||||
|
"planner",
|
||||||
|
"lead-b",
|
||||||
|
"{\"task\":\"Review billing\",\"context\":\"Inbound\"}",
|
||||||
|
"task_handoff",
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let outcome = coordinate_backlog(&db, &cfg, "claude", true, 1).await?;
|
||||||
|
|
||||||
|
assert_eq!(outcome.dispatched.len(), 1);
|
||||||
|
assert_eq!(outcome.rebalanced.len(), 0);
|
||||||
|
assert_eq!(outcome.remaining_backlog_sessions, 2);
|
||||||
|
assert_eq!(outcome.remaining_backlog_messages, 2);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test(flavor = "current_thread")]
|
#[tokio::test(flavor = "current_thread")]
|
||||||
async fn rebalance_team_backlog_moves_work_off_backed_up_delegate() -> Result<()> {
|
async fn rebalance_team_backlog_moves_work_off_backed_up_delegate() -> Result<()> {
|
||||||
let tempdir = TestDir::new("manager-rebalance-team")?;
|
let tempdir = TestDir::new("manager-rebalance-team")?;
|
||||||
|
|||||||
@@ -892,7 +892,7 @@ impl Dashboard {
|
|||||||
let agent = self.cfg.default_agent.clone();
|
let agent = self.cfg.default_agent.clone();
|
||||||
let lead_limit = self.sessions.len().max(1);
|
let lead_limit = self.sessions.len().max(1);
|
||||||
|
|
||||||
let dispatch_outcomes = match manager::auto_dispatch_backlog(
|
let outcome = match manager::coordinate_backlog(
|
||||||
&self.db,
|
&self.db,
|
||||||
&self.cfg,
|
&self.cfg,
|
||||||
&agent,
|
&agent,
|
||||||
@@ -903,32 +903,20 @@ impl Dashboard {
|
|||||||
{
|
{
|
||||||
Ok(outcomes) => outcomes,
|
Ok(outcomes) => outcomes,
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
tracing::warn!("Failed to coordinate backlog dispatch from dashboard: {error}");
|
tracing::warn!("Failed to coordinate backlog from dashboard: {error}");
|
||||||
self.set_operator_note(format!("global coordinate failed during dispatch: {error}"));
|
self.set_operator_note(format!("global coordinate failed: {error}"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let total_routed: usize = dispatch_outcomes.iter().map(|outcome| outcome.routed.len()).sum();
|
let total_routed: usize = outcome
|
||||||
|
.dispatched
|
||||||
let rebalance_outcomes = match manager::rebalance_all_teams(
|
|
||||||
&self.db,
|
|
||||||
&self.cfg,
|
|
||||||
&agent,
|
|
||||||
true,
|
|
||||||
lead_limit,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(outcomes) => outcomes,
|
|
||||||
Err(error) => {
|
|
||||||
tracing::warn!("Failed to coordinate backlog rebalance from dashboard: {error}");
|
|
||||||
self.set_operator_note(format!("global coordinate failed during rebalance: {error}"));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let total_rerouted: usize = rebalance_outcomes
|
|
||||||
.iter()
|
.iter()
|
||||||
.map(|outcome| outcome.rerouted.len())
|
.map(|dispatch| dispatch.routed.len())
|
||||||
|
.sum();
|
||||||
|
let total_rerouted: usize = outcome
|
||||||
|
.rebalanced
|
||||||
|
.iter()
|
||||||
|
.map(|rebalance| rebalance.rerouted.len())
|
||||||
.sum();
|
.sum();
|
||||||
|
|
||||||
let selected_session_id = self
|
let selected_session_id = self
|
||||||
@@ -944,15 +932,17 @@ impl Dashboard {
|
|||||||
self.sync_selected_lineage();
|
self.sync_selected_lineage();
|
||||||
self.refresh_logs();
|
self.refresh_logs();
|
||||||
|
|
||||||
if total_routed == 0 && total_rerouted == 0 {
|
if total_routed == 0 && total_rerouted == 0 && outcome.remaining_backlog_sessions == 0 {
|
||||||
self.set_operator_note("backlog already clear".to_string());
|
self.set_operator_note("backlog already clear".to_string());
|
||||||
} else {
|
} else {
|
||||||
self.set_operator_note(format!(
|
self.set_operator_note(format!(
|
||||||
"coordinated backlog: dispatched {} handoff(s) across {} lead(s), rebalanced {} handoff(s) across {} lead(s)",
|
"coordinated backlog: dispatched {} across {} lead(s), rebalanced {} across {} lead(s), remaining {} across {} session(s)",
|
||||||
total_routed,
|
total_routed,
|
||||||
dispatch_outcomes.len(),
|
outcome.dispatched.len(),
|
||||||
total_rerouted,
|
total_rerouted,
|
||||||
rebalance_outcomes.len()
|
outcome.rebalanced.len(),
|
||||||
|
outcome.remaining_backlog_messages,
|
||||||
|
outcome.remaining_backlog_sessions
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user