feat: add ecc2 graph coordination edges

This commit is contained in:
Affaan Mustafa
2026-04-10 04:30:32 -07:00
parent 315b87d391
commit beaba1ca15
3 changed files with 159 additions and 29 deletions

View File

@@ -2266,6 +2266,7 @@ fn format_graph_sync_stats_human(
format!("- sessions scanned {}", stats.sessions_scanned),
format!("- decisions processed {}", stats.decisions_processed),
format!("- file events processed {}", stats.file_events_processed),
format!("- messages processed {}", stats.messages_processed),
]
.join("\n")
}
@@ -4202,6 +4203,7 @@ mod tests {
sessions_scanned: 2,
decisions_processed: 3,
file_events_processed: 5,
messages_processed: 4,
},
Some("sess-12345678"),
);
@@ -4210,6 +4212,7 @@ mod tests {
assert!(text.contains("- sessions scanned 2"));
assert!(text.contains("- decisions processed 3"));
assert!(text.contains("- file events processed 5"));
assert!(text.contains("- messages processed 4"));
}
#[test]

View File

@@ -195,6 +195,7 @@ pub struct ContextGraphSyncStats {
pub sessions_scanned: usize,
pub decisions_processed: usize,
pub file_events_processed: usize,
pub messages_processed: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]

View File

