feat: add ecc2 shared context graph cli

This commit is contained in:
Affaan Mustafa
2026-04-10 03:50:21 -07:00
parent 194bf605c2
commit 8653d6d5d5
4 changed files with 999 additions and 8 deletions

View File

@@ -2,7 +2,7 @@ use anyhow::{Context, Result};
use rusqlite::{Connection, OptionalExtension};
use serde::Serialize;
use std::cmp::Reverse;
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
@@ -13,9 +13,10 @@ use crate::observability::{ToolCallEvent, ToolLogEntry, ToolLogPage};
use super::output::{OutputLine, OutputStream, OUTPUT_BUFFER_LIMIT};
use super::{
default_project_label, default_task_group_label, normalize_group_label, DecisionLogEntry,
FileActivityAction, FileActivityEntry, Session, SessionAgentProfile, SessionMessage,
SessionMetrics, SessionState, WorktreeInfo,
default_project_label, default_task_group_label, normalize_group_label, ContextGraphEntity,
ContextGraphEntityDetail, ContextGraphRelation, DecisionLogEntry, FileActivityAction,
FileActivityEntry, Session, SessionAgentProfile, SessionMessage, SessionMetrics, SessionState,
WorktreeInfo,
};
pub struct StateStore {
@@ -234,6 +235,30 @@ impl StateStore {
timestamp TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS context_graph_entities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT REFERENCES sessions(id) ON DELETE SET NULL,
entity_key TEXT NOT NULL UNIQUE,
entity_type TEXT NOT NULL,
name TEXT NOT NULL,
path TEXT,
summary TEXT NOT NULL DEFAULT '',
metadata_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS context_graph_relations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT REFERENCES sessions(id) ON DELETE SET NULL,
from_entity_id INTEGER NOT NULL REFERENCES context_graph_entities(id) ON DELETE CASCADE,
to_entity_id INTEGER NOT NULL REFERENCES context_graph_entities(id) ON DELETE CASCADE,
relation_type TEXT NOT NULL,
summary TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
UNIQUE(from_entity_id, to_entity_id, relation_type)
);
CREATE TABLE IF NOT EXISTS pending_worktree_queue (
session_id TEXT PRIMARY KEY REFERENCES sessions(id) ON DELETE CASCADE,
repo_root TEXT NOT NULL,
@@ -288,6 +313,12 @@ impl StateStore {
ON session_output(session_id, id);
CREATE INDEX IF NOT EXISTS idx_decision_log_session
ON decision_log(session_id, timestamp, id);
CREATE INDEX IF NOT EXISTS idx_context_graph_entities_session
ON context_graph_entities(session_id, entity_type, updated_at, id);
CREATE INDEX IF NOT EXISTS idx_context_graph_relations_from
ON context_graph_relations(from_entity_id, created_at, id);
CREATE INDEX IF NOT EXISTS idx_context_graph_relations_to
ON context_graph_relations(to_entity_id, created_at, id);
CREATE INDEX IF NOT EXISTS idx_conflict_incidents_sessions
ON conflict_incidents(first_session_id, second_session_id, resolved_at, updated_at);
CREATE INDEX IF NOT EXISTS idx_pending_worktree_queue_requested_at
@@ -1652,6 +1683,241 @@ impl StateStore {
Ok(entries)
}
pub fn upsert_context_entity(
&self,
session_id: Option<&str>,
entity_type: &str,
name: &str,
path: Option<&str>,
summary: &str,
metadata: &BTreeMap<String, String>,
) -> Result<ContextGraphEntity> {
let entity_type = entity_type.trim();
if entity_type.is_empty() {
return Err(anyhow::anyhow!("Context graph entity type cannot be empty"));
}
let name = name.trim();
if name.is_empty() {
return Err(anyhow::anyhow!("Context graph entity name cannot be empty"));
}
let normalized_path = path.map(str::trim).filter(|value| !value.is_empty());
let summary = summary.trim();
let entity_key = context_graph_entity_key(entity_type, name, normalized_path);
let metadata_json = serde_json::to_string(metadata)
.context("Failed to serialize context graph metadata")?;
let timestamp = chrono::Utc::now().to_rfc3339();
self.conn.execute(
"INSERT INTO context_graph_entities (
session_id, entity_key, entity_type, name, path, summary, metadata_json, created_at, updated_at
)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)
ON CONFLICT(entity_key) DO UPDATE SET
session_id = COALESCE(excluded.session_id, context_graph_entities.session_id),
summary = CASE
WHEN excluded.summary <> '' THEN excluded.summary
ELSE context_graph_entities.summary
END,
metadata_json = excluded.metadata_json,
updated_at = excluded.updated_at",
rusqlite::params![
session_id,
entity_key,
entity_type,
name,
normalized_path,
summary,
metadata_json,
timestamp,
],
)?;
self.conn
.query_row(
"SELECT id, session_id, entity_type, name, path, summary, metadata_json, created_at, updated_at
FROM context_graph_entities
WHERE entity_key = ?1",
rusqlite::params![entity_key],
map_context_graph_entity,
)
.map_err(Into::into)
}
pub fn list_context_entities(
&self,
session_id: Option<&str>,
entity_type: Option<&str>,
limit: usize,
) -> Result<Vec<ContextGraphEntity>> {
let mut stmt = self.conn.prepare(
"SELECT id, session_id, entity_type, name, path, summary, metadata_json, created_at, updated_at
FROM context_graph_entities
WHERE (?1 IS NULL OR session_id = ?1)
AND (?2 IS NULL OR entity_type = ?2)
ORDER BY updated_at DESC, id DESC
LIMIT ?3",
)?;
let entries = stmt
.query_map(
rusqlite::params![session_id, entity_type, limit as i64],
map_context_graph_entity,
)?
.collect::<Result<Vec<_>, _>>()?;
Ok(entries)
}
pub fn get_context_entity_detail(
&self,
entity_id: i64,
relation_limit: usize,
) -> Result<Option<ContextGraphEntityDetail>> {
let entity = self
.conn
.query_row(
"SELECT id, session_id, entity_type, name, path, summary, metadata_json, created_at, updated_at
FROM context_graph_entities
WHERE id = ?1",
rusqlite::params![entity_id],
map_context_graph_entity,
)
.optional()?;
let Some(entity) = entity else {
return Ok(None);
};
let mut outgoing_stmt = self.conn.prepare(
"SELECT r.id, r.session_id,
r.from_entity_id, src.entity_type, src.name,
r.to_entity_id, dst.entity_type, dst.name,
r.relation_type, r.summary, r.created_at
FROM context_graph_relations r
JOIN context_graph_entities src ON src.id = r.from_entity_id
JOIN context_graph_entities dst ON dst.id = r.to_entity_id
WHERE r.from_entity_id = ?1
ORDER BY r.created_at DESC, r.id DESC
LIMIT ?2",
)?;
let outgoing = outgoing_stmt
.query_map(
rusqlite::params![entity_id, relation_limit as i64],
map_context_graph_relation,
)?
.collect::<Result<Vec<_>, _>>()?;
let mut incoming_stmt = self.conn.prepare(
"SELECT r.id, r.session_id,
r.from_entity_id, src.entity_type, src.name,
r.to_entity_id, dst.entity_type, dst.name,
r.relation_type, r.summary, r.created_at
FROM context_graph_relations r
JOIN context_graph_entities src ON src.id = r.from_entity_id
JOIN context_graph_entities dst ON dst.id = r.to_entity_id
WHERE r.to_entity_id = ?1
ORDER BY r.created_at DESC, r.id DESC
LIMIT ?2",
)?;
let incoming = incoming_stmt
.query_map(
rusqlite::params![entity_id, relation_limit as i64],
map_context_graph_relation,
)?
.collect::<Result<Vec<_>, _>>()?;
Ok(Some(ContextGraphEntityDetail {
entity,
outgoing,
incoming,
}))
}
pub fn upsert_context_relation(
&self,
session_id: Option<&str>,
from_entity_id: i64,
to_entity_id: i64,
relation_type: &str,
summary: &str,
) -> Result<ContextGraphRelation> {
let relation_type = relation_type.trim();
if relation_type.is_empty() {
return Err(anyhow::anyhow!(
"Context graph relation type cannot be empty"
));
}
let summary = summary.trim();
let timestamp = chrono::Utc::now().to_rfc3339();
self.conn.execute(
"INSERT INTO context_graph_relations (
session_id, from_entity_id, to_entity_id, relation_type, summary, created_at
)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)
ON CONFLICT(from_entity_id, to_entity_id, relation_type) DO UPDATE SET
session_id = COALESCE(excluded.session_id, context_graph_relations.session_id),
summary = CASE
WHEN excluded.summary <> '' THEN excluded.summary
ELSE context_graph_relations.summary
END",
rusqlite::params![
session_id,
from_entity_id,
to_entity_id,
relation_type,
summary,
timestamp,
],
)?;
self.conn
.query_row(
"SELECT r.id, r.session_id,
r.from_entity_id, src.entity_type, src.name,
r.to_entity_id, dst.entity_type, dst.name,
r.relation_type, r.summary, r.created_at
FROM context_graph_relations r
JOIN context_graph_entities src ON src.id = r.from_entity_id
JOIN context_graph_entities dst ON dst.id = r.to_entity_id
WHERE r.from_entity_id = ?1
AND r.to_entity_id = ?2
AND r.relation_type = ?3",
rusqlite::params![from_entity_id, to_entity_id, relation_type],
map_context_graph_relation,
)
.map_err(Into::into)
}
pub fn list_context_relations(
&self,
entity_id: Option<i64>,
limit: usize,
) -> Result<Vec<ContextGraphRelation>> {
let mut stmt = self.conn.prepare(
"SELECT r.id, r.session_id,
r.from_entity_id, src.entity_type, src.name,
r.to_entity_id, dst.entity_type, dst.name,
r.relation_type, r.summary, r.created_at
FROM context_graph_relations r
JOIN context_graph_entities src ON src.id = r.from_entity_id
JOIN context_graph_entities dst ON dst.id = r.to_entity_id
WHERE (?1 IS NULL OR r.from_entity_id = ?1 OR r.to_entity_id = ?1)
ORDER BY r.created_at DESC, r.id DESC
LIMIT ?2",
)?;
let relations = stmt
.query_map(
rusqlite::params![entity_id, limit as i64],
map_context_graph_relation,
)?
.collect::<Result<Vec<_>, _>>()?;
Ok(relations)
}
pub fn daemon_activity(&self) -> Result<DaemonActivity> {
self.conn
.query_row(
@@ -2509,6 +2775,71 @@ fn map_decision_log_entry(row: &rusqlite::Row<'_>) -> rusqlite::Result<DecisionL
})
}
fn map_context_graph_entity(row: &rusqlite::Row<'_>) -> rusqlite::Result<ContextGraphEntity> {
let metadata_json = row
.get::<_, Option<String>>(6)?
.unwrap_or_else(|| "{}".to_string());
let metadata = serde_json::from_str(&metadata_json).map_err(|error| {
rusqlite::Error::FromSqlConversionFailure(6, rusqlite::types::Type::Text, Box::new(error))
})?;
let created_at = parse_store_timestamp(row.get::<_, String>(7)?, 7)?;
let updated_at = parse_store_timestamp(row.get::<_, String>(8)?, 8)?;
Ok(ContextGraphEntity {
id: row.get(0)?,
session_id: row.get(1)?,
entity_type: row.get(2)?,
name: row.get(3)?,
path: row.get(4)?,
summary: row.get(5)?,
metadata,
created_at,
updated_at,
})
}
fn map_context_graph_relation(row: &rusqlite::Row<'_>) -> rusqlite::Result<ContextGraphRelation> {
let created_at = parse_store_timestamp(row.get::<_, String>(10)?, 10)?;
Ok(ContextGraphRelation {
id: row.get(0)?,
session_id: row.get(1)?,
from_entity_id: row.get(2)?,
from_entity_type: row.get(3)?,
from_entity_name: row.get(4)?,
to_entity_id: row.get(5)?,
to_entity_type: row.get(6)?,
to_entity_name: row.get(7)?,
relation_type: row.get(8)?,
summary: row.get(9)?,
created_at,
})
}
fn parse_store_timestamp(
raw: String,
column: usize,
) -> rusqlite::Result<chrono::DateTime<chrono::Utc>> {
chrono::DateTime::parse_from_rfc3339(&raw)
.map(|value| value.with_timezone(&chrono::Utc))
.map_err(|error| {
rusqlite::Error::FromSqlConversionFailure(
column,
rusqlite::types::Type::Text,
Box::new(error),
)
})
}
fn context_graph_entity_key(entity_type: &str, name: &str, path: Option<&str>) -> String {
format!(
"{}::{}::{}",
entity_type.trim().to_ascii_lowercase(),
name.trim().to_ascii_lowercase(),
path.unwrap_or("").trim()
)
}
fn file_overlap_is_relevant(current: &FileActivityEntry, other: &FileActivityEntry) -> bool {
current.path == other.path
&& !(matches!(current.action, FileActivityAction::Read)
@@ -3194,6 +3525,156 @@ mod tests {
Ok(())
}
#[test]
fn upsert_and_filter_context_graph_entities() -> Result<()> {
let tempdir = TestDir::new("store-context-entities")?;
let db = StateStore::open(&tempdir.path().join("state.db"))?;
let now = Utc::now();
db.insert_session(&Session {
id: "session-1".to_string(),
task: "context graph".to_string(),
project: "workspace".to_string(),
task_group: "knowledge".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: SessionState::Running,
pid: None,
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
let mut metadata = BTreeMap::new();
metadata.insert("language".to_string(), "rust".to_string());
let file = db.upsert_context_entity(
Some("session-1"),
"file",
"dashboard.rs",
Some("ecc2/src/tui/dashboard.rs"),
"Primary dashboard surface",
&metadata,
)?;
let updated = db.upsert_context_entity(
Some("session-1"),
"file",
"dashboard.rs",
Some("ecc2/src/tui/dashboard.rs"),
"Updated dashboard summary",
&metadata,
)?;
let decision = db.upsert_context_entity(
None,
"decision",
"Prefer SQLite graph storage",
None,
"Keeps graph queryable from CLI and TUI",
&BTreeMap::new(),
)?;
assert_eq!(file.id, updated.id);
assert_eq!(updated.summary, "Updated dashboard summary");
let session_entities = db.list_context_entities(Some("session-1"), Some("file"), 10)?;
assert_eq!(session_entities.len(), 1);
assert_eq!(session_entities[0].id, file.id);
assert_eq!(
session_entities[0].metadata.get("language"),
Some(&"rust".to_string())
);
let all_entities = db.list_context_entities(None, None, 10)?;
assert_eq!(all_entities.len(), 2);
assert!(all_entities.iter().any(|entity| entity.id == decision.id));
Ok(())
}
#[test]
fn context_graph_detail_includes_incoming_and_outgoing_relations() -> Result<()> {
let tempdir = TestDir::new("store-context-relations")?;
let db = StateStore::open(&tempdir.path().join("state.db"))?;
let now = Utc::now();
db.insert_session(&Session {
id: "session-1".to_string(),
task: "context graph".to_string(),
project: "workspace".to_string(),
task_group: "knowledge".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: SessionState::Running,
pid: None,
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
let file = db.upsert_context_entity(
Some("session-1"),
"file",
"dashboard.rs",
Some("ecc2/src/tui/dashboard.rs"),
"",
&BTreeMap::new(),
)?;
let function = db.upsert_context_entity(
Some("session-1"),
"function",
"render_metrics",
Some("ecc2/src/tui/dashboard.rs"),
"",
&BTreeMap::new(),
)?;
let decision = db.upsert_context_entity(
Some("session-1"),
"decision",
"Persist graph in sqlite",
None,
"",
&BTreeMap::new(),
)?;
db.upsert_context_relation(
Some("session-1"),
file.id,
function.id,
"contains",
"Dashboard file contains metrics rendering logic",
)?;
db.upsert_context_relation(
Some("session-1"),
decision.id,
function.id,
"drives",
"Storage choice drives the function implementation",
)?;
let detail = db
.get_context_entity_detail(function.id, 10)?
.expect("detail should exist");
assert_eq!(detail.entity.name, "render_metrics");
assert_eq!(detail.incoming.len(), 2);
assert!(detail.outgoing.is_empty());
let relation_types = detail
.incoming
.iter()
.map(|relation| relation.relation_type.as_str())
.collect::<Vec<_>>();
assert!(relation_types.contains(&"contains"));
assert!(relation_types.contains(&"drives"));
let filtered_relations = db.list_context_relations(Some(function.id), 10)?;
assert_eq!(filtered_relations.len(), 2);
Ok(())
}
#[test]
fn refresh_session_durations_updates_running_and_terminal_sessions() -> Result<()> {
let tempdir = TestDir::new("store-duration-metrics")?;