feat: relax ecc2 stabilized cycles

This commit is contained in:
Affaan Mustafa
2026-04-08 03:40:26 -07:00
parent 40ed9c7f6a
commit 051d47eb5f

View File

@@ -177,6 +177,12 @@ where
}
let first_dispatch = dispatch().await?;
if prior_activity.stabilized_after_recovery_at().is_some() && first_dispatch.deferred == 0 {
tracing::info!(
"Skipping rebalance because stabilized dispatch cycle has no deferred handoffs"
);
return Ok((first_dispatch, 0, DispatchPassSummary::default()));
}
let rebalanced = rebalance().await?;
let recovery_dispatch = if first_dispatch.deferred > 0 && rebalanced > 0 {
let recovery = dispatch().await?;
@@ -796,6 +802,56 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn coordinate_backlog_cycle_skips_rebalance_when_stabilized_and_dispatch_is_healthy() -> Result<()> {
let cfg = Config {
auto_dispatch_unread_handoffs: true,
..Config::default()
};
let now = chrono::Utc::now();
let activity = DaemonActivity {
last_dispatch_at: Some(now + chrono::Duration::seconds(2)),
last_dispatch_routed: 2,
last_dispatch_deferred: 0,
last_dispatch_leads: 1,
last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)),
last_recovery_dispatch_routed: 1,
last_recovery_dispatch_leads: 1,
last_rebalance_at: Some(now),
last_rebalance_rerouted: 1,
last_rebalance_leads: 1,
};
let rebalance_calls = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let rebalance_calls_clone = rebalance_calls.clone();
let (first, rebalanced, recovery) = coordinate_backlog_cycle_with(
&cfg,
&activity,
|| async move {
Ok(DispatchPassSummary {
routed: 1,
deferred: 0,
leads: 1,
})
},
move || {
let rebalance_calls_clone = rebalance_calls_clone.clone();
async move {
rebalance_calls_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(1)
}
},
|_, _| Ok(()),
)
.await?;
assert_eq!(first.routed, 1);
assert_eq!(rebalanced, 0);
assert_eq!(recovery, DispatchPassSummary::default());
assert_eq!(rebalance_calls.load(std::sync::atomic::Ordering::SeqCst), 0);
Ok(())
}
#[tokio::test]
async fn maybe_auto_rebalance_noops_when_disabled() -> Result<()> {
let path = temp_db_path();