feat: add ecc2 pinned memory observations

This commit is contained in:
Affaan Mustafa
2026-04-10 07:06:37 -07:00
parent 766bf31737
commit 9c294f7815
4 changed files with 331 additions and 17 deletions

View File

@@ -473,6 +473,9 @@ enum GraphCommands {
/// Observation priority
#[arg(long, value_enum, default_value_t = ObservationPriorityArg::Normal)]
priority: ObservationPriorityArg,
/// Keep this observation across aggressive compaction
#[arg(long)]
pinned: bool,
/// Observation summary
#[arg(long)]
summary: String,
@@ -483,6 +486,24 @@ enum GraphCommands {
#[arg(long)]
json: bool,
},
/// Pin an existing observation so compaction preserves it
PinObservation {
/// Observation ID
#[arg(long)]
observation_id: i64,
/// Emit machine-readable JSON instead of the human summary
#[arg(long)]
json: bool,
},
/// Remove the pin from an existing observation
UnpinObservation {
/// Observation ID
#[arg(long)]
observation_id: i64,
/// Emit machine-readable JSON instead of the human summary
#[arg(long)]
json: bool,
},
/// List observations in the shared context graph
Observations {
/// Filter to observations for a specific entity ID
@@ -1388,6 +1409,7 @@ async fn main() -> Result<()> {
entity_id,
observation_type,
priority,
pinned,
summary,
details,
json,
@@ -1402,6 +1424,7 @@ async fn main() -> Result<()> {
entity_id,
&observation_type,
priority.into(),
pinned,
&summary,
&details,
)?;
@@ -1411,6 +1434,38 @@ async fn main() -> Result<()> {
println!("{}", format_graph_observation_human(&observation));
}
}
GraphCommands::PinObservation {
observation_id,
json,
} => {
let Some(observation) = db.set_context_observation_pinned(observation_id, true)?
else {
return Err(anyhow::anyhow!(
"Context graph observation #{observation_id} was not found"
));
};
if json {
println!("{}", serde_json::to_string_pretty(&observation)?);
} else {
println!("{}", format_graph_observation_human(&observation));
}
}
GraphCommands::UnpinObservation {
observation_id,
json,
} => {
let Some(observation) = db.set_context_observation_pinned(observation_id, false)?
else {
return Err(anyhow::anyhow!(
"Context graph observation #{observation_id} was not found"
));
};
if json {
println!("{}", serde_json::to_string_pretty(&observation)?);
} else {
println!("{}", format_graph_observation_human(&observation));
}
}
GraphCommands::Observations {
entity_id,
limit,
@@ -2144,6 +2199,7 @@ fn import_memory_connector_record(
entity.id,
observation_type,
session::ContextObservationPriority::Normal,
false,
summary,
&record.details,
)?;
@@ -3349,6 +3405,7 @@ fn format_graph_observation_human(observation: &session::ContextGraphObservation
),
format!("Type: {}", observation.observation_type),
format!("Priority: {}", observation.priority),
format!("Pinned: {}", if observation.pinned { "yes" } else { "no" }),
format!("Summary: {}", observation.summary),
];
if let Some(session_id) = observation.session_id.as_deref() {
@@ -3380,10 +3437,11 @@ fn format_graph_observations_human(observations: &[session::ContextGraphObservat
)];
for observation in observations {
let mut line = format!(
"- #{} [{}/{}] {}",
"- #{} [{}/{}{}] {}",
observation.id,
observation.observation_type,
observation.priority,
if observation.pinned { "/pinned" } else { "" },
observation.entity_name
);
if let Some(session_id) = observation.session_id.as_deref() {
@@ -3424,6 +3482,9 @@ fn format_graph_recall_human(
entry.observation_count,
entry.max_observation_priority
);
if entry.has_pinned_observation {
line.push_str(" | pinned");
}
if let Some(session_id) = entry.entity.session_id.as_deref() {
line.push_str(&format!(" | {}", short_session(session_id)));
}
@@ -5463,6 +5524,7 @@ mod tests {
"7",
"--type",
"completion_summary",
"--pinned",
"--summary",
"Finished auth callback recovery",
"--detail",
@@ -5479,6 +5541,7 @@ mod tests {
entity_id,
observation_type,
priority,
pinned,
summary,
details,
json,
@@ -5488,6 +5551,7 @@ mod tests {
assert_eq!(entity_id, 7);
assert_eq!(observation_type, "completion_summary");
assert!(matches!(priority, ObservationPriorityArg::Normal));
assert!(pinned);
assert_eq!(summary, "Finished auth callback recovery");
assert_eq!(details, vec!["tests_run=2"]);
assert!(json);
@@ -5496,6 +5560,60 @@ mod tests {
}
}
#[test]
fn cli_parses_graph_pin_observation_command() {
let cli = Cli::try_parse_from([
"ecc",
"graph",
"pin-observation",
"--observation-id",
"42",
"--json",
])
.expect("graph pin-observation should parse");
match cli.command {
Some(Commands::Graph {
command:
GraphCommands::PinObservation {
observation_id,
json,
},
}) => {
assert_eq!(observation_id, 42);
assert!(json);
}
_ => panic!("expected graph pin-observation subcommand"),
}
}
#[test]
fn cli_parses_graph_unpin_observation_command() {
let cli = Cli::try_parse_from([
"ecc",
"graph",
"unpin-observation",
"--observation-id",
"42",
"--json",
])
.expect("graph unpin-observation should parse");
match cli.command {
Some(Commands::Graph {
command:
GraphCommands::UnpinObservation {
observation_id,
json,
},
}) => {
assert_eq!(observation_id, 42);
assert!(json);
}
_ => panic!("expected graph unpin-observation subcommand"),
}
}
#[test]
fn cli_parses_graph_compact_command() {
let cli = Cli::try_parse_from([
@@ -5701,6 +5819,7 @@ mod tests {
relation_count: 2,
observation_count: 1,
max_observation_priority: session::ContextObservationPriority::High,
has_pinned_observation: true,
}],
Some("sess-12345678"),
"auth callback recovery",
@@ -5709,6 +5828,7 @@ mod tests {
assert!(text.contains("Relevant memory: 1 entries"));
assert!(text.contains("[file] callback.ts | score 319 | relations 2 | observations 1"));
assert!(text.contains("priority high"));
assert!(text.contains("| pinned"));
assert!(text.contains("matches auth, callback, recovery"));
assert!(text.contains("path src/routes/auth/callback.ts"));
}
@@ -5723,6 +5843,7 @@ mod tests {
entity_name: "sess-12345678".to_string(),
observation_type: "completion_summary".to_string(),
priority: session::ContextObservationPriority::High,
pinned: true,
summary: "Finished auth callback recovery with 2 tests".to_string(),
details: BTreeMap::from([("tests_run".to_string(), "2".to_string())]),
created_at: chrono::DateTime::parse_from_rfc3339("2026-04-10T01:02:03Z")
@@ -5731,7 +5852,7 @@ mod tests {
}]);
assert!(text.contains("Context graph observations: 1"));
assert!(text.contains("[completion_summary/high] sess-12345678"));
assert!(text.contains("[completion_summary/high/pinned] sess-12345678"));
assert!(text.contains("summary Finished auth callback recovery with 2 tests"));
}

View File

@@ -199,6 +199,7 @@ pub struct ContextGraphObservation {
pub entity_name: String,
pub observation_type: String,
pub priority: ContextObservationPriority,
pub pinned: bool,
pub summary: String,
pub details: BTreeMap<String, String>,
pub created_at: DateTime<Utc>,
@@ -212,6 +213,7 @@ pub struct ContextGraphRecallEntry {
pub relation_count: usize,
pub observation_count: usize,
pub max_observation_priority: ContextObservationPriority,
pub has_pinned_observation: bool,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]

View File

@@ -268,6 +268,7 @@ impl StateStore {
entity_id INTEGER NOT NULL REFERENCES context_graph_entities(id) ON DELETE CASCADE,
observation_type TEXT NOT NULL,
priority INTEGER NOT NULL DEFAULT 1,
pinned INTEGER NOT NULL DEFAULT 0,
summary TEXT NOT NULL,
details_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL
@@ -473,6 +474,14 @@ impl StateStore {
)
.context("Failed to add priority column to context_graph_observations table")?;
}
if !self.has_column("context_graph_observations", "pinned")? {
self.conn
.execute(
"ALTER TABLE context_graph_observations ADD COLUMN pinned INTEGER NOT NULL DEFAULT 0",
[],
)
.context("Failed to add pinned column to context_graph_observations table")?;
}
if !self.has_column("daemon_activity", "last_dispatch_deferred")? {
self.conn
@@ -2103,7 +2112,12 @@ impl StateStore {
SELECT MAX(priority)
FROM context_graph_observations o
WHERE o.entity_id = e.id
), 1) AS max_observation_priority
), 1) AS max_observation_priority,
COALESCE((
SELECT MAX(pinned)
FROM context_graph_observations o
WHERE o.entity_id = e.id
), 0) AS has_pinned_observation
FROM context_graph_entities e
WHERE (?1 IS NULL OR e.session_id = ?1)
ORDER BY e.updated_at DESC, e.id DESC
@@ -2120,12 +2134,14 @@ impl StateStore {
let observation_count = row.get::<_, i64>(11)?.max(0) as usize;
let max_observation_priority =
ContextObservationPriority::from_db_value(row.get::<_, i64>(12)?);
let has_pinned_observation = row.get::<_, i64>(13)? != 0;
Ok((
entity,
relation_count,
observation_text,
observation_count,
max_observation_priority,
has_pinned_observation,
))
},
)?
@@ -2141,6 +2157,7 @@ impl StateStore {
observation_text,
observation_count,
max_observation_priority,
has_pinned_observation,
)| {
let matched_terms =
context_graph_matched_terms(&entity, &observation_text, &terms);
@@ -2154,6 +2171,7 @@ impl StateStore {
relation_count,
observation_count,
max_observation_priority,
has_pinned_observation,
entity.updated_at,
now,
),
@@ -2162,6 +2180,7 @@ impl StateStore {
relation_count,
observation_count,
max_observation_priority,
has_pinned_observation,
})
},
)
@@ -2250,6 +2269,7 @@ impl StateStore {
entity_id: i64,
observation_type: &str,
priority: ContextObservationPriority,
pinned: bool,
summary: &str,
details: &BTreeMap<String, String>,
) -> Result<ContextGraphObservation> {
@@ -2268,13 +2288,14 @@ impl StateStore {
let details_json = serde_json::to_string(details)?;
self.conn.execute(
"INSERT INTO context_graph_observations (
session_id, entity_id, observation_type, priority, summary, details_json, created_at
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
session_id, entity_id, observation_type, priority, pinned, summary, details_json, created_at
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
rusqlite::params![
session_id,
entity_id,
observation_type.trim(),
priority.as_db_value(),
pinned as i64,
summary.trim(),
details_json,
now,
@@ -2289,7 +2310,7 @@ impl StateStore {
self.conn
.query_row(
"SELECT o.id, o.session_id, o.entity_id, e.entity_type, e.name,
o.observation_type, o.priority, o.summary, o.details_json, o.created_at
o.observation_type, o.priority, o.pinned, o.summary, o.details_json, o.created_at
FROM context_graph_observations o
JOIN context_graph_entities e ON e.id = o.entity_id
WHERE o.id = ?1",
@@ -2299,6 +2320,34 @@ impl StateStore {
.map_err(Into::into)
}
pub fn set_context_observation_pinned(
&self,
observation_id: i64,
pinned: bool,
) -> Result<Option<ContextGraphObservation>> {
let changed = self.conn.execute(
"UPDATE context_graph_observations
SET pinned = ?2
WHERE id = ?1",
rusqlite::params![observation_id, pinned as i64],
)?;
if changed == 0 {
return Ok(None);
}
self.conn
.query_row(
"SELECT o.id, o.session_id, o.entity_id, e.entity_type, e.name,
o.observation_type, o.priority, o.pinned, o.summary, o.details_json, o.created_at
FROM context_graph_observations o
JOIN context_graph_entities e ON e.id = o.entity_id
WHERE o.id = ?1",
rusqlite::params![observation_id],
map_context_graph_observation,
)
.optional()
.map_err(Into::into)
}
pub fn compact_context_graph(
&self,
session_id: Option<&str>,
@@ -2312,6 +2361,7 @@ impl StateStore {
session_id: &str,
observation_type: &str,
priority: ContextObservationPriority,
pinned: bool,
summary: &str,
details: &BTreeMap<String, String>,
) -> Result<ContextGraphObservation> {
@@ -2321,6 +2371,7 @@ impl StateStore {
session_entity.id,
observation_type,
priority,
pinned,
summary,
details,
)
@@ -2333,11 +2384,11 @@ impl StateStore {
) -> Result<Vec<ContextGraphObservation>> {
let mut stmt = self.conn.prepare(
"SELECT o.id, o.session_id, o.entity_id, e.entity_type, e.name,
o.observation_type, o.priority, o.summary, o.details_json, o.created_at
o.observation_type, o.priority, o.pinned, o.summary, o.details_json, o.created_at
FROM context_graph_observations o
JOIN context_graph_entities e ON e.id = o.entity_id
WHERE (?1 IS NULL OR o.entity_id = ?1)
ORDER BY o.created_at DESC, o.id DESC
ORDER BY o.pinned DESC, o.created_at DESC, o.id DESC
LIMIT ?2",
)?;
@@ -2414,7 +2465,7 @@ impl StateStore {
SELECT o.id,
ROW_NUMBER() OVER (
PARTITION BY o.entity_id, o.observation_type, o.summary
ORDER BY o.created_at DESC, o.id DESC
ORDER BY o.pinned DESC, o.created_at DESC, o.id DESC
) AS rn
FROM context_graph_observations o
JOIN context_graph_entities e ON e.id = o.entity_id
@@ -2435,6 +2486,7 @@ impl StateStore {
JOIN context_graph_entities e ON e.id = o.entity_id
WHERE (?1 IS NULL OR e.session_id = ?1)
AND (?2 IS NULL OR o.entity_id = ?2)
AND o.pinned = 0
)",
rusqlite::params![session_id, entity_id],
)?
@@ -2453,6 +2505,7 @@ impl StateStore {
JOIN context_graph_entities e ON e.id = o.entity_id
WHERE (?1 IS NULL OR e.session_id = ?1)
AND (?2 IS NULL OR o.entity_id = ?2)
AND o.pinned = 0
) ranked
WHERE ranked.rn > ?3
)",
@@ -3464,12 +3517,12 @@ fn map_context_graph_observation(
row: &rusqlite::Row<'_>,
) -> rusqlite::Result<ContextGraphObservation> {
let details_json = row
.get::<_, Option<String>>(8)?
.get::<_, Option<String>>(9)?
.unwrap_or_else(|| "{}".to_string());
let details = serde_json::from_str(&details_json).map_err(|error| {
rusqlite::Error::FromSqlConversionFailure(8, rusqlite::types::Type::Text, Box::new(error))
rusqlite::Error::FromSqlConversionFailure(9, rusqlite::types::Type::Text, Box::new(error))
})?;
let created_at = parse_store_timestamp(row.get::<_, String>(9)?, 9)?;
let created_at = parse_store_timestamp(row.get::<_, String>(10)?, 10)?;
Ok(ContextGraphObservation {
id: row.get(0)?,
@@ -3479,7 +3532,8 @@ fn map_context_graph_observation(
entity_name: row.get(4)?,
observation_type: row.get(5)?,
priority: ContextObservationPriority::from_db_value(row.get::<_, i64>(6)?),
summary: row.get(7)?,
pinned: row.get::<_, i64>(7)? != 0,
summary: row.get(8)?,
details,
created_at,
})
@@ -3534,6 +3588,7 @@ fn context_graph_recall_score(
relation_count: usize,
observation_count: usize,
max_observation_priority: ContextObservationPriority,
has_pinned_observation: bool,
updated_at: chrono::DateTime<chrono::Utc>,
now: chrono::DateTime<chrono::Utc>,
) -> u64 {
@@ -3554,6 +3609,7 @@ fn context_graph_recall_score(
+ (relation_count.min(9) as u64 * 10)
+ (observation_count.min(6) as u64 * 8)
+ (max_observation_priority.as_db_value() as u64 * 18)
+ if has_pinned_observation { 48 } else { 0 }
+ recency_bonus
}
@@ -4376,6 +4432,7 @@ mod tests {
entity.id,
"note",
ContextObservationPriority::Normal,
false,
"Customer wiped setup and got charged twice",
&BTreeMap::from([("customer".to_string(), "viktor".to_string())]),
)?;
@@ -4386,6 +4443,7 @@ mod tests {
assert_eq!(observations[0].entity_name, "Prefer recovery-first routing");
assert_eq!(observations[0].observation_type, "note");
assert_eq!(observations[0].priority, ContextObservationPriority::Normal);
assert!(!observations[0].pinned);
assert_eq!(
observations[0].details.get("customer"),
Some(&"viktor".to_string())
@@ -4503,6 +4561,7 @@ mod tests {
entity.id,
"completion_summary",
ContextObservationPriority::Normal,
false,
&summary,
&BTreeMap::new(),
)?;
@@ -4586,6 +4645,7 @@ mod tests {
recovery.id,
"incident_note",
ContextObservationPriority::High,
true,
"Previous auth callback recovery incident affected Viktor after a wipe",
&BTreeMap::new(),
)?;
@@ -4605,6 +4665,7 @@ mod tests {
results[0].max_observation_priority,
ContextObservationPriority::High
);
assert!(results[0].has_pinned_observation);
assert_eq!(results[1].entity.id, callback.id);
assert!(results[1]
.matched_terms
@@ -4620,11 +4681,134 @@ mod tests {
results[1].max_observation_priority,
ContextObservationPriority::Normal
);
assert!(!results[1].has_pinned_observation);
assert!(!results.iter().any(|entry| entry.entity.id == unrelated.id));
Ok(())
}
#[test]
fn compact_context_graph_preserves_pinned_observations() -> Result<()> {
let tempdir = TestDir::new("store-context-pinned-observations")?;
let db = StateStore::open(&tempdir.path().join("state.db"))?;
let now = Utc::now();
db.insert_session(&Session {
id: "session-1".to_string(),
task: "deep memory".to_string(),
project: "workspace".to_string(),
task_group: "knowledge".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: SessionState::Running,
pid: None,
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
let entity = db.upsert_context_entity(
Some("session-1"),
"incident",
"billing-recovery",
None,
"Recovery notes",
&BTreeMap::new(),
)?;
db.add_context_observation(
Some("session-1"),
entity.id,
"incident_note",
ContextObservationPriority::High,
true,
"Pinned billing recovery memory",
&BTreeMap::new(),
)?;
std::thread::sleep(std::time::Duration::from_millis(2));
db.add_context_observation(
Some("session-1"),
entity.id,
"incident_note",
ContextObservationPriority::Normal,
false,
"Newest unpinned memory",
&BTreeMap::new(),
)?;
let stats = db.compact_context_graph(None, 1)?;
assert_eq!(stats.observations_retained, 2);
let observations = db.list_context_observations(Some(entity.id), 10)?;
assert_eq!(observations.len(), 2);
assert!(observations.iter().any(|entry| entry.pinned));
assert!(observations
.iter()
.any(|entry| entry.summary == "Pinned billing recovery memory"));
assert!(observations
.iter()
.any(|entry| entry.summary == "Newest unpinned memory"));
Ok(())
}
#[test]
fn set_context_observation_pinned_updates_existing_observation() -> Result<()> {
let tempdir = TestDir::new("store-context-pin-toggle")?;
let db = StateStore::open(&tempdir.path().join("state.db"))?;
let now = Utc::now();
db.insert_session(&Session {
id: "session-1".to_string(),
task: "deep memory".to_string(),
project: "workspace".to_string(),
task_group: "knowledge".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: SessionState::Running,
pid: None,
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
let entity = db.upsert_context_entity(
Some("session-1"),
"incident",
"billing-recovery",
None,
"Recovery notes",
&BTreeMap::new(),
)?;
let observation = db.add_context_observation(
Some("session-1"),
entity.id,
"incident_note",
ContextObservationPriority::Normal,
false,
"Temporarily useful note",
&BTreeMap::new(),
)?;
assert!(!observation.pinned);
let pinned = db
.set_context_observation_pinned(observation.id, true)?
.expect("observation should exist");
assert!(pinned.pinned);
let unpinned = db
.set_context_observation_pinned(observation.id, false)?
.expect("observation should still exist");
assert!(!unpinned.pinned);
Ok(())
}
#[test]
fn context_graph_detail_includes_incoming_and_outgoing_relations() -> Result<()> {
let tempdir = TestDir::new("store-context-relations")?;

View File

@@ -4261,6 +4261,7 @@ impl Dashboard {
&session.id,
observation_type,
priority,
false,
&observation_summary,
&details,
) {
@@ -5374,6 +5375,9 @@ impl Dashboard {
entry.observation_count,
entry.max_observation_priority
);
if entry.has_pinned_observation {
line.push_str(" | pinned");
}
if let Some(session_id) = entry.entity.session_id.as_deref() {
if session_id != session.id {
line.push_str(&format!(" | {}", format_session_id(session_id)));
@@ -5395,8 +5399,9 @@ impl Dashboard {
if let Ok(observations) = self.db.list_context_observations(Some(entry.entity.id), 1) {
if let Some(observation) = observations.first() {
lines.push(format!(
" memory [{}] {}",
" memory [{}{}] {}",
observation.priority,
if observation.pinned { "/pinned" } else { "" },
truncate_for_dashboard(&observation.summary, 72)
));
}
@@ -10544,6 +10549,7 @@ diff --git a/src/lib.rs b/src/lib.rs\n\
entity.id,
"completion_summary",
ContextObservationPriority::Normal,
true,
"Recovered auth callback incident with billing fallback",
&BTreeMap::new(),
)?;
@@ -10551,10 +10557,11 @@ diff --git a/src/lib.rs b/src/lib.rs\n\
let text = dashboard.selected_session_metrics_text();
assert!(text.contains("Relevant memory"));
assert!(text.contains("[file] callback.ts"));
assert!(text.contains("| pinned"));
assert!(text.contains("matches auth, callback, recovery"));
assert!(
text.contains("memory [normal] Recovered auth callback incident with billing fallback")
);
assert!(text.contains(
"memory [normal/pinned] Recovered auth callback incident with billing fallback"
));
Ok(())
}