feat: add ecc2 graph observations

This commit is contained in:
Affaan Mustafa
2026-04-10 06:02:24 -07:00
parent 727d9380cb
commit 77c9082deb
4 changed files with 610 additions and 28 deletions

View File

@@ -14,9 +14,10 @@ use crate::observability::{ToolCallEvent, ToolLogEntry, ToolLogPage};
use super::output::{OutputLine, OutputStream, OUTPUT_BUFFER_LIMIT};
use super::{
default_project_label, default_task_group_label, normalize_group_label, ContextGraphEntity,
ContextGraphEntityDetail, ContextGraphRecallEntry, ContextGraphRelation, ContextGraphSyncStats,
DecisionLogEntry, FileActivityAction, FileActivityEntry, Session, SessionAgentProfile,
SessionMessage, SessionMetrics, SessionState, WorktreeInfo,
ContextGraphEntityDetail, ContextGraphObservation, ContextGraphRecallEntry,
ContextGraphRelation, ContextGraphSyncStats, DecisionLogEntry, FileActivityAction,
FileActivityEntry, Session, SessionAgentProfile, SessionMessage, SessionMetrics, SessionState,
WorktreeInfo,
};
pub struct StateStore {
@@ -259,6 +260,16 @@ impl StateStore {
UNIQUE(from_entity_id, to_entity_id, relation_type)
);
CREATE TABLE IF NOT EXISTS context_graph_observations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT REFERENCES sessions(id) ON DELETE SET NULL,
entity_id INTEGER NOT NULL REFERENCES context_graph_entities(id) ON DELETE CASCADE,
observation_type TEXT NOT NULL,
summary TEXT NOT NULL,
details_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS pending_worktree_queue (
session_id TEXT PRIMARY KEY REFERENCES sessions(id) ON DELETE CASCADE,
repo_root TEXT NOT NULL,
@@ -319,6 +330,8 @@ impl StateStore {
ON context_graph_relations(from_entity_id, created_at, id);
CREATE INDEX IF NOT EXISTS idx_context_graph_relations_to
ON context_graph_relations(to_entity_id, created_at, id);
CREATE INDEX IF NOT EXISTS idx_context_graph_observations_entity
ON context_graph_observations(entity_id, created_at, id);
CREATE INDEX IF NOT EXISTS idx_conflict_incidents_sessions
ON conflict_incidents(first_session_id, second_session_id, resolved_at, updated_at);
CREATE INDEX IF NOT EXISTS idx_pending_worktree_queue_requested_at
@@ -2047,7 +2060,22 @@ impl StateStore {
SELECT COUNT(*)
FROM context_graph_relations r
WHERE r.from_entity_id = e.id OR r.to_entity_id = e.id
) AS relation_count
) AS relation_count,
COALESCE((
SELECT group_concat(summary, ' ')
FROM (
SELECT summary
FROM context_graph_observations o
WHERE o.entity_id = e.id
ORDER BY o.created_at DESC, o.id DESC
LIMIT 4
)
), '') AS observation_text,
(
SELECT COUNT(*)
FROM context_graph_observations o
WHERE o.entity_id = e.id
) AS observation_count
FROM context_graph_entities e
WHERE (?1 IS NULL OR e.session_id = ?1)
ORDER BY e.updated_at DESC, e.id DESC
@@ -2060,7 +2088,9 @@ impl StateStore {
|row| {
let entity = map_context_graph_entity(row)?;
let relation_count = row.get::<_, i64>(9)?.max(0) as usize;
Ok((entity, relation_count))
let observation_text = row.get::<_, String>(10)?;
let observation_count = row.get::<_, i64>(11)?.max(0) as usize;
Ok((entity, relation_count, observation_text, observation_count))
},
)?
.collect::<Result<Vec<_>, _>>()?;
@@ -2068,24 +2098,29 @@ impl StateStore {
let now = chrono::Utc::now();
let mut entries = candidates
.into_iter()
.filter_map(|(entity, relation_count)| {
let matched_terms = context_graph_matched_terms(&entity, &terms);
if matched_terms.is_empty() {
return None;
}
.filter_map(
|(entity, relation_count, observation_text, observation_count)| {
let matched_terms =
context_graph_matched_terms(&entity, &observation_text, &terms);
if matched_terms.is_empty() {
return None;
}
Some(ContextGraphRecallEntry {
score: context_graph_recall_score(
matched_terms.len(),
Some(ContextGraphRecallEntry {
score: context_graph_recall_score(
matched_terms.len(),
relation_count,
observation_count,
entity.updated_at,
now,
),
entity,
matched_terms,
relation_count,
entity.updated_at,
now,
),
entity,
matched_terms,
relation_count,
})
})
observation_count,
})
},
)
.collect::<Vec<_>>();
entries.sort_by(|left, right| {
@@ -2165,6 +2200,95 @@ impl StateStore {
}))
}
pub fn add_context_observation(
&self,
session_id: Option<&str>,
entity_id: i64,
observation_type: &str,
summary: &str,
details: &BTreeMap<String, String>,
) -> Result<ContextGraphObservation> {
if observation_type.trim().is_empty() {
return Err(anyhow::anyhow!(
"Context graph observation type cannot be empty"
));
}
if summary.trim().is_empty() {
return Err(anyhow::anyhow!(
"Context graph observation summary cannot be empty"
));
}
let now = chrono::Utc::now().to_rfc3339();
let details_json = serde_json::to_string(details)?;
self.conn.execute(
"INSERT INTO context_graph_observations (
session_id, entity_id, observation_type, summary, details_json, created_at
) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
rusqlite::params![
session_id,
entity_id,
observation_type.trim(),
summary.trim(),
details_json,
now,
],
)?;
let observation_id = self.conn.last_insert_rowid();
self.conn
.query_row(
"SELECT o.id, o.session_id, o.entity_id, e.entity_type, e.name,
o.observation_type, o.summary, o.details_json, o.created_at
FROM context_graph_observations o
JOIN context_graph_entities e ON e.id = o.entity_id
WHERE o.id = ?1",
rusqlite::params![observation_id],
map_context_graph_observation,
)
.map_err(Into::into)
}
pub fn add_session_observation(
&self,
session_id: &str,
observation_type: &str,
summary: &str,
details: &BTreeMap<String, String>,
) -> Result<ContextGraphObservation> {
let session_entity = self.sync_context_graph_session(session_id)?;
self.add_context_observation(
Some(session_id),
session_entity.id,
observation_type,
summary,
details,
)
}
pub fn list_context_observations(
&self,
entity_id: Option<i64>,
limit: usize,
) -> Result<Vec<ContextGraphObservation>> {
let mut stmt = self.conn.prepare(
"SELECT o.id, o.session_id, o.entity_id, e.entity_type, e.name,
o.observation_type, o.summary, o.details_json, o.created_at
FROM context_graph_observations o
JOIN context_graph_entities e ON e.id = o.entity_id
WHERE (?1 IS NULL OR o.entity_id = ?1)
ORDER BY o.created_at DESC, o.id DESC
LIMIT ?2",
)?;
let entries = stmt
.query_map(
rusqlite::params![entity_id, limit as i64],
map_context_graph_observation,
)?
.collect::<Result<Vec<_>, _>>()?;
Ok(entries)
}
pub fn upsert_context_relation(
&self,
session_id: Option<&str>,
@@ -3147,6 +3271,30 @@ fn map_context_graph_relation(row: &rusqlite::Row<'_>) -> rusqlite::Result<Conte
})
}
fn map_context_graph_observation(
row: &rusqlite::Row<'_>,
) -> rusqlite::Result<ContextGraphObservation> {
let details_json = row
.get::<_, Option<String>>(7)?
.unwrap_or_else(|| "{}".to_string());
let details = serde_json::from_str(&details_json).map_err(|error| {
rusqlite::Error::FromSqlConversionFailure(7, rusqlite::types::Type::Text, Box::new(error))
})?;
let created_at = parse_store_timestamp(row.get::<_, String>(8)?, 8)?;
Ok(ContextGraphObservation {
id: row.get(0)?,
session_id: row.get(1)?,
entity_id: row.get(2)?,
entity_type: row.get(3)?,
entity_name: row.get(4)?,
observation_type: row.get(5)?,
summary: row.get(6)?,
details,
created_at,
})
}
fn context_graph_recall_terms(query: &str) -> Vec<String> {
let mut terms = Vec::new();
for raw_term in
@@ -3161,7 +3309,11 @@ fn context_graph_recall_terms(query: &str) -> Vec<String> {
terms
}
fn context_graph_matched_terms(entity: &ContextGraphEntity, terms: &[String]) -> Vec<String> {
fn context_graph_matched_terms(
entity: &ContextGraphEntity,
observation_text: &str,
terms: &[String],
) -> Vec<String> {
let mut haystacks = vec![
entity.entity_type.to_ascii_lowercase(),
entity.name.to_ascii_lowercase(),
@@ -3174,6 +3326,9 @@ fn context_graph_matched_terms(entity: &ContextGraphEntity, terms: &[String]) ->
haystacks.push(key.to_ascii_lowercase());
haystacks.push(value.to_ascii_lowercase());
}
if !observation_text.trim().is_empty() {
haystacks.push(observation_text.to_ascii_lowercase());
}
let mut matched = Vec::new();
for term in terms {
@@ -3187,6 +3342,7 @@ fn context_graph_matched_terms(entity: &ContextGraphEntity, terms: &[String]) ->
fn context_graph_recall_score(
matched_term_count: usize,
relation_count: usize,
observation_count: usize,
updated_at: chrono::DateTime<chrono::Utc>,
now: chrono::DateTime<chrono::Utc>,
) -> u64 {
@@ -3203,7 +3359,10 @@ fn context_graph_recall_score(
}
};
(matched_term_count as u64 * 100) + (relation_count.min(9) as u64 * 10) + recency_bonus
(matched_term_count as u64 * 100)
+ (relation_count.min(9) as u64 * 10)
+ (observation_count.min(6) as u64 * 8)
+ recency_bonus
}
fn parse_store_timestamp(
@@ -3990,6 +4149,57 @@ mod tests {
Ok(())
}
#[test]
fn add_and_list_context_observations() -> Result<()> {
let tempdir = TestDir::new("store-context-observations")?;
let db = StateStore::open(&tempdir.path().join("state.db"))?;
let now = Utc::now();
db.insert_session(&Session {
id: "session-1".to_string(),
task: "deep memory".to_string(),
project: "workspace".to_string(),
task_group: "knowledge".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: SessionState::Running,
pid: None,
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
let entity = db.upsert_context_entity(
Some("session-1"),
"decision",
"Prefer recovery-first routing",
None,
"Recovered installs should go through the portal first",
&BTreeMap::new(),
)?;
let observation = db.add_context_observation(
Some("session-1"),
entity.id,
"note",
"Customer wiped setup and got charged twice",
&BTreeMap::from([("customer".to_string(), "viktor".to_string())]),
)?;
let observations = db.list_context_observations(Some(entity.id), 10)?;
assert_eq!(observations.len(), 1);
assert_eq!(observations[0].id, observation.id);
assert_eq!(observations[0].entity_name, "Prefer recovery-first routing");
assert_eq!(observations[0].observation_type, "note");
assert_eq!(
observations[0].details.get("customer"),
Some(&"viktor".to_string())
);
Ok(())
}
#[test]
fn recall_context_entities_ranks_matching_entities() -> Result<()> {
let tempdir = TestDir::new("store-context-recall")?;
@@ -4051,6 +4261,13 @@ mod tests {
"references",
"Callback route references the dashboard summary",
)?;
db.add_context_observation(
Some("session-1"),
recovery.id,
"incident_note",
"Previous auth callback recovery incident affected Viktor after a wipe",
&BTreeMap::new(),
)?;
let results =
db.recall_context_entities(Some("session-1"), "Investigate auth callback recovery", 3)?;
@@ -4068,6 +4285,7 @@ mod tests {
.any(|term| term == "recovery"));
assert_eq!(results[0].relation_count, 2);
assert_eq!(results[1].entity.id, recovery.id);
assert_eq!(results[1].observation_count, 1);
assert!(!results.iter().any(|entry| entry.entity.id == unrelated.id));
Ok(())