mirror of
https://github.com/affaan-m/everything-claude-code.git
synced 2026-04-10 19:33:37 +08:00
feat: classify ecc2 remaining coordination pressure
This commit is contained in:
@@ -392,13 +392,15 @@ async fn main() -> Result<()> {
|
|||||||
println!("Backlog already clear");
|
println!("Backlog already clear");
|
||||||
} else {
|
} else {
|
||||||
println!(
|
println!(
|
||||||
"Coordinated backlog: dispatched {} handoff(s) across {} lead(s); rebalanced {} handoff(s) across {} lead(s); remaining {} handoff(s) across {} session(s)",
|
"Coordinated backlog: dispatched {} handoff(s) across {} lead(s); rebalanced {} handoff(s) across {} lead(s); remaining {} handoff(s) across {} session(s) [{} absorbable, {} saturated]",
|
||||||
total_routed,
|
total_routed,
|
||||||
outcome.dispatched.len(),
|
outcome.dispatched.len(),
|
||||||
total_rerouted,
|
total_rerouted,
|
||||||
outcome.rebalanced.len(),
|
outcome.rebalanced.len(),
|
||||||
outcome.remaining_backlog_messages,
|
outcome.remaining_backlog_messages,
|
||||||
outcome.remaining_backlog_sessions
|
outcome.remaining_backlog_sessions,
|
||||||
|
outcome.remaining_absorbable_sessions,
|
||||||
|
outcome.remaining_saturated_sessions
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -212,6 +212,7 @@ pub async fn coordinate_backlog(
|
|||||||
let dispatched = auto_dispatch_backlog(db, cfg, agent_type, use_worktree, lead_limit).await?;
|
let dispatched = auto_dispatch_backlog(db, cfg, agent_type, use_worktree, lead_limit).await?;
|
||||||
let rebalanced = rebalance_all_teams(db, cfg, agent_type, use_worktree, lead_limit).await?;
|
let rebalanced = rebalance_all_teams(db, cfg, agent_type, use_worktree, lead_limit).await?;
|
||||||
let remaining_targets = db.unread_task_handoff_targets(db.list_sessions()?.len().max(1))?;
|
let remaining_targets = db.unread_task_handoff_targets(db.list_sessions()?.len().max(1))?;
|
||||||
|
let pressure = summarize_backlog_pressure(db, cfg, agent_type, &remaining_targets)?;
|
||||||
let remaining_backlog_sessions = remaining_targets.len();
|
let remaining_backlog_sessions = remaining_targets.len();
|
||||||
let remaining_backlog_messages = remaining_targets
|
let remaining_backlog_messages = remaining_targets
|
||||||
.iter()
|
.iter()
|
||||||
@@ -223,6 +224,8 @@ pub async fn coordinate_backlog(
|
|||||||
rebalanced,
|
rebalanced,
|
||||||
remaining_backlog_sessions,
|
remaining_backlog_sessions,
|
||||||
remaining_backlog_messages,
|
remaining_backlog_messages,
|
||||||
|
remaining_absorbable_sessions: pressure.absorbable_sessions,
|
||||||
|
remaining_saturated_sessions: pressure.saturated_sessions,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -811,6 +814,33 @@ fn direct_delegate_sessions(db: &StateStore, lead_id: &str, agent_type: &str) ->
|
|||||||
Ok(sessions)
|
Ok(sessions)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn summarize_backlog_pressure(
|
||||||
|
db: &StateStore,
|
||||||
|
cfg: &Config,
|
||||||
|
agent_type: &str,
|
||||||
|
targets: &[(String, usize)],
|
||||||
|
) -> Result<BacklogPressureSummary> {
|
||||||
|
let unread_counts = db.unread_message_counts()?;
|
||||||
|
let mut summary = BacklogPressureSummary::default();
|
||||||
|
|
||||||
|
for (session_id, _) in targets {
|
||||||
|
let delegates = direct_delegate_sessions(db, session_id, agent_type)?;
|
||||||
|
let has_clear_idle_delegate = delegates.iter().any(|delegate| {
|
||||||
|
delegate.state == SessionState::Idle
|
||||||
|
&& unread_counts.get(&delegate.id).copied().unwrap_or(0) == 0
|
||||||
|
});
|
||||||
|
let has_capacity = delegates.len() < cfg.max_parallel_sessions;
|
||||||
|
|
||||||
|
if has_clear_idle_delegate || has_capacity {
|
||||||
|
summary.absorbable_sessions += 1;
|
||||||
|
} else {
|
||||||
|
summary.saturated_sessions += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(summary)
|
||||||
|
}
|
||||||
|
|
||||||
fn send_task_handoff(
|
fn send_task_handoff(
|
||||||
db: &StateStore,
|
db: &StateStore,
|
||||||
from_session: &Session,
|
from_session: &Session,
|
||||||
@@ -1035,6 +1065,8 @@ pub struct CoordinateBacklogOutcome {
|
|||||||
pub rebalanced: Vec<LeadRebalanceOutcome>,
|
pub rebalanced: Vec<LeadRebalanceOutcome>,
|
||||||
pub remaining_backlog_sessions: usize,
|
pub remaining_backlog_sessions: usize,
|
||||||
pub remaining_backlog_messages: usize,
|
pub remaining_backlog_messages: usize,
|
||||||
|
pub remaining_absorbable_sessions: usize,
|
||||||
|
pub remaining_saturated_sessions: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
@@ -1044,6 +1076,12 @@ pub enum AssignmentAction {
|
|||||||
ReusedActive,
|
ReusedActive,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
|
||||||
|
struct BacklogPressureSummary {
|
||||||
|
absorbable_sessions: usize,
|
||||||
|
saturated_sessions: usize,
|
||||||
|
}
|
||||||
|
|
||||||
struct DelegatedSessionSummary {
|
struct DelegatedSessionSummary {
|
||||||
depth: usize,
|
depth: usize,
|
||||||
unread_messages: usize,
|
unread_messages: usize,
|
||||||
@@ -1975,6 +2013,77 @@ mod tests {
|
|||||||
assert_eq!(outcome.rebalanced.len(), 0);
|
assert_eq!(outcome.rebalanced.len(), 0);
|
||||||
assert_eq!(outcome.remaining_backlog_sessions, 2);
|
assert_eq!(outcome.remaining_backlog_sessions, 2);
|
||||||
assert_eq!(outcome.remaining_backlog_messages, 2);
|
assert_eq!(outcome.remaining_backlog_messages, 2);
|
||||||
|
assert_eq!(outcome.remaining_absorbable_sessions, 2);
|
||||||
|
assert_eq!(outcome.remaining_saturated_sessions, 0);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "current_thread")]
|
||||||
|
async fn coordinate_backlog_classifies_remaining_saturated_pressure() -> Result<()> {
|
||||||
|
let tempdir = TestDir::new("manager-coordinate-saturated")?;
|
||||||
|
let repo_root = tempdir.path().join("repo");
|
||||||
|
init_git_repo(&repo_root)?;
|
||||||
|
|
||||||
|
let mut cfg = build_config(tempdir.path());
|
||||||
|
cfg.max_parallel_sessions = 1;
|
||||||
|
cfg.auto_dispatch_limit_per_session = 1;
|
||||||
|
let db = StateStore::open(&cfg.db_path)?;
|
||||||
|
let now = Utc::now();
|
||||||
|
|
||||||
|
db.insert_session(&Session {
|
||||||
|
id: "worker".to_string(),
|
||||||
|
task: "worker task".to_string(),
|
||||||
|
agent_type: "claude".to_string(),
|
||||||
|
working_dir: repo_root.clone(),
|
||||||
|
state: SessionState::Running,
|
||||||
|
pid: Some(42),
|
||||||
|
worktree: None,
|
||||||
|
created_at: now - Duration::minutes(3),
|
||||||
|
updated_at: now - Duration::minutes(3),
|
||||||
|
metrics: SessionMetrics::default(),
|
||||||
|
})?;
|
||||||
|
|
||||||
|
db.insert_session(&Session {
|
||||||
|
id: "worker-child".to_string(),
|
||||||
|
task: "delegate task".to_string(),
|
||||||
|
agent_type: "claude".to_string(),
|
||||||
|
working_dir: repo_root.clone(),
|
||||||
|
state: SessionState::Running,
|
||||||
|
pid: Some(43),
|
||||||
|
worktree: None,
|
||||||
|
created_at: now - Duration::minutes(2),
|
||||||
|
updated_at: now - Duration::minutes(2),
|
||||||
|
metrics: SessionMetrics::default(),
|
||||||
|
})?;
|
||||||
|
|
||||||
|
db.send_message(
|
||||||
|
"worker",
|
||||||
|
"worker-child",
|
||||||
|
"{\"task\":\"seed delegate\",\"context\":\"Delegated from worker\"}",
|
||||||
|
"task_handoff",
|
||||||
|
)?;
|
||||||
|
let _ = db.mark_messages_read("worker-child")?;
|
||||||
|
|
||||||
|
db.send_message(
|
||||||
|
"planner",
|
||||||
|
"worker",
|
||||||
|
"{\"task\":\"task-a\",\"context\":\"Inbound\"}",
|
||||||
|
"task_handoff",
|
||||||
|
)?;
|
||||||
|
db.send_message(
|
||||||
|
"planner",
|
||||||
|
"worker",
|
||||||
|
"{\"task\":\"task-b\",\"context\":\"Inbound\"}",
|
||||||
|
"task_handoff",
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let outcome = coordinate_backlog(&db, &cfg, "claude", true, 10).await?;
|
||||||
|
|
||||||
|
assert_eq!(outcome.remaining_backlog_sessions, 1);
|
||||||
|
assert_eq!(outcome.remaining_backlog_messages, 2);
|
||||||
|
assert_eq!(outcome.remaining_absorbable_sessions, 0);
|
||||||
|
assert_eq!(outcome.remaining_saturated_sessions, 1);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -936,13 +936,15 @@ impl Dashboard {
|
|||||||
self.set_operator_note("backlog already clear".to_string());
|
self.set_operator_note("backlog already clear".to_string());
|
||||||
} else {
|
} else {
|
||||||
self.set_operator_note(format!(
|
self.set_operator_note(format!(
|
||||||
"coordinated backlog: dispatched {} across {} lead(s), rebalanced {} across {} lead(s), remaining {} across {} session(s)",
|
"coordinated backlog: dispatched {} across {} lead(s), rebalanced {} across {} lead(s), remaining {} across {} session(s) [{} absorbable, {} saturated]",
|
||||||
total_routed,
|
total_routed,
|
||||||
outcome.dispatched.len(),
|
outcome.dispatched.len(),
|
||||||
total_rerouted,
|
total_rerouted,
|
||||||
outcome.rebalanced.len(),
|
outcome.rebalanced.len(),
|
||||||
outcome.remaining_backlog_messages,
|
outcome.remaining_backlog_messages,
|
||||||
outcome.remaining_backlog_sessions
|
outcome.remaining_backlog_sessions,
|
||||||
|
outcome.remaining_absorbable_sessions,
|
||||||
|
outcome.remaining_saturated_sessions
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user