From 9c294f78157db9069cf83723e6b7b37ff5680df0 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Fri, 10 Apr 2026 07:06:37 -0700 Subject: [PATCH] feat: add ecc2 pinned memory observations --- ecc2/src/main.rs | 125 ++++++++++++++++++++++- ecc2/src/session/mod.rs | 2 + ecc2/src/session/store.rs | 206 ++++++++++++++++++++++++++++++++++++-- ecc2/src/tui/dashboard.rs | 15 ++- 4 files changed, 331 insertions(+), 17 deletions(-) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index ce18d5fb..fa145fba 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -473,6 +473,9 @@ enum GraphCommands { /// Observation priority #[arg(long, value_enum, default_value_t = ObservationPriorityArg::Normal)] priority: ObservationPriorityArg, + /// Keep this observation across aggressive compaction + #[arg(long)] + pinned: bool, /// Observation summary #[arg(long)] summary: String, @@ -483,6 +486,24 @@ enum GraphCommands { #[arg(long)] json: bool, }, + /// Pin an existing observation so compaction preserves it + PinObservation { + /// Observation ID + #[arg(long)] + observation_id: i64, + /// Emit machine-readable JSON instead of the human summary + #[arg(long)] + json: bool, + }, + /// Remove the pin from an existing observation + UnpinObservation { + /// Observation ID + #[arg(long)] + observation_id: i64, + /// Emit machine-readable JSON instead of the human summary + #[arg(long)] + json: bool, + }, /// List observations in the shared context graph Observations { /// Filter to observations for a specific entity ID @@ -1388,6 +1409,7 @@ async fn main() -> Result<()> { entity_id, observation_type, priority, + pinned, summary, details, json, @@ -1402,6 +1424,7 @@ async fn main() -> Result<()> { entity_id, &observation_type, priority.into(), + pinned, &summary, &details, )?; @@ -1411,6 +1434,38 @@ async fn main() -> Result<()> { println!("{}", format_graph_observation_human(&observation)); } } + GraphCommands::PinObservation { + observation_id, + json, + } => { + let Some(observation) = db.set_context_observation_pinned(observation_id, true)? + else { + return Err(anyhow::anyhow!( + "Context graph observation #{observation_id} was not found" + )); + }; + if json { + println!("{}", serde_json::to_string_pretty(&observation)?); + } else { + println!("{}", format_graph_observation_human(&observation)); + } + } + GraphCommands::UnpinObservation { + observation_id, + json, + } => { + let Some(observation) = db.set_context_observation_pinned(observation_id, false)? + else { + return Err(anyhow::anyhow!( + "Context graph observation #{observation_id} was not found" + )); + }; + if json { + println!("{}", serde_json::to_string_pretty(&observation)?); + } else { + println!("{}", format_graph_observation_human(&observation)); + } + } GraphCommands::Observations { entity_id, limit, @@ -2144,6 +2199,7 @@ fn import_memory_connector_record( entity.id, observation_type, session::ContextObservationPriority::Normal, + false, summary, &record.details, )?; @@ -3349,6 +3405,7 @@ fn format_graph_observation_human(observation: &session::ContextGraphObservation ), format!("Type: {}", observation.observation_type), format!("Priority: {}", observation.priority), + format!("Pinned: {}", if observation.pinned { "yes" } else { "no" }), format!("Summary: {}", observation.summary), ]; if let Some(session_id) = observation.session_id.as_deref() { @@ -3380,10 +3437,11 @@ fn format_graph_observations_human(observations: &[session::ContextGraphObservat )]; for observation in observations { let mut line = format!( - "- #{} [{}/{}] {}", + "- #{} [{}/{}{}] {}", observation.id, observation.observation_type, observation.priority, + if observation.pinned { "/pinned" } else { "" }, observation.entity_name ); if let Some(session_id) = observation.session_id.as_deref() { @@ -3424,6 +3482,9 @@ fn format_graph_recall_human( entry.observation_count, entry.max_observation_priority ); + if entry.has_pinned_observation { + line.push_str(" | pinned"); + } if let Some(session_id) = entry.entity.session_id.as_deref() { line.push_str(&format!(" | {}", short_session(session_id))); } @@ -5463,6 +5524,7 @@ mod tests { "7", "--type", "completion_summary", + "--pinned", "--summary", "Finished auth callback recovery", "--detail", @@ -5479,6 +5541,7 @@ mod tests { entity_id, observation_type, priority, + pinned, summary, details, json, @@ -5488,6 +5551,7 @@ mod tests { assert_eq!(entity_id, 7); assert_eq!(observation_type, "completion_summary"); assert!(matches!(priority, ObservationPriorityArg::Normal)); + assert!(pinned); assert_eq!(summary, "Finished auth callback recovery"); assert_eq!(details, vec!["tests_run=2"]); assert!(json); @@ -5496,6 +5560,60 @@ mod tests { } } + #[test] + fn cli_parses_graph_pin_observation_command() { + let cli = Cli::try_parse_from([ + "ecc", + "graph", + "pin-observation", + "--observation-id", + "42", + "--json", + ]) + .expect("graph pin-observation should parse"); + + match cli.command { + Some(Commands::Graph { + command: + GraphCommands::PinObservation { + observation_id, + json, + }, + }) => { + assert_eq!(observation_id, 42); + assert!(json); + } + _ => panic!("expected graph pin-observation subcommand"), + } + } + + #[test] + fn cli_parses_graph_unpin_observation_command() { + let cli = Cli::try_parse_from([ + "ecc", + "graph", + "unpin-observation", + "--observation-id", + "42", + "--json", + ]) + .expect("graph unpin-observation should parse"); + + match cli.command { + Some(Commands::Graph { + command: + GraphCommands::UnpinObservation { + observation_id, + json, + }, + }) => { + assert_eq!(observation_id, 42); + assert!(json); + } + _ => panic!("expected graph unpin-observation subcommand"), + } + } + #[test] fn cli_parses_graph_compact_command() { let cli = Cli::try_parse_from([ @@ -5701,6 +5819,7 @@ mod tests { relation_count: 2, observation_count: 1, max_observation_priority: session::ContextObservationPriority::High, + has_pinned_observation: true, }], Some("sess-12345678"), "auth callback recovery", @@ -5709,6 +5828,7 @@ mod tests { assert!(text.contains("Relevant memory: 1 entries")); assert!(text.contains("[file] callback.ts | score 319 | relations 2 | observations 1")); assert!(text.contains("priority high")); + assert!(text.contains("| pinned")); assert!(text.contains("matches auth, callback, recovery")); assert!(text.contains("path src/routes/auth/callback.ts")); } @@ -5723,6 +5843,7 @@ mod tests { entity_name: "sess-12345678".to_string(), observation_type: "completion_summary".to_string(), priority: session::ContextObservationPriority::High, + pinned: true, summary: "Finished auth callback recovery with 2 tests".to_string(), details: BTreeMap::from([("tests_run".to_string(), "2".to_string())]), created_at: chrono::DateTime::parse_from_rfc3339("2026-04-10T01:02:03Z") @@ -5731,7 +5852,7 @@ mod tests { }]); assert!(text.contains("Context graph observations: 1")); - assert!(text.contains("[completion_summary/high] sess-12345678")); + assert!(text.contains("[completion_summary/high/pinned] sess-12345678")); assert!(text.contains("summary Finished auth callback recovery with 2 tests")); } diff --git a/ecc2/src/session/mod.rs b/ecc2/src/session/mod.rs index 878d88bc..ddde4cd4 100644 --- a/ecc2/src/session/mod.rs +++ b/ecc2/src/session/mod.rs @@ -199,6 +199,7 @@ pub struct ContextGraphObservation { pub entity_name: String, pub observation_type: String, pub priority: ContextObservationPriority, + pub pinned: bool, pub summary: String, pub details: BTreeMap, pub created_at: DateTime, @@ -212,6 +213,7 @@ pub struct ContextGraphRecallEntry { pub relation_count: usize, pub observation_count: usize, pub max_observation_priority: ContextObservationPriority, + pub has_pinned_observation: bool, } #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index 46025b0c..b8465f62 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -268,6 +268,7 @@ impl StateStore { entity_id INTEGER NOT NULL REFERENCES context_graph_entities(id) ON DELETE CASCADE, observation_type TEXT NOT NULL, priority INTEGER NOT NULL DEFAULT 1, + pinned INTEGER NOT NULL DEFAULT 0, summary TEXT NOT NULL, details_json TEXT NOT NULL DEFAULT '{}', created_at TEXT NOT NULL @@ -473,6 +474,14 @@ impl StateStore { ) .context("Failed to add priority column to context_graph_observations table")?; } + if !self.has_column("context_graph_observations", "pinned")? { + self.conn + .execute( + "ALTER TABLE context_graph_observations ADD COLUMN pinned INTEGER NOT NULL DEFAULT 0", + [], + ) + .context("Failed to add pinned column to context_graph_observations table")?; + } if !self.has_column("daemon_activity", "last_dispatch_deferred")? { self.conn @@ -2103,7 +2112,12 @@ impl StateStore { SELECT MAX(priority) FROM context_graph_observations o WHERE o.entity_id = e.id - ), 1) AS max_observation_priority + ), 1) AS max_observation_priority, + COALESCE(( + SELECT MAX(pinned) + FROM context_graph_observations o + WHERE o.entity_id = e.id + ), 0) AS has_pinned_observation FROM context_graph_entities e WHERE (?1 IS NULL OR e.session_id = ?1) ORDER BY e.updated_at DESC, e.id DESC @@ -2120,12 +2134,14 @@ impl StateStore { let observation_count = row.get::<_, i64>(11)?.max(0) as usize; let max_observation_priority = ContextObservationPriority::from_db_value(row.get::<_, i64>(12)?); + let has_pinned_observation = row.get::<_, i64>(13)? != 0; Ok(( entity, relation_count, observation_text, observation_count, max_observation_priority, + has_pinned_observation, )) }, )? @@ -2141,6 +2157,7 @@ impl StateStore { observation_text, observation_count, max_observation_priority, + has_pinned_observation, )| { let matched_terms = context_graph_matched_terms(&entity, &observation_text, &terms); @@ -2154,6 +2171,7 @@ impl StateStore { relation_count, observation_count, max_observation_priority, + has_pinned_observation, entity.updated_at, now, ), @@ -2162,6 +2180,7 @@ impl StateStore { relation_count, observation_count, max_observation_priority, + has_pinned_observation, }) }, ) @@ -2250,6 +2269,7 @@ impl StateStore { entity_id: i64, observation_type: &str, priority: ContextObservationPriority, + pinned: bool, summary: &str, details: &BTreeMap, ) -> Result { @@ -2268,13 +2288,14 @@ impl StateStore { let details_json = serde_json::to_string(details)?; self.conn.execute( "INSERT INTO context_graph_observations ( - session_id, entity_id, observation_type, priority, summary, details_json, created_at - ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", + session_id, entity_id, observation_type, priority, pinned, summary, details_json, created_at + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", rusqlite::params![ session_id, entity_id, observation_type.trim(), priority.as_db_value(), + pinned as i64, summary.trim(), details_json, now, @@ -2289,7 +2310,7 @@ impl StateStore { self.conn .query_row( "SELECT o.id, o.session_id, o.entity_id, e.entity_type, e.name, - o.observation_type, o.priority, o.summary, o.details_json, o.created_at + o.observation_type, o.priority, o.pinned, o.summary, o.details_json, o.created_at FROM context_graph_observations o JOIN context_graph_entities e ON e.id = o.entity_id WHERE o.id = ?1", @@ -2299,6 +2320,34 @@ impl StateStore { .map_err(Into::into) } + pub fn set_context_observation_pinned( + &self, + observation_id: i64, + pinned: bool, + ) -> Result> { + let changed = self.conn.execute( + "UPDATE context_graph_observations + SET pinned = ?2 + WHERE id = ?1", + rusqlite::params![observation_id, pinned as i64], + )?; + if changed == 0 { + return Ok(None); + } + self.conn + .query_row( + "SELECT o.id, o.session_id, o.entity_id, e.entity_type, e.name, + o.observation_type, o.priority, o.pinned, o.summary, o.details_json, o.created_at + FROM context_graph_observations o + JOIN context_graph_entities e ON e.id = o.entity_id + WHERE o.id = ?1", + rusqlite::params![observation_id], + map_context_graph_observation, + ) + .optional() + .map_err(Into::into) + } + pub fn compact_context_graph( &self, session_id: Option<&str>, @@ -2312,6 +2361,7 @@ impl StateStore { session_id: &str, observation_type: &str, priority: ContextObservationPriority, + pinned: bool, summary: &str, details: &BTreeMap, ) -> Result { @@ -2321,6 +2371,7 @@ impl StateStore { session_entity.id, observation_type, priority, + pinned, summary, details, ) @@ -2333,11 +2384,11 @@ impl StateStore { ) -> Result> { let mut stmt = self.conn.prepare( "SELECT o.id, o.session_id, o.entity_id, e.entity_type, e.name, - o.observation_type, o.priority, o.summary, o.details_json, o.created_at + o.observation_type, o.priority, o.pinned, o.summary, o.details_json, o.created_at FROM context_graph_observations o JOIN context_graph_entities e ON e.id = o.entity_id WHERE (?1 IS NULL OR o.entity_id = ?1) - ORDER BY o.created_at DESC, o.id DESC + ORDER BY o.pinned DESC, o.created_at DESC, o.id DESC LIMIT ?2", )?; @@ -2414,7 +2465,7 @@ impl StateStore { SELECT o.id, ROW_NUMBER() OVER ( PARTITION BY o.entity_id, o.observation_type, o.summary - ORDER BY o.created_at DESC, o.id DESC + ORDER BY o.pinned DESC, o.created_at DESC, o.id DESC ) AS rn FROM context_graph_observations o JOIN context_graph_entities e ON e.id = o.entity_id @@ -2435,6 +2486,7 @@ impl StateStore { JOIN context_graph_entities e ON e.id = o.entity_id WHERE (?1 IS NULL OR e.session_id = ?1) AND (?2 IS NULL OR o.entity_id = ?2) + AND o.pinned = 0 )", rusqlite::params![session_id, entity_id], )? @@ -2453,6 +2505,7 @@ impl StateStore { JOIN context_graph_entities e ON e.id = o.entity_id WHERE (?1 IS NULL OR e.session_id = ?1) AND (?2 IS NULL OR o.entity_id = ?2) + AND o.pinned = 0 ) ranked WHERE ranked.rn > ?3 )", @@ -3464,12 +3517,12 @@ fn map_context_graph_observation( row: &rusqlite::Row<'_>, ) -> rusqlite::Result { let details_json = row - .get::<_, Option>(8)? + .get::<_, Option>(9)? .unwrap_or_else(|| "{}".to_string()); let details = serde_json::from_str(&details_json).map_err(|error| { - rusqlite::Error::FromSqlConversionFailure(8, rusqlite::types::Type::Text, Box::new(error)) + rusqlite::Error::FromSqlConversionFailure(9, rusqlite::types::Type::Text, Box::new(error)) })?; - let created_at = parse_store_timestamp(row.get::<_, String>(9)?, 9)?; + let created_at = parse_store_timestamp(row.get::<_, String>(10)?, 10)?; Ok(ContextGraphObservation { id: row.get(0)?, @@ -3479,7 +3532,8 @@ fn map_context_graph_observation( entity_name: row.get(4)?, observation_type: row.get(5)?, priority: ContextObservationPriority::from_db_value(row.get::<_, i64>(6)?), - summary: row.get(7)?, + pinned: row.get::<_, i64>(7)? != 0, + summary: row.get(8)?, details, created_at, }) @@ -3534,6 +3588,7 @@ fn context_graph_recall_score( relation_count: usize, observation_count: usize, max_observation_priority: ContextObservationPriority, + has_pinned_observation: bool, updated_at: chrono::DateTime, now: chrono::DateTime, ) -> u64 { @@ -3554,6 +3609,7 @@ fn context_graph_recall_score( + (relation_count.min(9) as u64 * 10) + (observation_count.min(6) as u64 * 8) + (max_observation_priority.as_db_value() as u64 * 18) + + if has_pinned_observation { 48 } else { 0 } + recency_bonus } @@ -4376,6 +4432,7 @@ mod tests { entity.id, "note", ContextObservationPriority::Normal, + false, "Customer wiped setup and got charged twice", &BTreeMap::from([("customer".to_string(), "viktor".to_string())]), )?; @@ -4386,6 +4443,7 @@ mod tests { assert_eq!(observations[0].entity_name, "Prefer recovery-first routing"); assert_eq!(observations[0].observation_type, "note"); assert_eq!(observations[0].priority, ContextObservationPriority::Normal); + assert!(!observations[0].pinned); assert_eq!( observations[0].details.get("customer"), Some(&"viktor".to_string()) @@ -4503,6 +4561,7 @@ mod tests { entity.id, "completion_summary", ContextObservationPriority::Normal, + false, &summary, &BTreeMap::new(), )?; @@ -4586,6 +4645,7 @@ mod tests { recovery.id, "incident_note", ContextObservationPriority::High, + true, "Previous auth callback recovery incident affected Viktor after a wipe", &BTreeMap::new(), )?; @@ -4605,6 +4665,7 @@ mod tests { results[0].max_observation_priority, ContextObservationPriority::High ); + assert!(results[0].has_pinned_observation); assert_eq!(results[1].entity.id, callback.id); assert!(results[1] .matched_terms @@ -4620,11 +4681,134 @@ mod tests { results[1].max_observation_priority, ContextObservationPriority::Normal ); + assert!(!results[1].has_pinned_observation); assert!(!results.iter().any(|entry| entry.entity.id == unrelated.id)); Ok(()) } + #[test] + fn compact_context_graph_preserves_pinned_observations() -> Result<()> { + let tempdir = TestDir::new("store-context-pinned-observations")?; + let db = StateStore::open(&tempdir.path().join("state.db"))?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "session-1".to_string(), + task: "deep memory".to_string(), + project: "workspace".to_string(), + task_group: "knowledge".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: SessionState::Running, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + last_heartbeat_at: now, + metrics: SessionMetrics::default(), + })?; + + let entity = db.upsert_context_entity( + Some("session-1"), + "incident", + "billing-recovery", + None, + "Recovery notes", + &BTreeMap::new(), + )?; + + db.add_context_observation( + Some("session-1"), + entity.id, + "incident_note", + ContextObservationPriority::High, + true, + "Pinned billing recovery memory", + &BTreeMap::new(), + )?; + std::thread::sleep(std::time::Duration::from_millis(2)); + db.add_context_observation( + Some("session-1"), + entity.id, + "incident_note", + ContextObservationPriority::Normal, + false, + "Newest unpinned memory", + &BTreeMap::new(), + )?; + + let stats = db.compact_context_graph(None, 1)?; + assert_eq!(stats.observations_retained, 2); + + let observations = db.list_context_observations(Some(entity.id), 10)?; + assert_eq!(observations.len(), 2); + assert!(observations.iter().any(|entry| entry.pinned)); + assert!(observations + .iter() + .any(|entry| entry.summary == "Pinned billing recovery memory")); + assert!(observations + .iter() + .any(|entry| entry.summary == "Newest unpinned memory")); + + Ok(()) + } + + #[test] + fn set_context_observation_pinned_updates_existing_observation() -> Result<()> { + let tempdir = TestDir::new("store-context-pin-toggle")?; + let db = StateStore::open(&tempdir.path().join("state.db"))?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "session-1".to_string(), + task: "deep memory".to_string(), + project: "workspace".to_string(), + task_group: "knowledge".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: SessionState::Running, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + last_heartbeat_at: now, + metrics: SessionMetrics::default(), + })?; + + let entity = db.upsert_context_entity( + Some("session-1"), + "incident", + "billing-recovery", + None, + "Recovery notes", + &BTreeMap::new(), + )?; + + let observation = db.add_context_observation( + Some("session-1"), + entity.id, + "incident_note", + ContextObservationPriority::Normal, + false, + "Temporarily useful note", + &BTreeMap::new(), + )?; + assert!(!observation.pinned); + + let pinned = db + .set_context_observation_pinned(observation.id, true)? + .expect("observation should exist"); + assert!(pinned.pinned); + + let unpinned = db + .set_context_observation_pinned(observation.id, false)? + .expect("observation should still exist"); + assert!(!unpinned.pinned); + + Ok(()) + } + #[test] fn context_graph_detail_includes_incoming_and_outgoing_relations() -> Result<()> { let tempdir = TestDir::new("store-context-relations")?; diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index a9e7464b..c2e3712b 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -4261,6 +4261,7 @@ impl Dashboard { &session.id, observation_type, priority, + false, &observation_summary, &details, ) { @@ -5374,6 +5375,9 @@ impl Dashboard { entry.observation_count, entry.max_observation_priority ); + if entry.has_pinned_observation { + line.push_str(" | pinned"); + } if let Some(session_id) = entry.entity.session_id.as_deref() { if session_id != session.id { line.push_str(&format!(" | {}", format_session_id(session_id))); @@ -5395,8 +5399,9 @@ impl Dashboard { if let Ok(observations) = self.db.list_context_observations(Some(entry.entity.id), 1) { if let Some(observation) = observations.first() { lines.push(format!( - " memory [{}] {}", + " memory [{}{}] {}", observation.priority, + if observation.pinned { "/pinned" } else { "" }, truncate_for_dashboard(&observation.summary, 72) )); } @@ -10544,6 +10549,7 @@ diff --git a/src/lib.rs b/src/lib.rs\n\ entity.id, "completion_summary", ContextObservationPriority::Normal, + true, "Recovered auth callback incident with billing fallback", &BTreeMap::new(), )?; @@ -10551,10 +10557,11 @@ diff --git a/src/lib.rs b/src/lib.rs\n\ let text = dashboard.selected_session_metrics_text(); assert!(text.contains("Relevant memory")); assert!(text.contains("[file] callback.ts")); + assert!(text.contains("| pinned")); assert!(text.contains("matches auth, callback, recovery")); - assert!( - text.contains("memory [normal] Recovered auth callback incident with billing fallback") - ); + assert!(text.contains( + "memory [normal/pinned] Recovered auth callback incident with billing fallback" + )); Ok(()) }