mirror of
https://github.com/affaan-m/everything-claude-code.git
synced 2026-04-11 20:13:30 +08:00
feat: add ecc2 graph compaction
This commit is contained in:
@@ -13,17 +13,19 @@ use crate::observability::{ToolCallEvent, ToolLogEntry, ToolLogPage};
|
||||
|
||||
use super::output::{OutputLine, OutputStream, OUTPUT_BUFFER_LIMIT};
|
||||
use super::{
|
||||
default_project_label, default_task_group_label, normalize_group_label, ContextGraphEntity,
|
||||
ContextGraphEntityDetail, ContextGraphObservation, ContextGraphRecallEntry,
|
||||
ContextGraphRelation, ContextGraphSyncStats, DecisionLogEntry, FileActivityAction,
|
||||
FileActivityEntry, Session, SessionAgentProfile, SessionMessage, SessionMetrics, SessionState,
|
||||
WorktreeInfo,
|
||||
default_project_label, default_task_group_label, normalize_group_label,
|
||||
ContextGraphCompactionStats, ContextGraphEntity, ContextGraphEntityDetail,
|
||||
ContextGraphObservation, ContextGraphRecallEntry, ContextGraphRelation, ContextGraphSyncStats,
|
||||
DecisionLogEntry, FileActivityAction, FileActivityEntry, Session, SessionAgentProfile,
|
||||
SessionMessage, SessionMetrics, SessionState, WorktreeInfo,
|
||||
};
|
||||
|
||||
pub struct StateStore {
|
||||
conn: Connection,
|
||||
}
|
||||
|
||||
const DEFAULT_CONTEXT_GRAPH_OBSERVATION_RETENTION: usize = 12;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PendingWorktreeRequest {
|
||||
pub session_id: String,
|
||||
@@ -2235,6 +2237,11 @@ impl StateStore {
|
||||
],
|
||||
)?;
|
||||
let observation_id = self.conn.last_insert_rowid();
|
||||
self.compact_context_graph_observations(
|
||||
None,
|
||||
Some(entity_id),
|
||||
DEFAULT_CONTEXT_GRAPH_OBSERVATION_RETENTION,
|
||||
)?;
|
||||
self.conn
|
||||
.query_row(
|
||||
"SELECT o.id, o.session_id, o.entity_id, e.entity_type, e.name,
|
||||
@@ -2248,6 +2255,14 @@ impl StateStore {
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
pub fn compact_context_graph(
|
||||
&self,
|
||||
session_id: Option<&str>,
|
||||
keep_observations_per_entity: usize,
|
||||
) -> Result<ContextGraphCompactionStats> {
|
||||
self.compact_context_graph_observations(session_id, None, keep_observations_per_entity)
|
||||
}
|
||||
|
||||
pub fn add_session_observation(
|
||||
&self,
|
||||
session_id: &str,
|
||||
@@ -2289,6 +2304,94 @@ impl StateStore {
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
fn compact_context_graph_observations(
|
||||
&self,
|
||||
session_id: Option<&str>,
|
||||
entity_id: Option<i64>,
|
||||
keep_observations_per_entity: usize,
|
||||
) -> Result<ContextGraphCompactionStats> {
|
||||
let entities_scanned = self.conn.query_row(
|
||||
"SELECT COUNT(DISTINCT o.entity_id)
|
||||
FROM context_graph_observations o
|
||||
JOIN context_graph_entities e ON e.id = o.entity_id
|
||||
WHERE (?1 IS NULL OR e.session_id = ?1)
|
||||
AND (?2 IS NULL OR o.entity_id = ?2)",
|
||||
rusqlite::params![session_id, entity_id],
|
||||
|row| row.get::<_, i64>(0),
|
||||
)? as usize;
|
||||
|
||||
let duplicate_observations_deleted = self.conn.execute(
|
||||
"DELETE FROM context_graph_observations
|
||||
WHERE id IN (
|
||||
SELECT id
|
||||
FROM (
|
||||
SELECT o.id,
|
||||
ROW_NUMBER() OVER (
|
||||
PARTITION BY o.entity_id, o.observation_type, o.summary
|
||||
ORDER BY o.created_at DESC, o.id DESC
|
||||
) AS rn
|
||||
FROM context_graph_observations o
|
||||
JOIN context_graph_entities e ON e.id = o.entity_id
|
||||
WHERE (?1 IS NULL OR e.session_id = ?1)
|
||||
AND (?2 IS NULL OR o.entity_id = ?2)
|
||||
) ranked
|
||||
WHERE ranked.rn > 1
|
||||
)",
|
||||
rusqlite::params![session_id, entity_id],
|
||||
)?;
|
||||
|
||||
let overflow_observations_deleted = if keep_observations_per_entity == 0 {
|
||||
self.conn.execute(
|
||||
"DELETE FROM context_graph_observations
|
||||
WHERE id IN (
|
||||
SELECT o.id
|
||||
FROM context_graph_observations o
|
||||
JOIN context_graph_entities e ON e.id = o.entity_id
|
||||
WHERE (?1 IS NULL OR e.session_id = ?1)
|
||||
AND (?2 IS NULL OR o.entity_id = ?2)
|
||||
)",
|
||||
rusqlite::params![session_id, entity_id],
|
||||
)?
|
||||
} else {
|
||||
self.conn.execute(
|
||||
"DELETE FROM context_graph_observations
|
||||
WHERE id IN (
|
||||
SELECT id
|
||||
FROM (
|
||||
SELECT o.id,
|
||||
ROW_NUMBER() OVER (
|
||||
PARTITION BY o.entity_id
|
||||
ORDER BY o.created_at DESC, o.id DESC
|
||||
) AS rn
|
||||
FROM context_graph_observations o
|
||||
JOIN context_graph_entities e ON e.id = o.entity_id
|
||||
WHERE (?1 IS NULL OR e.session_id = ?1)
|
||||
AND (?2 IS NULL OR o.entity_id = ?2)
|
||||
) ranked
|
||||
WHERE ranked.rn > ?3
|
||||
)",
|
||||
rusqlite::params![session_id, entity_id, keep_observations_per_entity as i64],
|
||||
)?
|
||||
};
|
||||
|
||||
let observations_retained = self.conn.query_row(
|
||||
"SELECT COUNT(*)
|
||||
FROM context_graph_observations o
|
||||
JOIN context_graph_entities e ON e.id = o.entity_id
|
||||
WHERE (?1 IS NULL OR e.session_id = ?1)
|
||||
AND (?2 IS NULL OR o.entity_id = ?2)",
|
||||
rusqlite::params![session_id, entity_id],
|
||||
|row| row.get::<_, i64>(0),
|
||||
)? as usize;
|
||||
|
||||
Ok(ContextGraphCompactionStats {
|
||||
entities_scanned,
|
||||
duplicate_observations_deleted,
|
||||
overflow_observations_deleted,
|
||||
observations_retained,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn upsert_context_relation(
|
||||
&self,
|
||||
session_id: Option<&str>,
|
||||
@@ -4200,6 +4303,130 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compact_context_graph_prunes_duplicate_and_overflow_observations() -> Result<()> {
|
||||
let tempdir = TestDir::new("store-context-compaction")?;
|
||||
let db = StateStore::open(&tempdir.path().join("state.db"))?;
|
||||
let now = Utc::now();
|
||||
|
||||
db.insert_session(&Session {
|
||||
id: "session-1".to_string(),
|
||||
task: "deep memory".to_string(),
|
||||
project: "workspace".to_string(),
|
||||
task_group: "knowledge".to_string(),
|
||||
agent_type: "claude".to_string(),
|
||||
working_dir: PathBuf::from("/tmp"),
|
||||
state: SessionState::Running,
|
||||
pid: None,
|
||||
worktree: None,
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
last_heartbeat_at: now,
|
||||
metrics: SessionMetrics::default(),
|
||||
})?;
|
||||
|
||||
let entity = db.upsert_context_entity(
|
||||
Some("session-1"),
|
||||
"decision",
|
||||
"Prefer recovery-first routing",
|
||||
None,
|
||||
"Recovered installs should go through the portal first",
|
||||
&BTreeMap::new(),
|
||||
)?;
|
||||
|
||||
for summary in [
|
||||
"old duplicate",
|
||||
"keep me",
|
||||
"old duplicate",
|
||||
"recent",
|
||||
"latest",
|
||||
] {
|
||||
db.conn.execute(
|
||||
"INSERT INTO context_graph_observations (
|
||||
session_id, entity_id, observation_type, summary, details_json, created_at
|
||||
) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
|
||||
rusqlite::params![
|
||||
"session-1",
|
||||
entity.id,
|
||||
"note",
|
||||
summary,
|
||||
"{}",
|
||||
chrono::Utc::now().to_rfc3339(),
|
||||
],
|
||||
)?;
|
||||
std::thread::sleep(std::time::Duration::from_millis(2));
|
||||
}
|
||||
|
||||
let stats = db.compact_context_graph(None, 3)?;
|
||||
assert_eq!(stats.entities_scanned, 1);
|
||||
assert_eq!(stats.duplicate_observations_deleted, 1);
|
||||
assert_eq!(stats.overflow_observations_deleted, 1);
|
||||
assert_eq!(stats.observations_retained, 3);
|
||||
|
||||
let observations = db.list_context_observations(Some(entity.id), 10)?;
|
||||
let summaries = observations
|
||||
.iter()
|
||||
.map(|observation| observation.summary.as_str())
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(summaries, vec!["latest", "recent", "old duplicate"]);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn add_context_observation_auto_compacts_entity_history() -> Result<()> {
|
||||
let tempdir = TestDir::new("store-context-auto-compaction")?;
|
||||
let db = StateStore::open(&tempdir.path().join("state.db"))?;
|
||||
let now = Utc::now();
|
||||
|
||||
db.insert_session(&Session {
|
||||
id: "session-1".to_string(),
|
||||
task: "deep memory".to_string(),
|
||||
project: "workspace".to_string(),
|
||||
task_group: "knowledge".to_string(),
|
||||
agent_type: "claude".to_string(),
|
||||
working_dir: PathBuf::from("/tmp"),
|
||||
state: SessionState::Running,
|
||||
pid: None,
|
||||
worktree: None,
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
last_heartbeat_at: now,
|
||||
metrics: SessionMetrics::default(),
|
||||
})?;
|
||||
|
||||
let entity = db.upsert_context_entity(
|
||||
Some("session-1"),
|
||||
"session",
|
||||
"session-1",
|
||||
None,
|
||||
"Deep-memory worker",
|
||||
&BTreeMap::new(),
|
||||
)?;
|
||||
|
||||
for index in 0..(DEFAULT_CONTEXT_GRAPH_OBSERVATION_RETENTION + 2) {
|
||||
let summary = format!("completion summary {}", index);
|
||||
db.add_context_observation(
|
||||
Some("session-1"),
|
||||
entity.id,
|
||||
"completion_summary",
|
||||
&summary,
|
||||
&BTreeMap::new(),
|
||||
)?;
|
||||
std::thread::sleep(std::time::Duration::from_millis(2));
|
||||
}
|
||||
|
||||
let observations = db.list_context_observations(Some(entity.id), 20)?;
|
||||
assert_eq!(
|
||||
observations.len(),
|
||||
DEFAULT_CONTEXT_GRAPH_OBSERVATION_RETENTION
|
||||
);
|
||||
assert_eq!(observations[0].summary, "completion summary 13");
|
||||
assert_eq!(observations.last().unwrap().summary, "completion summary 2");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn recall_context_entities_ranks_matching_entities() -> Result<()> {
|
||||
let tempdir = TestDir::new("store-context-recall")?;
|
||||
|
||||
Reference in New Issue
Block a user