From 8cc92c59a64d7f1d0b820b98981934d5a69e1252 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Fri, 10 Apr 2026 06:07:12 -0700 Subject: [PATCH] feat: add ecc2 graph compaction --- ecc2/src/main.rs | 116 +++++++++++++++++++ ecc2/src/session/mod.rs | 8 ++ ecc2/src/session/store.rs | 237 +++++++++++++++++++++++++++++++++++++- 3 files changed, 356 insertions(+), 5 deletions(-) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 226730f8..9329af08 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -490,6 +490,18 @@ enum GraphCommands { #[arg(long)] json: bool, }, + /// Compact stored observations in the shared context graph + Compact { + /// Filter by source session ID or alias + #[arg(long)] + session_id: Option, + /// Maximum observations to retain per entity after compaction + #[arg(long, default_value_t = 12)] + keep_observations_per_entity: usize, + /// Emit machine-readable JSON instead of the human summary + #[arg(long)] + json: bool, + }, /// Recall relevant context graph entities for a query Recall { /// Filter by source session ID or alias @@ -1314,6 +1326,32 @@ async fn main() -> Result<()> { println!("{}", format_graph_observations_human(&observations)); } } + GraphCommands::Compact { + session_id, + keep_observations_per_entity, + json, + } => { + let resolved_session_id = session_id + .as_deref() + .map(|value| resolve_session_id(&db, value)) + .transpose()?; + let stats = db.compact_context_graph( + resolved_session_id.as_deref(), + keep_observations_per_entity, + )?; + if json { + println!("{}", serde_json::to_string_pretty(&stats)?); + } else { + println!( + "{}", + format_graph_compaction_stats_human( + &stats, + resolved_session_id.as_deref(), + keep_observations_per_entity, + ) + ); + } + } GraphCommands::Recall { session_id, query, @@ -2416,6 +2454,32 @@ fn format_graph_recall_human( lines.join("\n") } +fn format_graph_compaction_stats_human( + stats: &session::ContextGraphCompactionStats, + session_id: Option<&str>, + keep_observations_per_entity: usize, +) -> String { + let scope = session_id + .map(short_session) + .unwrap_or_else(|| "all sessions".to_string()); + [ + format!( + "Context graph compaction complete for {scope} (keep {keep_observations_per_entity} observations per entity)" + ), + format!("- entities scanned {}", stats.entities_scanned), + format!( + "- duplicate observations deleted {}", + stats.duplicate_observations_deleted + ), + format!( + "- overflow observations deleted {}", + stats.overflow_observations_deleted + ), + format!("- observations retained {}", stats.observations_retained), + ] + .join("\n") +} + fn format_graph_entity_detail_human(detail: &session::ContextGraphEntityDetail) -> String { let mut lines = vec![format_graph_entity_human(&detail.entity)]; lines.push(String::new()); @@ -4393,6 +4457,37 @@ mod tests { } } + #[test] + fn cli_parses_graph_compact_command() { + let cli = Cli::try_parse_from([ + "ecc", + "graph", + "compact", + "--session-id", + "latest", + "--keep-observations-per-entity", + "6", + "--json", + ]) + .expect("graph compact should parse"); + + match cli.command { + Some(Commands::Graph { + command: + GraphCommands::Compact { + session_id, + keep_observations_per_entity, + json, + }, + }) => { + assert_eq!(session_id.as_deref(), Some("latest")); + assert_eq!(keep_observations_per_entity, 6); + assert!(json); + } + _ => panic!("expected graph compact subcommand"), + } + } + #[test] fn format_decisions_human_renders_details() { let text = format_decisions_human( @@ -4534,6 +4629,27 @@ mod tests { assert!(text.contains("summary Finished auth callback recovery with 2 tests")); } + #[test] + fn format_graph_compaction_stats_human_renders_counts() { + let text = format_graph_compaction_stats_human( + &session::ContextGraphCompactionStats { + entities_scanned: 3, + duplicate_observations_deleted: 2, + overflow_observations_deleted: 4, + observations_retained: 9, + }, + Some("sess-12345678"), + 6, + ); + + assert!(text.contains("Context graph compaction complete for sess-123")); + assert!(text.contains("keep 6 observations per entity")); + assert!(text.contains("- entities scanned 3")); + assert!(text.contains("- duplicate observations deleted 2")); + assert!(text.contains("- overflow observations deleted 4")); + assert!(text.contains("- observations retained 9")); + } + #[test] fn format_graph_sync_stats_human_renders_counts() { let text = format_graph_sync_stats_human( diff --git a/ecc2/src/session/mod.rs b/ecc2/src/session/mod.rs index 40e15ea7..727a7c9f 100644 --- a/ecc2/src/session/mod.rs +++ b/ecc2/src/session/mod.rs @@ -220,6 +220,14 @@ pub struct ContextGraphSyncStats { pub messages_processed: usize, } +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] +pub struct ContextGraphCompactionStats { + pub entities_scanned: usize, + pub duplicate_observations_deleted: usize, + pub overflow_observations_deleted: usize, + pub observations_retained: usize, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum FileActivityAction { diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index 01b1fa06..31d93ce6 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -13,17 +13,19 @@ use crate::observability::{ToolCallEvent, ToolLogEntry, ToolLogPage}; use super::output::{OutputLine, OutputStream, OUTPUT_BUFFER_LIMIT}; use super::{ - default_project_label, default_task_group_label, normalize_group_label, ContextGraphEntity, - ContextGraphEntityDetail, ContextGraphObservation, ContextGraphRecallEntry, - ContextGraphRelation, ContextGraphSyncStats, DecisionLogEntry, FileActivityAction, - FileActivityEntry, Session, SessionAgentProfile, SessionMessage, SessionMetrics, SessionState, - WorktreeInfo, + default_project_label, default_task_group_label, normalize_group_label, + ContextGraphCompactionStats, ContextGraphEntity, ContextGraphEntityDetail, + ContextGraphObservation, ContextGraphRecallEntry, ContextGraphRelation, ContextGraphSyncStats, + DecisionLogEntry, FileActivityAction, FileActivityEntry, Session, SessionAgentProfile, + SessionMessage, SessionMetrics, SessionState, WorktreeInfo, }; pub struct StateStore { conn: Connection, } +const DEFAULT_CONTEXT_GRAPH_OBSERVATION_RETENTION: usize = 12; + #[derive(Debug, Clone)] pub struct PendingWorktreeRequest { pub session_id: String, @@ -2235,6 +2237,11 @@ impl StateStore { ], )?; let observation_id = self.conn.last_insert_rowid(); + self.compact_context_graph_observations( + None, + Some(entity_id), + DEFAULT_CONTEXT_GRAPH_OBSERVATION_RETENTION, + )?; self.conn .query_row( "SELECT o.id, o.session_id, o.entity_id, e.entity_type, e.name, @@ -2248,6 +2255,14 @@ impl StateStore { .map_err(Into::into) } + pub fn compact_context_graph( + &self, + session_id: Option<&str>, + keep_observations_per_entity: usize, + ) -> Result { + self.compact_context_graph_observations(session_id, None, keep_observations_per_entity) + } + pub fn add_session_observation( &self, session_id: &str, @@ -2289,6 +2304,94 @@ impl StateStore { Ok(entries) } + fn compact_context_graph_observations( + &self, + session_id: Option<&str>, + entity_id: Option, + keep_observations_per_entity: usize, + ) -> Result { + let entities_scanned = self.conn.query_row( + "SELECT COUNT(DISTINCT o.entity_id) + FROM context_graph_observations o + JOIN context_graph_entities e ON e.id = o.entity_id + WHERE (?1 IS NULL OR e.session_id = ?1) + AND (?2 IS NULL OR o.entity_id = ?2)", + rusqlite::params![session_id, entity_id], + |row| row.get::<_, i64>(0), + )? as usize; + + let duplicate_observations_deleted = self.conn.execute( + "DELETE FROM context_graph_observations + WHERE id IN ( + SELECT id + FROM ( + SELECT o.id, + ROW_NUMBER() OVER ( + PARTITION BY o.entity_id, o.observation_type, o.summary + ORDER BY o.created_at DESC, o.id DESC + ) AS rn + FROM context_graph_observations o + JOIN context_graph_entities e ON e.id = o.entity_id + WHERE (?1 IS NULL OR e.session_id = ?1) + AND (?2 IS NULL OR o.entity_id = ?2) + ) ranked + WHERE ranked.rn > 1 + )", + rusqlite::params![session_id, entity_id], + )?; + + let overflow_observations_deleted = if keep_observations_per_entity == 0 { + self.conn.execute( + "DELETE FROM context_graph_observations + WHERE id IN ( + SELECT o.id + FROM context_graph_observations o + JOIN context_graph_entities e ON e.id = o.entity_id + WHERE (?1 IS NULL OR e.session_id = ?1) + AND (?2 IS NULL OR o.entity_id = ?2) + )", + rusqlite::params![session_id, entity_id], + )? + } else { + self.conn.execute( + "DELETE FROM context_graph_observations + WHERE id IN ( + SELECT id + FROM ( + SELECT o.id, + ROW_NUMBER() OVER ( + PARTITION BY o.entity_id + ORDER BY o.created_at DESC, o.id DESC + ) AS rn + FROM context_graph_observations o + JOIN context_graph_entities e ON e.id = o.entity_id + WHERE (?1 IS NULL OR e.session_id = ?1) + AND (?2 IS NULL OR o.entity_id = ?2) + ) ranked + WHERE ranked.rn > ?3 + )", + rusqlite::params![session_id, entity_id, keep_observations_per_entity as i64], + )? + }; + + let observations_retained = self.conn.query_row( + "SELECT COUNT(*) + FROM context_graph_observations o + JOIN context_graph_entities e ON e.id = o.entity_id + WHERE (?1 IS NULL OR e.session_id = ?1) + AND (?2 IS NULL OR o.entity_id = ?2)", + rusqlite::params![session_id, entity_id], + |row| row.get::<_, i64>(0), + )? as usize; + + Ok(ContextGraphCompactionStats { + entities_scanned, + duplicate_observations_deleted, + overflow_observations_deleted, + observations_retained, + }) + } + pub fn upsert_context_relation( &self, session_id: Option<&str>, @@ -4200,6 +4303,130 @@ mod tests { Ok(()) } + #[test] + fn compact_context_graph_prunes_duplicate_and_overflow_observations() -> Result<()> { + let tempdir = TestDir::new("store-context-compaction")?; + let db = StateStore::open(&tempdir.path().join("state.db"))?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "session-1".to_string(), + task: "deep memory".to_string(), + project: "workspace".to_string(), + task_group: "knowledge".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: SessionState::Running, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + last_heartbeat_at: now, + metrics: SessionMetrics::default(), + })?; + + let entity = db.upsert_context_entity( + Some("session-1"), + "decision", + "Prefer recovery-first routing", + None, + "Recovered installs should go through the portal first", + &BTreeMap::new(), + )?; + + for summary in [ + "old duplicate", + "keep me", + "old duplicate", + "recent", + "latest", + ] { + db.conn.execute( + "INSERT INTO context_graph_observations ( + session_id, entity_id, observation_type, summary, details_json, created_at + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + rusqlite::params![ + "session-1", + entity.id, + "note", + summary, + "{}", + chrono::Utc::now().to_rfc3339(), + ], + )?; + std::thread::sleep(std::time::Duration::from_millis(2)); + } + + let stats = db.compact_context_graph(None, 3)?; + assert_eq!(stats.entities_scanned, 1); + assert_eq!(stats.duplicate_observations_deleted, 1); + assert_eq!(stats.overflow_observations_deleted, 1); + assert_eq!(stats.observations_retained, 3); + + let observations = db.list_context_observations(Some(entity.id), 10)?; + let summaries = observations + .iter() + .map(|observation| observation.summary.as_str()) + .collect::>(); + assert_eq!(summaries, vec!["latest", "recent", "old duplicate"]); + + Ok(()) + } + + #[test] + fn add_context_observation_auto_compacts_entity_history() -> Result<()> { + let tempdir = TestDir::new("store-context-auto-compaction")?; + let db = StateStore::open(&tempdir.path().join("state.db"))?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "session-1".to_string(), + task: "deep memory".to_string(), + project: "workspace".to_string(), + task_group: "knowledge".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: SessionState::Running, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + last_heartbeat_at: now, + metrics: SessionMetrics::default(), + })?; + + let entity = db.upsert_context_entity( + Some("session-1"), + "session", + "session-1", + None, + "Deep-memory worker", + &BTreeMap::new(), + )?; + + for index in 0..(DEFAULT_CONTEXT_GRAPH_OBSERVATION_RETENTION + 2) { + let summary = format!("completion summary {}", index); + db.add_context_observation( + Some("session-1"), + entity.id, + "completion_summary", + &summary, + &BTreeMap::new(), + )?; + std::thread::sleep(std::time::Duration::from_millis(2)); + } + + let observations = db.list_context_observations(Some(entity.id), 20)?; + assert_eq!( + observations.len(), + DEFAULT_CONTEXT_GRAPH_OBSERVATION_RETENTION + ); + assert_eq!(observations[0].summary, "completion summary 13"); + assert_eq!(observations.last().unwrap().summary, "completion summary 2"); + + Ok(()) + } + #[test] fn recall_context_entities_ranks_matching_entities() -> Result<()> { let tempdir = TestDir::new("store-context-recall")?;