From d3b680b6db0557f10080ee3c3c270fd5a67a2e00 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Fri, 10 Apr 2026 06:20:15 -0700 Subject: [PATCH] feat: add ecc2 directory memory connectors --- ecc2/src/config/mod.rs | 44 +++++++++ ecc2/src/main.rs | 209 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 247 insertions(+), 6 deletions(-) diff --git a/ecc2/src/config/mod.rs b/ecc2/src/config/mod.rs index 702e66af..0443384f 100644 --- a/ecc2/src/config/mod.rs +++ b/ecc2/src/config/mod.rs @@ -107,6 +107,7 @@ pub struct OrchestrationTemplateStepConfig { #[serde(tag = "kind", rename_all = "snake_case")] pub enum MemoryConnectorConfig { JsonlFile(MemoryConnectorJsonlFileConfig), + JsonlDirectory(MemoryConnectorJsonlDirectoryConfig), } #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] @@ -118,6 +119,16 @@ pub struct MemoryConnectorJsonlFileConfig { pub default_observation_type: Option, } +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(default)] +pub struct MemoryConnectorJsonlDirectoryConfig { + pub path: PathBuf, + pub recurse: bool, + pub session_id: Option, + pub default_entity_type: Option, + pub default_observation_type: Option, +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ResolvedOrchestrationTemplate { pub template_name: String, @@ -1276,6 +1287,39 @@ default_observation_type = "external_note" Some("external_note") ); } + _ => panic!("expected jsonl_file connector"), + } + } + + #[test] + fn memory_jsonl_directory_connectors_deserialize_from_toml() { + let config: Config = toml::from_str( + r#" +[memory_connectors.hermes_dir] +kind = "jsonl_directory" +path = "/tmp/hermes-memory" +recurse = true +default_entity_type = "incident" +default_observation_type = "external_note" +"#, + ) + .unwrap(); + + let connector = config + .memory_connectors + .get("hermes_dir") + .expect("connector should deserialize"); + match connector { + crate::config::MemoryConnectorConfig::JsonlDirectory(settings) => { + assert_eq!(settings.path, PathBuf::from("/tmp/hermes-memory")); + assert!(settings.recurse); + assert_eq!(settings.default_entity_type.as_deref(), Some("incident")); + assert_eq!( + settings.default_observation_type.as_deref(), + Some("external_note") + ); + } + _ => panic!("expected jsonl_directory connector"), } } diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 993f92dd..8ba123ab 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::fs::File; use std::io::{BufRead, BufReader}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use tracing_subscriber::EnvFilter; #[derive(Parser, Debug)] @@ -1591,6 +1591,9 @@ fn sync_memory_connector( config::MemoryConnectorConfig::JsonlFile(settings) => { sync_jsonl_memory_connector(db, name, settings, limit) } + config::MemoryConnectorConfig::JsonlDirectory(settings) => { + sync_jsonl_directory_memory_connector(db, name, settings, limit) + } } } @@ -1604,15 +1607,91 @@ fn sync_jsonl_memory_connector( anyhow::bail!("memory connector {name} has no path configured"); } + let file = File::open(&settings.path) + .with_context(|| format!("open memory connector file {}", settings.path.display()))?; + let reader = BufReader::new(file); let default_session_id = settings .session_id .as_deref() .map(|value| resolve_session_id(db, value)) .transpose()?; - let file = File::open(&settings.path) - .with_context(|| format!("open memory connector file {}", settings.path.display()))?; - let reader = BufReader::new(file); + sync_jsonl_memory_reader( + db, + name, + reader, + default_session_id.as_deref(), + settings.default_entity_type.as_deref(), + settings.default_observation_type.as_deref(), + limit, + ) +} + +fn sync_jsonl_directory_memory_connector( + db: &session::store::StateStore, + name: &str, + settings: &config::MemoryConnectorJsonlDirectoryConfig, + limit: usize, +) -> Result { + if settings.path.as_os_str().is_empty() { + anyhow::bail!("memory connector {name} has no path configured"); + } + if !settings.path.is_dir() { + anyhow::bail!( + "memory connector {name} path is not a directory: {}", + settings.path.display() + ); + } + + let paths = collect_jsonl_paths(&settings.path, settings.recurse)?; + let default_session_id = settings + .session_id + .as_deref() + .map(|value| resolve_session_id(db, value)) + .transpose()?; + + let mut stats = GraphConnectorSyncStats { + connector_name: name.to_string(), + ..Default::default() + }; + + let mut remaining = limit; + for path in paths { + if remaining == 0 { + break; + } + let file = File::open(&path) + .with_context(|| format!("open memory connector file {}", path.display()))?; + let reader = BufReader::new(file); + let file_stats = sync_jsonl_memory_reader( + db, + name, + reader, + default_session_id.as_deref(), + settings.default_entity_type.as_deref(), + settings.default_observation_type.as_deref(), + remaining, + )?; + remaining = remaining.saturating_sub(file_stats.records_read); + stats.records_read += file_stats.records_read; + stats.entities_upserted += file_stats.entities_upserted; + stats.observations_added += file_stats.observations_added; + stats.skipped_records += file_stats.skipped_records; + } + + Ok(stats) +} + +fn sync_jsonl_memory_reader( + db: &session::store::StateStore, + name: &str, + reader: R, + default_session_id: Option<&str>, + default_entity_type: Option<&str>, + default_observation_type: Option<&str>, + limit: usize, +) -> Result { + let default_session_id = default_session_id.map(str::to_string); let mut stats = GraphConnectorSyncStats { connector_name: name.to_string(), ..Default::default() @@ -1650,13 +1729,13 @@ fn sync_jsonl_memory_connector( let entity_type = record .entity_type .as_deref() - .or(settings.default_entity_type.as_deref()) + .or(default_entity_type) .map(str::trim) .filter(|value| !value.is_empty()); let observation_type = record .observation_type .as_deref() - .or(settings.default_observation_type.as_deref()) + .or(default_observation_type) .map(str::trim) .filter(|value| !value.is_empty()); let entity_name = record.entity_name.trim(); @@ -1703,6 +1782,36 @@ fn sync_jsonl_memory_connector( Ok(stats) } +fn collect_jsonl_paths(root: &Path, recurse: bool) -> Result> { + let mut paths = Vec::new(); + collect_jsonl_paths_inner(root, recurse, &mut paths)?; + paths.sort(); + Ok(paths) +} + +fn collect_jsonl_paths_inner(root: &Path, recurse: bool, paths: &mut Vec) -> Result<()> { + for entry in std::fs::read_dir(root) + .with_context(|| format!("read memory connector directory {}", root.display()))? + { + let entry = entry?; + let path = entry.path(); + if path.is_dir() { + if recurse { + collect_jsonl_paths_inner(&path, recurse, paths)?; + } + continue; + } + if path + .extension() + .and_then(|value| value.to_str()) + .is_some_and(|value| value.eq_ignore_ascii_case("jsonl")) + { + paths.push(path); + } + } + Ok(()) +} + fn build_message( kind: MessageKindArg, text: String, @@ -4947,6 +5056,94 @@ mod tests { Ok(()) } + #[test] + fn sync_memory_connector_imports_jsonl_directory_observations() -> Result<()> { + let tempdir = TestDir::new("graph-connector-sync-dir")?; + let db = session::store::StateStore::open(&tempdir.path().join("state.db"))?; + let now = chrono::Utc::now(); + db.insert_session(&session::Session { + id: "session-1".to_string(), + task: "recovery incident".to_string(), + project: "ecc-tools".to_string(), + task_group: "incident".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: session::SessionState::Running, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + last_heartbeat_at: now, + metrics: session::SessionMetrics::default(), + })?; + + let connector_dir = tempdir.path().join("hermes-memory"); + fs::create_dir_all(connector_dir.join("nested"))?; + fs::write( + connector_dir.join("a.jsonl"), + [ + serde_json::json!({ + "entity_name": "Auth callback recovery", + "summary": "Customer wiped setup and got charged twice", + }) + .to_string(), + serde_json::json!({ + "entity_name": "Portal routing", + "summary": "Route existing installs to portal first", + }) + .to_string(), + ] + .join("\n"), + )?; + fs::write( + connector_dir.join("nested").join("b.jsonl"), + [ + serde_json::json!({ + "entity_name": "Billing UX note", + "summary": "Warn against buying twice after wiping setup", + }) + .to_string(), + "{invalid json}".to_string(), + ] + .join("\n"), + )?; + fs::write(connector_dir.join("ignore.txt"), "not imported")?; + + let mut cfg = config::Config::default(); + cfg.memory_connectors.insert( + "hermes_dir".to_string(), + config::MemoryConnectorConfig::JsonlDirectory( + config::MemoryConnectorJsonlDirectoryConfig { + path: connector_dir, + recurse: true, + session_id: Some("latest".to_string()), + default_entity_type: Some("incident".to_string()), + default_observation_type: Some("external_note".to_string()), + }, + ), + ); + + let stats = sync_memory_connector(&db, &cfg, "hermes_dir", 10)?; + assert_eq!(stats.records_read, 4); + assert_eq!(stats.entities_upserted, 3); + assert_eq!(stats.observations_added, 3); + assert_eq!(stats.skipped_records, 1); + + let recalled = db.recall_context_entities(None, "charged twice portal billing", 10)?; + assert_eq!(recalled.len(), 3); + assert!(recalled + .iter() + .any(|entry| entry.entity.name == "Auth callback recovery")); + assert!(recalled + .iter() + .any(|entry| entry.entity.name == "Portal routing")); + assert!(recalled + .iter() + .any(|entry| entry.entity.name == "Billing UX note")); + + Ok(()) + } + #[test] fn format_graph_sync_stats_human_renders_counts() { let text = format_graph_sync_stats_human(