feat: prefer ecc2 rebalance after chronic saturation

This commit is contained in:
Affaan Mustafa
2026-04-08 03:17:44 -07:00
parent 08e9d0e28b
commit f498dc0971

View File

@@ -111,8 +111,10 @@ async fn maybe_auto_dispatch(db: &StateStore, cfg: &Config) -> Result<usize> {
}
async fn coordinate_backlog_cycle(db: &StateStore, cfg: &Config) -> Result<()> {
let activity = db.daemon_activity()?;
coordinate_backlog_cycle_with(
cfg,
&activity,
|| {
maybe_auto_dispatch_with_recorder(cfg, || {
manager::auto_dispatch_backlog(
@@ -143,6 +145,7 @@ async fn coordinate_backlog_cycle(db: &StateStore, cfg: &Config) -> Result<()> {
async fn coordinate_backlog_cycle_with<DF, DFut, RF, RFut, Rec>(
_cfg: &Config,
prior_activity: &super::store::DaemonActivity,
dispatch: DF,
rebalance: RF,
mut record_recovery: Rec,
@@ -154,6 +157,12 @@ where
RFut: Future<Output = Result<usize>>,
Rec: FnMut(usize, usize) -> Result<()>,
{
if should_rebalance_first(prior_activity) {
let rebalanced = rebalance().await?;
let first_dispatch = dispatch().await?;
return Ok((first_dispatch, rebalanced, DispatchPassSummary::default()));
}
let first_dispatch = dispatch().await?;
let rebalanced = rebalance().await?;
let recovery_dispatch = if first_dispatch.deferred > 0 && rebalanced > 0 {
@@ -173,6 +182,21 @@ where
Ok((first_dispatch, rebalanced, recovery_dispatch))
}
fn should_rebalance_first(activity: &super::store::DaemonActivity) -> bool {
if activity.last_dispatch_deferred == 0 {
return false;
}
match (
activity.last_dispatch_at.as_ref(),
activity.last_recovery_dispatch_at.as_ref(),
) {
(Some(dispatch_at), Some(recovery_at)) => recovery_at < dispatch_at,
(Some(_), None) => true,
_ => false,
}
}
async fn maybe_auto_dispatch_with<F, Fut>(cfg: &Config, dispatch: F) -> Result<usize>
where
F: Fn() -> Fut,
@@ -317,6 +341,7 @@ mod tests {
AssignmentAction, InboxDrainOutcome, LeadDispatchOutcome, LeadRebalanceOutcome,
RebalanceOutcome,
};
use crate::session::store::DaemonActivity;
use crate::session::{Session, SessionMetrics, SessionState};
use std::path::PathBuf;
@@ -504,11 +529,13 @@ mod tests {
auto_dispatch_unread_handoffs: true,
..Config::default()
};
let activity = DaemonActivity::default();
let calls = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let calls_clone = calls.clone();
let (first, rebalanced, recovery) = coordinate_backlog_cycle_with(
&cfg,
&activity,
move || {
let calls_clone = calls_clone.clone();
async move {
@@ -545,11 +572,13 @@ mod tests {
auto_dispatch_unread_handoffs: true,
..Config::default()
};
let activity = DaemonActivity::default();
let calls = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let calls_clone = calls.clone();
let (first, rebalanced, recovery) = coordinate_backlog_cycle_with(
&cfg,
&activity,
move || {
let calls_clone = calls_clone.clone();
async move {
@@ -579,6 +608,7 @@ mod tests {
auto_dispatch_unread_handoffs: true,
..Config::default()
};
let activity = DaemonActivity::default();
let recorded = std::sync::Arc::new(std::sync::Mutex::new(None));
let recorded_clone = recorded.clone();
let calls = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
@@ -586,6 +616,7 @@ mod tests {
let (_first, _rebalanced, recovery) = coordinate_backlog_cycle_with(
&cfg,
&activity,
move || {
let calls_clone = calls_clone.clone();
async move {
@@ -617,6 +648,89 @@ mod tests {
Ok(())
}
#[test]
fn should_rebalance_first_only_after_unrecovered_deferred_pressure() {
let now = chrono::Utc::now();
assert!(!should_rebalance_first(&DaemonActivity::default()));
let unresolved = DaemonActivity {
last_dispatch_at: Some(now),
last_dispatch_routed: 0,
last_dispatch_deferred: 2,
last_dispatch_leads: 1,
last_recovery_dispatch_at: None,
last_recovery_dispatch_routed: 0,
last_recovery_dispatch_leads: 0,
last_rebalance_at: None,
last_rebalance_rerouted: 0,
last_rebalance_leads: 0,
};
assert!(should_rebalance_first(&unresolved));
let recovered = DaemonActivity {
last_recovery_dispatch_at: Some(now + chrono::Duration::seconds(1)),
last_recovery_dispatch_routed: 1,
..unresolved.clone()
};
assert!(!should_rebalance_first(&recovered));
}
#[tokio::test]
async fn coordinate_backlog_cycle_rebalances_first_after_unrecovered_deferred_pressure() -> Result<()> {
let cfg = Config {
auto_dispatch_unread_handoffs: true,
..Config::default()
};
let now = chrono::Utc::now();
let activity = DaemonActivity {
last_dispatch_at: Some(now),
last_dispatch_routed: 0,
last_dispatch_deferred: 2,
last_dispatch_leads: 1,
last_recovery_dispatch_at: None,
last_recovery_dispatch_routed: 0,
last_recovery_dispatch_leads: 0,
last_rebalance_at: None,
last_rebalance_rerouted: 0,
last_rebalance_leads: 0,
};
let order = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let dispatch_order = order.clone();
let rebalance_order = order.clone();
let (first, rebalanced, recovery) = coordinate_backlog_cycle_with(
&cfg,
&activity,
move || {
let dispatch_order = dispatch_order.clone();
async move {
dispatch_order.lock().unwrap().push("dispatch");
Ok(DispatchPassSummary {
routed: 1,
deferred: 0,
leads: 1,
})
}
},
move || {
let rebalance_order = rebalance_order.clone();
async move {
rebalance_order.lock().unwrap().push("rebalance");
Ok(1)
}
},
|_, _| Ok(()),
)
.await?;
assert_eq!(*order.lock().unwrap(), vec!["rebalance", "dispatch"]);
assert_eq!(first.routed, 1);
assert_eq!(rebalanced, 1);
assert_eq!(recovery, DispatchPassSummary::default());
Ok(())
}
#[tokio::test]
async fn maybe_auto_rebalance_noops_when_disabled() -> Result<()> {
let path = temp_db_path();