mirror of
https://github.com/affaan-m/everything-claude-code.git
synced 2026-04-13 05:03:28 +08:00
feat: add ecc2 coordination status command
This commit is contained in:
@@ -102,6 +102,8 @@ enum Commands {
|
|||||||
#[arg(long, default_value_t = 10)]
|
#[arg(long, default_value_t = 10)]
|
||||||
lead_limit: usize,
|
lead_limit: usize,
|
||||||
},
|
},
|
||||||
|
/// Show global coordination, backlog, and daemon policy status
|
||||||
|
CoordinationStatus,
|
||||||
/// Rebalance unread handoffs across lead teams with backed-up delegates
|
/// Rebalance unread handoffs across lead teams with backed-up delegates
|
||||||
RebalanceAll {
|
RebalanceAll {
|
||||||
/// Agent type for routed delegates
|
/// Agent type for routed delegates
|
||||||
@@ -458,6 +460,10 @@ async fn main() -> Result<()> {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Some(Commands::CoordinationStatus) => {
|
||||||
|
let status = session::manager::get_coordination_status(&db, &cfg)?;
|
||||||
|
println!("{status}");
|
||||||
|
}
|
||||||
Some(Commands::RebalanceAll {
|
Some(Commands::RebalanceAll {
|
||||||
agent,
|
agent,
|
||||||
worktree: use_worktree,
|
worktree: use_worktree,
|
||||||
@@ -953,6 +959,17 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cli_parses_coordination_status_command() {
|
||||||
|
let cli = Cli::try_parse_from(["ecc", "coordination-status"])
|
||||||
|
.expect("coordination-status should parse");
|
||||||
|
|
||||||
|
match cli.command {
|
||||||
|
Some(Commands::CoordinationStatus) => {}
|
||||||
|
_ => panic!("expected coordination-status subcommand"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn cli_parses_rebalance_team_command() {
|
fn cli_parses_rebalance_team_command() {
|
||||||
let cli = Cli::try_parse_from([
|
let cli = Cli::try_parse_from([
|
||||||
|
|||||||
@@ -1094,6 +1094,16 @@ pub struct CoordinateBacklogOutcome {
|
|||||||
pub remaining_saturated_sessions: usize,
|
pub remaining_saturated_sessions: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct CoordinationStatus {
|
||||||
|
pub backlog_leads: usize,
|
||||||
|
pub backlog_messages: usize,
|
||||||
|
pub absorbable_sessions: usize,
|
||||||
|
pub saturated_sessions: usize,
|
||||||
|
pub auto_dispatch_enabled: bool,
|
||||||
|
pub auto_dispatch_limit_per_session: usize,
|
||||||
|
pub daemon_activity: super::store::DaemonActivity,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
pub enum AssignmentAction {
|
pub enum AssignmentAction {
|
||||||
Spawned,
|
Spawned,
|
||||||
@@ -1106,6 +1116,25 @@ pub fn assignment_action_routes_work(action: AssignmentAction) -> bool {
|
|||||||
!matches!(action, AssignmentAction::DeferredSaturated)
|
!matches!(action, AssignmentAction::DeferredSaturated)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_coordination_status(db: &StateStore, cfg: &Config) -> Result<CoordinationStatus> {
|
||||||
|
let targets = db.unread_task_handoff_targets(db.list_sessions()?.len().max(1))?;
|
||||||
|
let pressure = summarize_backlog_pressure(db, cfg, &cfg.default_agent, &targets)?;
|
||||||
|
let backlog_messages = targets
|
||||||
|
.iter()
|
||||||
|
.map(|(_, unread_count)| *unread_count)
|
||||||
|
.sum::<usize>();
|
||||||
|
|
||||||
|
Ok(CoordinationStatus {
|
||||||
|
backlog_leads: targets.len(),
|
||||||
|
backlog_messages,
|
||||||
|
absorbable_sessions: pressure.absorbable_sessions,
|
||||||
|
saturated_sessions: pressure.saturated_sessions,
|
||||||
|
auto_dispatch_enabled: cfg.auto_dispatch_unread_handoffs,
|
||||||
|
auto_dispatch_limit_per_session: cfg.auto_dispatch_limit_per_session,
|
||||||
|
daemon_activity: db.daemon_activity()?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
|
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
|
||||||
struct BacklogPressureSummary {
|
struct BacklogPressureSummary {
|
||||||
absorbable_sessions: usize,
|
absorbable_sessions: usize,
|
||||||
@@ -1201,6 +1230,105 @@ impl fmt::Display for TeamStatus {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for CoordinationStatus {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
let stabilized = self.daemon_activity.stabilized_after_recovery_at();
|
||||||
|
let mode = if self.daemon_activity.dispatch_cooloff_active() {
|
||||||
|
"rebalance-cooloff (chronic saturation)"
|
||||||
|
} else if self.daemon_activity.prefers_rebalance_first() {
|
||||||
|
"rebalance-first (chronic saturation)"
|
||||||
|
} else if stabilized.is_some() {
|
||||||
|
"dispatch-first (stabilized)"
|
||||||
|
} else {
|
||||||
|
"dispatch-first"
|
||||||
|
};
|
||||||
|
|
||||||
|
writeln!(
|
||||||
|
f,
|
||||||
|
"Global handoff backlog: {} lead(s) / {} handoff(s) [{} absorbable, {} saturated]",
|
||||||
|
self.backlog_leads,
|
||||||
|
self.backlog_messages,
|
||||||
|
self.absorbable_sessions,
|
||||||
|
self.saturated_sessions
|
||||||
|
)?;
|
||||||
|
writeln!(
|
||||||
|
f,
|
||||||
|
"Auto-dispatch: {} @ {}/lead",
|
||||||
|
if self.auto_dispatch_enabled {
|
||||||
|
"on"
|
||||||
|
} else {
|
||||||
|
"off"
|
||||||
|
},
|
||||||
|
self.auto_dispatch_limit_per_session
|
||||||
|
)?;
|
||||||
|
writeln!(f, "Coordination mode: {mode}")?;
|
||||||
|
|
||||||
|
if self.daemon_activity.chronic_saturation_streak > 0 {
|
||||||
|
writeln!(
|
||||||
|
f,
|
||||||
|
"Chronic saturation streak: {} cycle(s)",
|
||||||
|
self.daemon_activity.chronic_saturation_streak
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.daemon_activity.operator_escalation_required() {
|
||||||
|
writeln!(
|
||||||
|
f,
|
||||||
|
"Operator escalation: chronic saturation is not clearing"
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(cleared_at) = self.daemon_activity.chronic_saturation_cleared_at() {
|
||||||
|
writeln!(
|
||||||
|
f,
|
||||||
|
"Chronic saturation cleared: {}",
|
||||||
|
cleared_at.to_rfc3339()
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(stabilized_at) = stabilized {
|
||||||
|
writeln!(f, "Recovery stabilized: {}", stabilized_at.to_rfc3339())?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(last_dispatch_at) = self.daemon_activity.last_dispatch_at.as_ref() {
|
||||||
|
writeln!(
|
||||||
|
f,
|
||||||
|
"Last daemon dispatch: {} routed / {} deferred across {} lead(s) @ {}",
|
||||||
|
self.daemon_activity.last_dispatch_routed,
|
||||||
|
self.daemon_activity.last_dispatch_deferred,
|
||||||
|
self.daemon_activity.last_dispatch_leads,
|
||||||
|
last_dispatch_at.to_rfc3339()
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if stabilized.is_none() {
|
||||||
|
if let Some(last_recovery_dispatch_at) =
|
||||||
|
self.daemon_activity.last_recovery_dispatch_at.as_ref()
|
||||||
|
{
|
||||||
|
writeln!(
|
||||||
|
f,
|
||||||
|
"Last daemon recovery dispatch: {} handoff(s) across {} lead(s) @ {}",
|
||||||
|
self.daemon_activity.last_recovery_dispatch_routed,
|
||||||
|
self.daemon_activity.last_recovery_dispatch_leads,
|
||||||
|
last_recovery_dispatch_at.to_rfc3339()
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(last_rebalance_at) = self.daemon_activity.last_rebalance_at.as_ref() {
|
||||||
|
writeln!(
|
||||||
|
f,
|
||||||
|
"Last daemon rebalance: {} handoff(s) across {} lead(s) @ {}",
|
||||||
|
self.daemon_activity.last_rebalance_rerouted,
|
||||||
|
self.daemon_activity.last_rebalance_leads,
|
||||||
|
last_rebalance_at.to_rfc3339()
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn session_state_label(state: &SessionState) -> &'static str {
|
fn session_state_label(state: &SessionState) -> &'static str {
|
||||||
match state {
|
match state {
|
||||||
SessionState::Pending => "Pending",
|
SessionState::Pending => "Pending",
|
||||||
@@ -1283,6 +1411,23 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn build_daemon_activity() -> super::super::store::DaemonActivity {
|
||||||
|
let now = Utc::now();
|
||||||
|
super::super::store::DaemonActivity {
|
||||||
|
last_dispatch_at: Some(now),
|
||||||
|
last_dispatch_routed: 3,
|
||||||
|
last_dispatch_deferred: 1,
|
||||||
|
last_dispatch_leads: 2,
|
||||||
|
chronic_saturation_streak: 2,
|
||||||
|
last_recovery_dispatch_at: Some(now - Duration::seconds(5)),
|
||||||
|
last_recovery_dispatch_routed: 2,
|
||||||
|
last_recovery_dispatch_leads: 1,
|
||||||
|
last_rebalance_at: Some(now - Duration::seconds(2)),
|
||||||
|
last_rebalance_rerouted: 0,
|
||||||
|
last_rebalance_leads: 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn init_git_repo(path: &Path) -> Result<()> {
|
fn init_git_repo(path: &Path) -> Result<()> {
|
||||||
fs::create_dir_all(path)?;
|
fs::create_dir_all(path)?;
|
||||||
run_git(path, ["init", "-q"])?;
|
run_git(path, ["init", "-q"])?;
|
||||||
@@ -2461,4 +2606,84 @@ mod tests {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn coordination_status_display_surfaces_mode_and_activity() {
|
||||||
|
let status = CoordinationStatus {
|
||||||
|
backlog_leads: 2,
|
||||||
|
backlog_messages: 5,
|
||||||
|
absorbable_sessions: 1,
|
||||||
|
saturated_sessions: 1,
|
||||||
|
auto_dispatch_enabled: true,
|
||||||
|
auto_dispatch_limit_per_session: 4,
|
||||||
|
daemon_activity: build_daemon_activity(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let rendered = status.to_string();
|
||||||
|
assert!(
|
||||||
|
rendered.contains(
|
||||||
|
"Global handoff backlog: 2 lead(s) / 5 handoff(s) [1 absorbable, 1 saturated]"
|
||||||
|
)
|
||||||
|
);
|
||||||
|
assert!(rendered.contains("Auto-dispatch: on @ 4/lead"));
|
||||||
|
assert!(rendered.contains("Coordination mode: rebalance-first (chronic saturation)"));
|
||||||
|
assert!(rendered.contains("Chronic saturation streak: 2 cycle(s)"));
|
||||||
|
assert!(rendered.contains("Last daemon dispatch: 3 routed / 1 deferred across 2 lead(s)"));
|
||||||
|
assert!(rendered.contains("Last daemon recovery dispatch: 2 handoff(s) across 1 lead(s)"));
|
||||||
|
assert!(rendered.contains("Last daemon rebalance: 0 handoff(s) across 1 lead(s)"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn coordination_status_summarizes_real_handoff_backlog() -> Result<()> {
|
||||||
|
let tempdir = TestDir::new("manager-coordination-status")?;
|
||||||
|
let repo_root = tempdir.path().join("repo");
|
||||||
|
init_git_repo(&repo_root)?;
|
||||||
|
|
||||||
|
let cfg = Config {
|
||||||
|
max_parallel_sessions: 1,
|
||||||
|
..build_config(tempdir.path())
|
||||||
|
};
|
||||||
|
let db = StateStore::open(&cfg.db_path)?;
|
||||||
|
let now = Utc::now();
|
||||||
|
|
||||||
|
db.insert_session(&build_session("source", SessionState::Running, now))?;
|
||||||
|
db.insert_session(&build_session("lead-a", SessionState::Running, now))?;
|
||||||
|
db.insert_session(&build_session("lead-b", SessionState::Running, now))?;
|
||||||
|
db.insert_session(&build_session(
|
||||||
|
"delegate-b",
|
||||||
|
SessionState::Idle,
|
||||||
|
now - Duration::seconds(1),
|
||||||
|
))?;
|
||||||
|
|
||||||
|
db.send_message(
|
||||||
|
"source",
|
||||||
|
"lead-a",
|
||||||
|
"{\"task\":\"clear docs\",\"context\":\"incoming\"}",
|
||||||
|
"task_handoff",
|
||||||
|
)?;
|
||||||
|
db.send_message(
|
||||||
|
"source",
|
||||||
|
"lead-b",
|
||||||
|
"{\"task\":\"review queue\",\"context\":\"incoming\"}",
|
||||||
|
"task_handoff",
|
||||||
|
)?;
|
||||||
|
db.send_message(
|
||||||
|
"lead-b",
|
||||||
|
"delegate-b",
|
||||||
|
"{\"task\":\"delegate queue\",\"context\":\"routed\"}",
|
||||||
|
"task_handoff",
|
||||||
|
)?;
|
||||||
|
|
||||||
|
db.record_daemon_dispatch_pass(1, 1, 2)?;
|
||||||
|
|
||||||
|
let status = get_coordination_status(&db, &cfg)?;
|
||||||
|
assert_eq!(status.backlog_leads, 3);
|
||||||
|
assert_eq!(status.backlog_messages, 3);
|
||||||
|
assert_eq!(status.absorbable_sessions, 2);
|
||||||
|
assert_eq!(status.saturated_sessions, 1);
|
||||||
|
assert_eq!(status.daemon_activity.last_dispatch_routed, 1);
|
||||||
|
assert_eq!(status.daemon_activity.last_dispatch_deferred, 1);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user