@@ -1334,46 +1334,88 @@ impl StateStore {
}
fn sync_context_graph_session(&self, session_id: &str) -> Result<ContextGraphEntity> {
let session = self
.get_session(session_id)?
.ok_or_else(|| anyhow::anyhow!("Session not found for context graph sync: {session_id}"))?;
let session = self.get_session(session_id)?;
let mut metadata = BTreeMap::new();
metadata.insert("task".to_string(), session.task.clone());
metadata.insert("project".to_string(), session.project.clone());
metadata.insert("task_group".to_string(), session.task_group.clone());
metadata.insert("agent_type".to_string(), session.agent_type.clone());
metadata.insert("state".to_string(), session.state.to_string());
metadata.insert(
"working_dir".to_string(),
session.working_dir.display().to_string(),
);
if let Some(pid) = session.pid {
metadata.insert("pid".to_string(), pid.to_string());
}
if let Some(worktree) = &session.worktree {
let persisted_session_id = if session.is_some() {
Some(session_id)
} else {
None
};
let summary = if let Some(session) = session {
metadata.insert("task".to_string(), session.task.clone());
metadata.insert("project".to_string(), session.project.clone());
metadata.insert("task_group".to_string(), session.task_group.clone());
metadata.insert("agent_type".to_string(), session.agent_type.clone());
metadata.insert("state".to_string(), session.state.to_string());
metadata.insert(
"worktree_path".to_string(),
worktree.path.display().to_string(),
"working_dir".to_string(),
session.working_dir.display().to_string(),
);
metadata.insert("worktree_branch".to_string(), worktree.branch.clone());
metadata.insert("base_branch".to_string(), worktree.base_branch.clone());
}
if let Some(pid) = session.pid {
metadata.insert("pid".to_string(), pid.to_string());
}
if let Some(worktree) = &session.worktree {
metadata.insert(
"worktree_path".to_string(),
worktree.path.display().to_string(),
);
metadata.insert("worktree_branch".to_string(), worktree.branch.clone());
metadata.insert("base_branch".to_string(), worktree.base_branch.clone());
}
let summary = format!(
"{} | {} | {} / {}",
session.state, session.agent_type, session.project, session.task_group
);
format!(
"{} | {} | {} / {}",
session.state, session.agent_type, session.project, session.task_group
)
} else {
metadata.insert("state".to_string(), "unknown".to_string());
"session placeholder".to_string()
};
self.upsert_context_entity(
Some(&session.id),
persisted_session_id,
"session",
&session.id,
session_id,
None,
&summary,
&metadata,
)
}
fn sync_context_graph_message(
&self,
from_session_id: &str,
to_session_id: &str,
content: &str,
msg_type: &str,
) -> Result<()> {
let relation_session_id = self
.get_session(from_session_id)?
.map(|session| session.id)
.filter(|id| !id.is_empty());
let from_entity = self.sync_context_graph_session(from_session_id)?;
let to_entity = self.sync_context_graph_session(to_session_id)?;
let relation_type = match msg_type {
"task_handoff" => "delegates_to",
"query" => "queries",
"response" => "responds_to",
"completed" => "completed_for",
"conflict" => "conflicts_with",
other => other,
};
let summary = crate::comms::preview(msg_type, content);
self.upsert_context_relation(
relation_session_id.as_deref(),
from_entity.id,
to_entity.id,
relation_type,
&summary,
)?;
Ok(())
}
pub fn increment_tool_calls(&self, session_id: &str) -> Result<()> {
self.conn.execute(
"UPDATE sessions
@@ -1503,9 +1545,45 @@ impl StateStore {
VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![from, to, content, msg_type, chrono::Utc::now().to_rfc3339()],
)?;
self.sync_context_graph_message(from, to, content, msg_type)?;
Ok(())
}
fn list_messages_sent_by_session(
&self,
session_id: &str,
limit: usize,
) -> Result<Vec<SessionMessage>> {
let mut stmt = self.conn.prepare(
"SELECT id, from_session, to_session, content, msg_type, read, timestamp
FROM messages
WHERE from_session = ?1
ORDER BY id DESC
LIMIT ?2",
)?;
let mut messages = stmt
.query_map(rusqlite::params![session_id, limit as i64], |row| {
let timestamp: String = row.get(6)?;
Ok(SessionMessage {
id: row.get(0)?,
from_session: row.get(1)?,
to_session: row.get(2)?,
content: row.get(3)?,
msg_type: row.get(4)?,
read: row.get::<_, i64>(5)? != 0,
timestamp: chrono::DateTime::parse_from_rfc3339(&timestamp)
.unwrap_or_default()
.with_timezone(&chrono::Utc),
})
})?
.collect::<Result<Vec<_>, _>>()?;
messages.reverse();
Ok(messages)
}
pub fn list_messages_for_session(
&self,
session_id: &str,
@@ -1845,6 +1923,16 @@ impl StateStore {
self.sync_context_graph_file_event(&session.id, "history", &persisted)?;
stats.file_events_processed = stats.file_events_processed.saturating_add(1);
}
for message in self.list_messages_sent_by_session(&session.id, per_session_limit)? {
self.sync_context_graph_message(
&message.from_session,
&message.to_session,
&message.content,
&message.msg_type,
)?;
stats.messages_processed = stats.messages_processed.saturating_add(1);
}
}
Ok(stats)
@@ -4020,11 +4108,23 @@ mod tests {
"[{\"path\":\"src/backfill.rs\",\"action\":\"modify\"}]",
],
)?;
db.conn.execute(
"INSERT INTO messages (from_session, to_session, content, msg_type, timestamp)
VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![
"session-1",
"session-2",
"{\"task\":\"Review backfill output\",\"context\":\"graph sync\"}",
"task_handoff",
"2026-04-10T00:02:00Z",
],
)?;
let stats = db.sync_context_graph_history(Some("session-1"), 10)?;
assert_eq!(stats.sessions_scanned, 1);
assert_eq!(stats.decisions_processed, 1);
assert_eq!(stats.file_events_processed, 1);
assert_eq!(stats.messages_processed, 1);
let entities = db.list_context_entities(Some("session-1"), None, 10)?;
assert!(entities
@@ -4038,9 +4138,12 @@ mod tests {
.find(|entity| entity.entity_type == "session" && entity.name == "session-1")
.expect("session entity should exist");
let relations = db.list_context_relations(Some(session_entity.id), 10)?;
assert_eq!(relations.len(), 2);
assert_eq!(relations.len(), 3);
assert!(relations.iter().any(|relation| relation.relation_type == "decided"));
assert!(relations.iter().any(|relation| relation.relation_type == "modify"));
assert!(relations
.iter()
.any(|relation| relation.relation_type == "delegates_to"));
Ok(())
}
@@ -4229,6 +4332,29 @@ mod tests {
vec![("worker-2".to_string(), 1), ("worker-3".to_string(), 1),]
);
let planner_entities = db.list_context_entities(Some("planner"), Some("session"), 10)?;
assert_eq!(planner_entities.len(), 1);
let planner_relations = db.list_context_relations(Some(planner_entities[0].id), 10)?;
assert!(planner_relations.iter().any(|relation| {
relation.relation_type == "queries" && relation.to_entity_name == "worker"
}));
assert!(planner_relations.iter().any(|relation| {
relation.relation_type == "delegates_to" && relation.to_entity_name == "worker-2"
}));
assert!(planner_relations.iter().any(|relation| {
relation.relation_type == "delegates_to" && relation.to_entity_name == "worker-3"
}));
let worker_entity = db
.list_context_entities(Some("worker"), Some("session"), 10)?
.into_iter()
.find(|entity| entity.name == "worker")
.expect("worker session entity should exist");
let worker_relations = db.list_context_relations(Some(worker_entity.id), 10)?;
assert!(worker_relations.iter().any(|relation| {
relation.relation_type == "completed_for" && relation.to_entity_name == "planner"
}));
Ok(())
}