feat: prioritize ecc2 handoff queues

This commit is contained in:
Affaan Mustafa
2026-04-10 08:16:17 -07:00
parent d84c64fa0e
commit 52371f5016
5 changed files with 228 additions and 17 deletions
+81 -14
View File
@@ -8,6 +8,7 @@ use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::time::Duration;
use crate::comms;
use crate::config::Config;
use crate::observability::{ToolCallEvent, ToolLogEntry, ToolLogPage};
@@ -1885,11 +1886,10 @@ impl StateStore {
"SELECT id, from_session, to_session, content, msg_type, read, timestamp
FROM messages
WHERE to_session = ?1 AND msg_type = 'task_handoff' AND read = 0
ORDER BY id ASC
LIMIT ?2",
ORDER BY id ASC",
)?;
let messages = stmt.query_map(rusqlite::params![session_id, limit as i64], |row| {
let messages = stmt.query_map(rusqlite::params![session_id], |row| {
let timestamp: String = row.get(6)?;
Ok(SessionMessage {
@@ -1905,7 +1905,16 @@ impl StateStore {
})
})?;
messages.collect::<Result<Vec<_>, _>>().map_err(Into::into)
let mut messages = messages.collect::<Result<Vec<_>, _>>()?;
messages.sort_by(|left, right| {
let left_priority = comms::handoff_priority(&left.content);
let right_priority = comms::handoff_priority(&right.content);
Reverse(left_priority)
.cmp(&Reverse(right_priority))
.then_with(|| left.id.cmp(&right.id))
});
messages.truncate(limit);
Ok(messages)
}
pub fn unread_task_handoff_count(&self, session_id: &str) -> Result<usize> {
@@ -1923,19 +1932,49 @@ impl StateStore {
pub fn unread_task_handoff_targets(&self, limit: usize) -> Result<Vec<(String, usize)>> {
let mut stmt = self.conn.prepare(
"SELECT to_session, COUNT(*) as unread_count
"SELECT to_session, content, id
FROM messages
WHERE msg_type = 'task_handoff' AND read = 0
GROUP BY to_session
ORDER BY unread_count DESC, MAX(id) ASC
LIMIT ?1",
ORDER BY id ASC",
)?;
let targets = stmt.query_map(rusqlite::params![limit as i64], |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)? as usize))
let targets = stmt.query_map([], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, i64>(2)?,
))
})?;
let mut aggregated: HashMap<String, (usize, comms::TaskPriority, i64)> = HashMap::new();
for (to_session, content, id) in targets.collect::<Result<Vec<_>, _>>()? {
let priority = comms::handoff_priority(&content);
aggregated
.entry(to_session)
.and_modify(|entry| {
entry.0 += 1;
if priority > entry.1 {
entry.1 = priority;
}
if id < entry.2 {
entry.2 = id;
}
})
.or_insert((1, priority, id));
}
targets.collect::<Result<Vec<_>, _>>().map_err(Into::into)
let mut targets = aggregated.into_iter().collect::<Vec<_>>();
targets.sort_by(|(left_session, left), (right_session, right)| {
Reverse(left.1)
.cmp(&Reverse(right.1))
.then_with(|| Reverse(left.0).cmp(&Reverse(right.0)))
.then_with(|| left.2.cmp(&right.2))
.then_with(|| left_session.cmp(right_session))
});
targets.truncate(limit);
Ok(targets
.into_iter()
.map(|(session_id, (count, _, _))| (session_id, count))
.collect())
}
pub fn mark_messages_read(&self, session_id: &str) -> Result<usize> {
@@ -5521,7 +5560,19 @@ mod tests {
db.send_message(
"planner",
"worker-3",
"{\"task\":\"Check billing\",\"context\":\"Delegated from planner\"}",
"{\"task\":\"Check billing\",\"context\":\"Delegated from planner\",\"priority\":\"high\"}",
"task_handoff",
)?;
db.send_message(
"planner",
"worker-4",
"{\"task\":\"Low priority follow-up\",\"context\":\"Delegated from planner\",\"priority\":\"low\"}",
"task_handoff",
)?;
db.send_message(
"planner",
"worker-4",
"{\"task\":\"Critical production incident\",\"context\":\"Delegated from planner\",\"priority\":\"critical\"}",
"task_handoff",
)?;
@@ -5531,12 +5582,28 @@ mod tests {
);
assert_eq!(
db.delegated_children("planner", 10)?,
vec!["worker-3".to_string(), "worker-2".to_string(),]
vec![
"worker-4".to_string(),
"worker-3".to_string(),
"worker-2".to_string(),
]
);
assert_eq!(
db.unread_task_handoff_targets(10)?,
vec![("worker-2".to_string(), 1), ("worker-3".to_string(), 1),]
vec![
("worker-4".to_string(), 2),
("worker-3".to_string(), 1),
("worker-2".to_string(), 1),
]
);
let worker_4_handoffs = db.unread_task_handoffs_for_session("worker-4", 10)?;
assert_eq!(worker_4_handoffs.len(), 2);
assert!(worker_4_handoffs[0]
.content
.contains("Critical production incident"));
assert!(worker_4_handoffs[1]
.content
.contains("Low priority follow-up"));
let planner_entities = db.list_context_entities(Some("planner"), Some("session"), 10)?;
assert_eq!(planner_entities.len(), 1);