From beaba1ca15487cfa4b94d5a3a144eba05ec0905a Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Fri, 10 Apr 2026 04:30:32 -0700 Subject: [PATCH] feat: add ecc2 graph coordination edges --- ecc2/src/main.rs | 3 + ecc2/src/session/mod.rs | 1 + ecc2/src/session/store.rs | 184 ++++++++++++++++++++++++++++++++------ 3 files changed, 159 insertions(+), 29 deletions(-) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 434707aa..24038d34 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -2266,6 +2266,7 @@ fn format_graph_sync_stats_human( format!("- sessions scanned {}", stats.sessions_scanned), format!("- decisions processed {}", stats.decisions_processed), format!("- file events processed {}", stats.file_events_processed), + format!("- messages processed {}", stats.messages_processed), ] .join("\n") } @@ -4202,6 +4203,7 @@ mod tests { sessions_scanned: 2, decisions_processed: 3, file_events_processed: 5, + messages_processed: 4, }, Some("sess-12345678"), ); @@ -4210,6 +4212,7 @@ mod tests { assert!(text.contains("- sessions scanned 2")); assert!(text.contains("- decisions processed 3")); assert!(text.contains("- file events processed 5")); + assert!(text.contains("- messages processed 4")); } #[test] diff --git a/ecc2/src/session/mod.rs b/ecc2/src/session/mod.rs index 1fce45e9..583d8bde 100644 --- a/ecc2/src/session/mod.rs +++ b/ecc2/src/session/mod.rs @@ -195,6 +195,7 @@ pub struct ContextGraphSyncStats { pub sessions_scanned: usize, pub decisions_processed: usize, pub file_events_processed: usize, + pub messages_processed: usize, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index 5786298a..b32bb0ea 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -1334,46 +1334,88 @@ impl StateStore { } fn sync_context_graph_session(&self, session_id: &str) -> Result { - let session = self - .get_session(session_id)? - .ok_or_else(|| anyhow::anyhow!("Session not found for context graph sync: {session_id}"))?; - + let session = self.get_session(session_id)?; let mut metadata = BTreeMap::new(); - metadata.insert("task".to_string(), session.task.clone()); - metadata.insert("project".to_string(), session.project.clone()); - metadata.insert("task_group".to_string(), session.task_group.clone()); - metadata.insert("agent_type".to_string(), session.agent_type.clone()); - metadata.insert("state".to_string(), session.state.to_string()); - metadata.insert( - "working_dir".to_string(), - session.working_dir.display().to_string(), - ); - if let Some(pid) = session.pid { - metadata.insert("pid".to_string(), pid.to_string()); - } - if let Some(worktree) = &session.worktree { + let persisted_session_id = if session.is_some() { + Some(session_id) + } else { + None + }; + let summary = if let Some(session) = session { + metadata.insert("task".to_string(), session.task.clone()); + metadata.insert("project".to_string(), session.project.clone()); + metadata.insert("task_group".to_string(), session.task_group.clone()); + metadata.insert("agent_type".to_string(), session.agent_type.clone()); + metadata.insert("state".to_string(), session.state.to_string()); metadata.insert( - "worktree_path".to_string(), - worktree.path.display().to_string(), + "working_dir".to_string(), + session.working_dir.display().to_string(), ); - metadata.insert("worktree_branch".to_string(), worktree.branch.clone()); - metadata.insert("base_branch".to_string(), worktree.base_branch.clone()); - } + if let Some(pid) = session.pid { + metadata.insert("pid".to_string(), pid.to_string()); + } + if let Some(worktree) = &session.worktree { + metadata.insert( + "worktree_path".to_string(), + worktree.path.display().to_string(), + ); + metadata.insert("worktree_branch".to_string(), worktree.branch.clone()); + metadata.insert("base_branch".to_string(), worktree.base_branch.clone()); + } - let summary = format!( - "{} | {} | {} / {}", - session.state, session.agent_type, session.project, session.task_group - ); + format!( + "{} | {} | {} / {}", + session.state, session.agent_type, session.project, session.task_group + ) + } else { + metadata.insert("state".to_string(), "unknown".to_string()); + "session placeholder".to_string() + }; self.upsert_context_entity( - Some(&session.id), + persisted_session_id, "session", - &session.id, + session_id, None, &summary, &metadata, ) } + fn sync_context_graph_message( + &self, + from_session_id: &str, + to_session_id: &str, + content: &str, + msg_type: &str, + ) -> Result<()> { + let relation_session_id = self + .get_session(from_session_id)? + .map(|session| session.id) + .filter(|id| !id.is_empty()); + let from_entity = self.sync_context_graph_session(from_session_id)?; + let to_entity = self.sync_context_graph_session(to_session_id)?; + + let relation_type = match msg_type { + "task_handoff" => "delegates_to", + "query" => "queries", + "response" => "responds_to", + "completed" => "completed_for", + "conflict" => "conflicts_with", + other => other, + }; + let summary = crate::comms::preview(msg_type, content); + + self.upsert_context_relation( + relation_session_id.as_deref(), + from_entity.id, + to_entity.id, + relation_type, + &summary, + )?; + + Ok(()) + } + pub fn increment_tool_calls(&self, session_id: &str) -> Result<()> { self.conn.execute( "UPDATE sessions @@ -1503,9 +1545,45 @@ impl StateStore { VALUES (?1, ?2, ?3, ?4, ?5)", rusqlite::params![from, to, content, msg_type, chrono::Utc::now().to_rfc3339()], )?; + self.sync_context_graph_message(from, to, content, msg_type)?; Ok(()) } + fn list_messages_sent_by_session( + &self, + session_id: &str, + limit: usize, + ) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT id, from_session, to_session, content, msg_type, read, timestamp + FROM messages + WHERE from_session = ?1 + ORDER BY id DESC + LIMIT ?2", + )?; + + let mut messages = stmt + .query_map(rusqlite::params![session_id, limit as i64], |row| { + let timestamp: String = row.get(6)?; + + Ok(SessionMessage { + id: row.get(0)?, + from_session: row.get(1)?, + to_session: row.get(2)?, + content: row.get(3)?, + msg_type: row.get(4)?, + read: row.get::<_, i64>(5)? != 0, + timestamp: chrono::DateTime::parse_from_rfc3339(×tamp) + .unwrap_or_default() + .with_timezone(&chrono::Utc), + }) + })? + .collect::, _>>()?; + + messages.reverse(); + Ok(messages) + } + pub fn list_messages_for_session( &self, session_id: &str, @@ -1845,6 +1923,16 @@ impl StateStore { self.sync_context_graph_file_event(&session.id, "history", &persisted)?; stats.file_events_processed = stats.file_events_processed.saturating_add(1); } + + for message in self.list_messages_sent_by_session(&session.id, per_session_limit)? { + self.sync_context_graph_message( + &message.from_session, + &message.to_session, + &message.content, + &message.msg_type, + )?; + stats.messages_processed = stats.messages_processed.saturating_add(1); + } } Ok(stats) @@ -4020,11 +4108,23 @@ mod tests { "[{\"path\":\"src/backfill.rs\",\"action\":\"modify\"}]", ], )?; + db.conn.execute( + "INSERT INTO messages (from_session, to_session, content, msg_type, timestamp) + VALUES (?1, ?2, ?3, ?4, ?5)", + rusqlite::params![ + "session-1", + "session-2", + "{\"task\":\"Review backfill output\",\"context\":\"graph sync\"}", + "task_handoff", + "2026-04-10T00:02:00Z", + ], + )?; let stats = db.sync_context_graph_history(Some("session-1"), 10)?; assert_eq!(stats.sessions_scanned, 1); assert_eq!(stats.decisions_processed, 1); assert_eq!(stats.file_events_processed, 1); + assert_eq!(stats.messages_processed, 1); let entities = db.list_context_entities(Some("session-1"), None, 10)?; assert!(entities @@ -4038,9 +4138,12 @@ mod tests { .find(|entity| entity.entity_type == "session" && entity.name == "session-1") .expect("session entity should exist"); let relations = db.list_context_relations(Some(session_entity.id), 10)?; - assert_eq!(relations.len(), 2); + assert_eq!(relations.len(), 3); assert!(relations.iter().any(|relation| relation.relation_type == "decided")); assert!(relations.iter().any(|relation| relation.relation_type == "modify")); + assert!(relations + .iter() + .any(|relation| relation.relation_type == "delegates_to")); Ok(()) } @@ -4229,6 +4332,29 @@ mod tests { vec![("worker-2".to_string(), 1), ("worker-3".to_string(), 1),] ); + let planner_entities = db.list_context_entities(Some("planner"), Some("session"), 10)?; + assert_eq!(planner_entities.len(), 1); + let planner_relations = db.list_context_relations(Some(planner_entities[0].id), 10)?; + assert!(planner_relations.iter().any(|relation| { + relation.relation_type == "queries" && relation.to_entity_name == "worker" + })); + assert!(planner_relations.iter().any(|relation| { + relation.relation_type == "delegates_to" && relation.to_entity_name == "worker-2" + })); + assert!(planner_relations.iter().any(|relation| { + relation.relation_type == "delegates_to" && relation.to_entity_name == "worker-3" + })); + + let worker_entity = db + .list_context_entities(Some("worker"), Some("session"), 10)? + .into_iter() + .find(|entity| entity.name == "worker") + .expect("worker session entity should exist"); + let worker_relations = db.list_context_relations(Some(worker_entity.id), 10)?; + assert!(worker_relations.iter().any(|relation| { + relation.relation_type == "completed_for" && relation.to_entity_name == "planner" + })); + Ok(()) }