From bcf8d0617e558b92a51546a982fc471b8320ddc6 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Wed, 8 Apr 2026 13:19:24 -0700 Subject: [PATCH] feat: add ecc2 coordination status health metadata --- ecc2/src/main.rs | 19 ++++++--- ecc2/src/session/manager.rs | 84 ++++++++++++++++++++++++++++++++----- 2 files changed, 87 insertions(+), 16 deletions(-) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 2f17ea27..aef8bd09 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -692,12 +692,11 @@ fn format_coordination_status( } fn coordination_status_exit_code(status: &session::manager::CoordinationStatus) -> i32 { - if status.daemon_activity.operator_escalation_required() || status.saturated_sessions > 0 { - 2 - } else if status.backlog_messages > 0 { - 1 - } else { - 0 + match status.health { + session::manager::CoordinationHealth::Healthy => 0, + session::manager::CoordinationHealth::BacklogAbsorbable => 1, + session::manager::CoordinationHealth::Saturated + | session::manager::CoordinationHealth::EscalationRequired => 2, } } @@ -1039,6 +1038,9 @@ mod tests { backlog_messages: 5, absorbable_sessions: 1, saturated_sessions: 1, + mode: session::manager::CoordinationMode::RebalanceFirstChronicSaturation, + health: session::manager::CoordinationHealth::Saturated, + operator_escalation_required: false, auto_dispatch_enabled: true, auto_dispatch_limit_per_session: 4, daemon_activity: session::store::DaemonActivity { @@ -1065,6 +1067,9 @@ mod tests { backlog_messages: 0, absorbable_sessions: 0, saturated_sessions: 0, + mode: session::manager::CoordinationMode::DispatchFirst, + health: session::manager::CoordinationHealth::Healthy, + operator_escalation_required: false, auto_dispatch_enabled: false, auto_dispatch_limit_per_session: 5, daemon_activity: Default::default(), @@ -1075,12 +1080,14 @@ mod tests { backlog_messages: 2, backlog_leads: 1, absorbable_sessions: 1, + health: session::manager::CoordinationHealth::BacklogAbsorbable, ..clear.clone() }; assert_eq!(coordination_status_exit_code(&absorbable), 1); let saturated = session::manager::CoordinationStatus { saturated_sessions: 1, + health: session::manager::CoordinationHealth::Saturated, ..absorbable }; assert_eq!(coordination_status_exit_code(&saturated), 2); diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index 90f924e7..f8e62131 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -1101,11 +1101,32 @@ pub struct CoordinationStatus { pub backlog_messages: usize, pub absorbable_sessions: usize, pub saturated_sessions: usize, + pub mode: CoordinationMode, + pub health: CoordinationHealth, + pub operator_escalation_required: bool, pub auto_dispatch_enabled: bool, pub auto_dispatch_limit_per_session: usize, pub daemon_activity: super::store::DaemonActivity, } +#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum CoordinationMode { + DispatchFirst, + DispatchFirstStabilized, + RebalanceFirstChronicSaturation, + RebalanceCooloffChronicSaturation, +} + +#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum CoordinationHealth { + Healthy, + BacklogAbsorbable, + Saturated, + EscalationRequired, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum AssignmentAction { Spawned, @@ -1118,6 +1139,34 @@ pub fn assignment_action_routes_work(action: AssignmentAction) -> bool { !matches!(action, AssignmentAction::DeferredSaturated) } +fn coordination_mode(activity: &super::store::DaemonActivity) -> CoordinationMode { + if activity.dispatch_cooloff_active() { + CoordinationMode::RebalanceCooloffChronicSaturation + } else if activity.prefers_rebalance_first() { + CoordinationMode::RebalanceFirstChronicSaturation + } else if activity.stabilized_after_recovery_at().is_some() { + CoordinationMode::DispatchFirstStabilized + } else { + CoordinationMode::DispatchFirst + } +} + +fn coordination_health( + backlog_messages: usize, + saturated_sessions: usize, + activity: &super::store::DaemonActivity, +) -> CoordinationHealth { + if activity.operator_escalation_required() { + CoordinationHealth::EscalationRequired + } else if saturated_sessions > 0 { + CoordinationHealth::Saturated + } else if backlog_messages > 0 { + CoordinationHealth::BacklogAbsorbable + } else { + CoordinationHealth::Healthy + } +} + pub fn get_coordination_status(db: &StateStore, cfg: &Config) -> Result { let targets = db.unread_task_handoff_targets(db.list_sessions()?.len().max(1))?; let pressure = summarize_backlog_pressure(db, cfg, &cfg.default_agent, &targets)?; @@ -1125,15 +1174,23 @@ pub fn get_coordination_status(db: &StateStore, cfg: &Config) -> Result(); + let daemon_activity = db.daemon_activity()?; Ok(CoordinationStatus { backlog_leads: targets.len(), backlog_messages, absorbable_sessions: pressure.absorbable_sessions, saturated_sessions: pressure.saturated_sessions, + mode: coordination_mode(&daemon_activity), + health: coordination_health( + backlog_messages, + pressure.saturated_sessions, + &daemon_activity, + ), + operator_escalation_required: daemon_activity.operator_escalation_required(), auto_dispatch_enabled: cfg.auto_dispatch_unread_handoffs, auto_dispatch_limit_per_session: cfg.auto_dispatch_limit_per_session, - daemon_activity: db.daemon_activity()?, + daemon_activity, }) } @@ -1235,14 +1292,15 @@ impl fmt::Display for TeamStatus { impl fmt::Display for CoordinationStatus { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let stabilized = self.daemon_activity.stabilized_after_recovery_at(); - let mode = if self.daemon_activity.dispatch_cooloff_active() { - "rebalance-cooloff (chronic saturation)" - } else if self.daemon_activity.prefers_rebalance_first() { - "rebalance-first (chronic saturation)" - } else if stabilized.is_some() { - "dispatch-first (stabilized)" - } else { - "dispatch-first" + let mode = match self.mode { + CoordinationMode::DispatchFirst => "dispatch-first", + CoordinationMode::DispatchFirstStabilized => "dispatch-first (stabilized)", + CoordinationMode::RebalanceFirstChronicSaturation => { + "rebalance-first (chronic saturation)" + } + CoordinationMode::RebalanceCooloffChronicSaturation => { + "rebalance-cooloff (chronic saturation)" + } }; writeln!( @@ -1273,7 +1331,7 @@ impl fmt::Display for CoordinationStatus { )?; } - if self.daemon_activity.operator_escalation_required() { + if self.operator_escalation_required { writeln!( f, "Operator escalation: chronic saturation is not clearing" @@ -2616,6 +2674,9 @@ mod tests { backlog_messages: 5, absorbable_sessions: 1, saturated_sessions: 1, + mode: CoordinationMode::RebalanceFirstChronicSaturation, + health: CoordinationHealth::Saturated, + operator_escalation_required: false, auto_dispatch_enabled: true, auto_dispatch_limit_per_session: 4, daemon_activity: build_daemon_activity(), @@ -2683,6 +2744,9 @@ mod tests { assert_eq!(status.backlog_messages, 3); assert_eq!(status.absorbable_sessions, 2); assert_eq!(status.saturated_sessions, 1); + assert_eq!(status.mode, CoordinationMode::RebalanceFirstChronicSaturation); + assert_eq!(status.health, CoordinationHealth::Saturated); + assert!(!status.operator_escalation_required); assert_eq!(status.daemon_activity.last_dispatch_routed, 1); assert_eq!(status.daemon_activity.last_dispatch_deferred, 1);