feat: add ecc2 memory connectors

This commit is contained in:
Affaan Mustafa
2026-04-10 06:14:13 -07:00
parent 8cc92c59a6
commit d49ceacb7d
4 changed files with 349 additions and 2 deletions

View File

@@ -6,10 +6,12 @@ mod session;
mod tui;
mod worktree;
use anyhow::Result;
use anyhow::{Context, Result};
use clap::Parser;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use tracing_subscriber::EnvFilter;
@@ -502,6 +504,17 @@ enum GraphCommands {
#[arg(long)]
json: bool,
},
/// Import external memory from a configured connector
ConnectorSync {
/// Connector name from ecc2.toml
name: String,
/// Maximum non-empty records to process
#[arg(long, default_value_t = 256)]
limit: usize,
/// Emit machine-readable JSON instead of the human summary
#[arg(long)]
json: bool,
},
/// Recall relevant context graph entities for a query
Recall {
/// Filter by source session ID or alias
@@ -552,6 +565,29 @@ enum MessageKindArg {
Conflict,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
struct GraphConnectorSyncStats {
connector_name: String,
records_read: usize,
entities_upserted: usize,
observations_added: usize,
skipped_records: usize,
}
#[derive(Debug, Clone, Default, Deserialize)]
#[serde(default)]
struct JsonlMemoryConnectorRecord {
session_id: Option<String>,
entity_type: Option<String>,
entity_name: String,
path: Option<String>,
entity_summary: Option<String>,
metadata: BTreeMap<String, String>,
observation_type: Option<String>,
summary: String,
details: BTreeMap<String, String>,
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
@@ -1352,6 +1388,14 @@ async fn main() -> Result<()> {
);
}
}
GraphCommands::ConnectorSync { name, limit, json } => {
let stats = sync_memory_connector(&db, &cfg, &name, limit)?;
if json {
println!("{}", serde_json::to_string_pretty(&stats)?);
} else {
println!("{}", format_graph_connector_sync_stats_human(&stats));
}
}
GraphCommands::Recall {
session_id,
query,
@@ -1532,6 +1576,133 @@ fn sync_runtime_session_metrics(
Ok(())
}
fn sync_memory_connector(
db: &session::store::StateStore,
cfg: &config::Config,
name: &str,
limit: usize,
) -> Result<GraphConnectorSyncStats> {
let connector = cfg
.memory_connectors
.get(name)
.ok_or_else(|| anyhow::anyhow!("Unknown memory connector: {name}"))?;
match connector {
config::MemoryConnectorConfig::JsonlFile(settings) => {
sync_jsonl_memory_connector(db, name, settings, limit)
}
}
}
fn sync_jsonl_memory_connector(
db: &session::store::StateStore,
name: &str,
settings: &config::MemoryConnectorJsonlFileConfig,
limit: usize,
) -> Result<GraphConnectorSyncStats> {
if settings.path.as_os_str().is_empty() {
anyhow::bail!("memory connector {name} has no path configured");
}
let default_session_id = settings
.session_id
.as_deref()
.map(|value| resolve_session_id(db, value))
.transpose()?;
let file = File::open(&settings.path)
.with_context(|| format!("open memory connector file {}", settings.path.display()))?;
let reader = BufReader::new(file);
let mut stats = GraphConnectorSyncStats {
connector_name: name.to_string(),
..Default::default()
};
for line in reader.lines() {
let line = line?;
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
if stats.records_read >= limit {
break;
}
stats.records_read += 1;
let record: JsonlMemoryConnectorRecord = match serde_json::from_str(trimmed) {
Ok(record) => record,
Err(_) => {
stats.skipped_records += 1;
continue;
}
};
let session_id = match record.session_id.as_deref() {
Some(value) => match resolve_session_id(db, value) {
Ok(resolved) => Some(resolved),
Err(_) => {
stats.skipped_records += 1;
continue;
}
},
None => default_session_id.clone(),
};
let entity_type = record
.entity_type
.as_deref()
.or(settings.default_entity_type.as_deref())
.map(str::trim)
.filter(|value| !value.is_empty());
let observation_type = record
.observation_type
.as_deref()
.or(settings.default_observation_type.as_deref())
.map(str::trim)
.filter(|value| !value.is_empty());
let entity_name = record.entity_name.trim();
let summary = record.summary.trim();
let Some(entity_type) = entity_type else {
stats.skipped_records += 1;
continue;
};
let Some(observation_type) = observation_type else {
stats.skipped_records += 1;
continue;
};
if entity_name.is_empty() || summary.is_empty() {
stats.skipped_records += 1;
continue;
}
let entity_summary = record
.entity_summary
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.unwrap_or(summary);
let entity = db.upsert_context_entity(
session_id.as_deref(),
entity_type,
entity_name,
record.path.as_deref(),
entity_summary,
&record.metadata,
)?;
db.add_context_observation(
session_id.as_deref(),
entity.id,
observation_type,
summary,
&record.details,
)?;
stats.entities_upserted += 1;
stats.observations_added += 1;
}
Ok(stats)
}
fn build_message(
kind: MessageKindArg,
text: String,
@@ -2480,6 +2651,17 @@ fn format_graph_compaction_stats_human(
.join("\n")
}
fn format_graph_connector_sync_stats_human(stats: &GraphConnectorSyncStats) -> String {
[
format!("Memory connector sync complete: {}", stats.connector_name),
format!("- records read {}", stats.records_read),
format!("- entities upserted {}", stats.entities_upserted),
format!("- observations added {}", stats.observations_added),
format!("- skipped records {}", stats.skipped_records),
]
.join("\n")
}
fn format_graph_entity_detail_human(detail: &session::ContextGraphEntityDetail) -> String {
let mut lines = vec![format_graph_entity_human(&detail.entity)];
lines.push(String::new());
@@ -4488,6 +4670,31 @@ mod tests {
}
}
#[test]
fn cli_parses_graph_connector_sync_command() {
let cli = Cli::try_parse_from([
"ecc",
"graph",
"connector-sync",
"hermes_notes",
"--limit",
"32",
"--json",
])
.expect("graph connector-sync should parse");
match cli.command {
Some(Commands::Graph {
command: GraphCommands::ConnectorSync { name, limit, json },
}) => {
assert_eq!(name, "hermes_notes");
assert_eq!(limit, 32);
assert!(json);
}
_ => panic!("expected graph connector-sync subcommand"),
}
}
#[test]
fn format_decisions_human_renders_details() {
let text = format_decisions_human(
@@ -4650,6 +4857,96 @@ mod tests {
assert!(text.contains("- observations retained 9"));
}
#[test]
fn format_graph_connector_sync_stats_human_renders_counts() {
let text = format_graph_connector_sync_stats_human(&GraphConnectorSyncStats {
connector_name: "hermes_notes".to_string(),
records_read: 4,
entities_upserted: 3,
observations_added: 3,
skipped_records: 1,
});
assert!(text.contains("Memory connector sync complete: hermes_notes"));
assert!(text.contains("- records read 4"));
assert!(text.contains("- entities upserted 3"));
assert!(text.contains("- observations added 3"));
assert!(text.contains("- skipped records 1"));
}
#[test]
fn sync_memory_connector_imports_jsonl_observations() -> Result<()> {
let tempdir = TestDir::new("graph-connector-sync")?;
let db = session::store::StateStore::open(&tempdir.path().join("state.db"))?;
let now = chrono::Utc::now();
db.insert_session(&session::Session {
id: "session-1".to_string(),
task: "recovery incident".to_string(),
project: "ecc-tools".to_string(),
task_group: "incident".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: session::SessionState::Running,
pid: None,
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: session::SessionMetrics::default(),
})?;
let connector_path = tempdir.path().join("hermes-memory.jsonl");
std::fs::write(
&connector_path,
[
serde_json::json!({
"entity_name": "Auth callback recovery",
"summary": "Customer wiped setup and got charged twice",
"details": {"customer": "viktor"}
})
.to_string(),
serde_json::json!({
"session_id": "latest",
"entity_type": "file",
"entity_name": "callback.ts",
"path": "src/routes/auth/callback.ts",
"observation_type": "incident_note",
"summary": "Recovery flow needs portal-first routing"
})
.to_string(),
]
.join("\n"),
)?;
let mut cfg = config::Config::default();
cfg.memory_connectors.insert(
"hermes_notes".to_string(),
config::MemoryConnectorConfig::JsonlFile(config::MemoryConnectorJsonlFileConfig {
path: connector_path,
session_id: Some("latest".to_string()),
default_entity_type: Some("incident".to_string()),
default_observation_type: Some("external_note".to_string()),
}),
);
let stats = sync_memory_connector(&db, &cfg, "hermes_notes", 10)?;
assert_eq!(stats.records_read, 2);
assert_eq!(stats.entities_upserted, 2);
assert_eq!(stats.observations_added, 2);
assert_eq!(stats.skipped_records, 0);
let recalled = db.recall_context_entities(None, "charged twice routing", 5)?;
assert_eq!(recalled.len(), 2);
assert!(recalled
.iter()
.any(|entry| entry.entity.name == "Auth callback recovery"));
assert!(recalled
.iter()
.any(|entry| entry.entity.name == "callback.ts"));
Ok(())
}
#[test]
fn format_graph_sync_stats_human_renders_counts() {
let text = format_graph_sync_stats_human(