feat: surface ecc2 daemon recovery pressure

This commit is contained in:
Affaan Mustafa
2026-04-08 03:14:20 -07:00
parent 19ad704216
commit 08e9d0e28b
3 changed files with 162 additions and 17 deletions

View File

@@ -17,7 +17,11 @@ pub struct StateStore {
pub struct DaemonActivity {
pub last_dispatch_at: Option<chrono::DateTime<chrono::Utc>>,
pub last_dispatch_routed: usize,
pub last_dispatch_deferred: usize,
pub last_dispatch_leads: usize,
pub last_recovery_dispatch_at: Option<chrono::DateTime<chrono::Utc>>,
pub last_recovery_dispatch_routed: usize,
pub last_recovery_dispatch_leads: usize,
pub last_rebalance_at: Option<chrono::DateTime<chrono::Utc>>,
pub last_rebalance_rerouted: usize,
pub last_rebalance_leads: usize,
@@ -88,7 +92,11 @@ impl StateStore {
id INTEGER PRIMARY KEY CHECK(id = 1),
last_dispatch_at TEXT,
last_dispatch_routed INTEGER NOT NULL DEFAULT 0,
last_dispatch_deferred INTEGER NOT NULL DEFAULT 0,
last_dispatch_leads INTEGER NOT NULL DEFAULT 0,
last_recovery_dispatch_at TEXT,
last_recovery_dispatch_routed INTEGER NOT NULL DEFAULT 0,
last_recovery_dispatch_leads INTEGER NOT NULL DEFAULT 0,
last_rebalance_at TEXT,
last_rebalance_rerouted INTEGER NOT NULL DEFAULT 0,
last_rebalance_leads INTEGER NOT NULL DEFAULT 0
@@ -123,6 +131,42 @@ impl StateStore {
.context("Failed to add pid column to sessions table")?;
}
if !self.has_column("daemon_activity", "last_dispatch_deferred")? {
self.conn
.execute(
"ALTER TABLE daemon_activity ADD COLUMN last_dispatch_deferred INTEGER NOT NULL DEFAULT 0",
[],
)
.context("Failed to add last_dispatch_deferred column to daemon_activity table")?;
}
if !self.has_column("daemon_activity", "last_recovery_dispatch_at")? {
self.conn
.execute(
"ALTER TABLE daemon_activity ADD COLUMN last_recovery_dispatch_at TEXT",
[],
)
.context("Failed to add last_recovery_dispatch_at column to daemon_activity table")?;
}
if !self.has_column("daemon_activity", "last_recovery_dispatch_routed")? {
self.conn
.execute(
"ALTER TABLE daemon_activity ADD COLUMN last_recovery_dispatch_routed INTEGER NOT NULL DEFAULT 0",
[],
)
.context("Failed to add last_recovery_dispatch_routed column to daemon_activity table")?;
}
if !self.has_column("daemon_activity", "last_recovery_dispatch_leads")? {
self.conn
.execute(
"ALTER TABLE daemon_activity ADD COLUMN last_recovery_dispatch_leads INTEGER NOT NULL DEFAULT 0",
[],
)
.context("Failed to add last_recovery_dispatch_leads column to daemon_activity table")?;
}
Ok(())
}
@@ -513,7 +557,8 @@ impl StateStore {
pub fn daemon_activity(&self) -> Result<DaemonActivity> {
self.conn
.query_row(
"SELECT last_dispatch_at, last_dispatch_routed, last_dispatch_leads,
"SELECT last_dispatch_at, last_dispatch_routed, last_dispatch_deferred, last_dispatch_leads,
last_recovery_dispatch_at, last_recovery_dispatch_routed, last_recovery_dispatch_leads,
last_rebalance_at, last_rebalance_rerouted, last_rebalance_leads
FROM daemon_activity
WHERE id = 1",
@@ -539,22 +584,50 @@ impl StateStore {
Ok(DaemonActivity {
last_dispatch_at: parse_ts(row.get(0)?)?,
last_dispatch_routed: row.get::<_, i64>(1)? as usize,
last_dispatch_leads: row.get::<_, i64>(2)? as usize,
last_rebalance_at: parse_ts(row.get(3)?)?,
last_rebalance_rerouted: row.get::<_, i64>(4)? as usize,
last_rebalance_leads: row.get::<_, i64>(5)? as usize,
last_dispatch_deferred: row.get::<_, i64>(2)? as usize,
last_dispatch_leads: row.get::<_, i64>(3)? as usize,
last_recovery_dispatch_at: parse_ts(row.get(4)?)?,
last_recovery_dispatch_routed: row.get::<_, i64>(5)? as usize,
last_recovery_dispatch_leads: row.get::<_, i64>(6)? as usize,
last_rebalance_at: parse_ts(row.get(7)?)?,
last_rebalance_rerouted: row.get::<_, i64>(8)? as usize,
last_rebalance_leads: row.get::<_, i64>(9)? as usize,
})
},
)
.map_err(Into::into)
}
pub fn record_daemon_dispatch_pass(&self, routed: usize, leads: usize) -> Result<()> {
pub fn record_daemon_dispatch_pass(
&self,
routed: usize,
deferred: usize,
leads: usize,
) -> Result<()> {
self.conn.execute(
"UPDATE daemon_activity
SET last_dispatch_at = ?1,
last_dispatch_routed = ?2,
last_dispatch_leads = ?3
last_dispatch_deferred = ?3,
last_dispatch_leads = ?4
WHERE id = 1",
rusqlite::params![
chrono::Utc::now().to_rfc3339(),
routed as i64,
deferred as i64,
leads as i64
],
)?;
Ok(())
}
pub fn record_daemon_recovery_dispatch_pass(&self, routed: usize, leads: usize) -> Result<()> {
self.conn.execute(
"UPDATE daemon_activity
SET last_recovery_dispatch_at = ?1,
last_recovery_dispatch_routed = ?2,
last_recovery_dispatch_leads = ?3
WHERE id = 1",
rusqlite::params![chrono::Utc::now().to_rfc3339(), routed as i64, leads as i64],
)?;
@@ -948,15 +1021,20 @@ mod tests {
let tempdir = TestDir::new("store-daemon-activity")?;
let db = StateStore::open(&tempdir.path().join("state.db"))?;
db.record_daemon_dispatch_pass(4, 2)?;
db.record_daemon_dispatch_pass(4, 1, 2)?;
db.record_daemon_recovery_dispatch_pass(2, 1)?;
db.record_daemon_rebalance_pass(3, 1)?;
let activity = db.daemon_activity()?;
assert_eq!(activity.last_dispatch_routed, 4);
assert_eq!(activity.last_dispatch_deferred, 1);
assert_eq!(activity.last_dispatch_leads, 2);
assert_eq!(activity.last_recovery_dispatch_routed, 2);
assert_eq!(activity.last_recovery_dispatch_leads, 1);
assert_eq!(activity.last_rebalance_rerouted, 3);
assert_eq!(activity.last_rebalance_leads, 1);
assert!(activity.last_dispatch_at.is_some());
assert!(activity.last_recovery_dispatch_at.is_some());
assert!(activity.last_rebalance_at.is_some());
Ok(())