feat: add ecc2 directory memory connectors

This commit is contained in:
Affaan Mustafa
2026-04-10 06:20:15 -07:00
parent d49ceacb7d
commit d3b680b6db
2 changed files with 247 additions and 6 deletions

View File

@@ -107,6 +107,7 @@ pub struct OrchestrationTemplateStepConfig {
#[serde(tag = "kind", rename_all = "snake_case")] #[serde(tag = "kind", rename_all = "snake_case")]
pub enum MemoryConnectorConfig { pub enum MemoryConnectorConfig {
JsonlFile(MemoryConnectorJsonlFileConfig), JsonlFile(MemoryConnectorJsonlFileConfig),
JsonlDirectory(MemoryConnectorJsonlDirectoryConfig),
} }
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
@@ -118,6 +119,16 @@ pub struct MemoryConnectorJsonlFileConfig {
pub default_observation_type: Option<String>, pub default_observation_type: Option<String>,
} }
/// Settings for a memory connector that imports the `.jsonl` files found in a directory.
///
/// Selected in configuration with `kind = "jsonl_directory"`. Because of
/// `#[serde(default)]`, any field omitted from the configuration takes its
/// `Default` value.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct MemoryConnectorJsonlDirectoryConfig {
/// Directory to scan. Syncing fails when this is empty or is not a directory.
pub path: PathBuf,
/// When `true`, subdirectories of `path` are scanned as well.
pub recurse: bool,
/// Optional session reference; it is resolved against the state store at sync
/// time and handed to the record reader as the default session.
pub session_id: Option<String>,
/// Entity type used for records that do not specify one themselves.
pub default_entity_type: Option<String>,
/// Observation type used for records that do not specify one themselves.
pub default_observation_type: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ResolvedOrchestrationTemplate { pub struct ResolvedOrchestrationTemplate {
pub template_name: String, pub template_name: String,
@@ -1276,6 +1287,39 @@ default_observation_type = "external_note"
Some("external_note") Some("external_note")
); );
} }
_ => panic!("expected jsonl_file connector"),
}
}
// Checks that a `[memory_connectors.<name>]` table tagged with
// `kind = "jsonl_directory"` deserializes into the `JsonlDirectory` variant
// and that the path, recursion flag and default types are carried through.
#[test]
fn memory_jsonl_directory_connectors_deserialize_from_toml() {
let config: Config = toml::from_str(
r#"
[memory_connectors.hermes_dir]
kind = "jsonl_directory"
path = "/tmp/hermes-memory"
recurse = true
default_entity_type = "incident"
default_observation_type = "external_note"
"#,
)
.unwrap();
// The connector is looked up by the table name used in the TOML above.
let connector = config
.memory_connectors
.get("hermes_dir")
.expect("connector should deserialize");
// Any other variant means the `kind` tag selected the wrong connector type.
match connector {
crate::config::MemoryConnectorConfig::JsonlDirectory(settings) => {
assert_eq!(settings.path, PathBuf::from("/tmp/hermes-memory"));
assert!(settings.recurse);
assert_eq!(settings.default_entity_type.as_deref(), Some("incident"));
assert_eq!(
settings.default_observation_type.as_deref(),
Some("external_note")
);
}
_ => panic!("expected jsonl_directory connector"),
} }
} }

View File

@@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::fs::File; use std::fs::File;
use std::io::{BufRead, BufReader}; use std::io::{BufRead, BufReader};
use std::path::PathBuf; use std::path::{Path, PathBuf};
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
@@ -1591,6 +1591,9 @@ fn sync_memory_connector(
config::MemoryConnectorConfig::JsonlFile(settings) => { config::MemoryConnectorConfig::JsonlFile(settings) => {
sync_jsonl_memory_connector(db, name, settings, limit) sync_jsonl_memory_connector(db, name, settings, limit)
} }
config::MemoryConnectorConfig::JsonlDirectory(settings) => {
sync_jsonl_directory_memory_connector(db, name, settings, limit)
}
} }
} }
@@ -1604,15 +1607,91 @@ fn sync_jsonl_memory_connector(
anyhow::bail!("memory connector {name} has no path configured"); anyhow::bail!("memory connector {name} has no path configured");
} }
let file = File::open(&settings.path)
.with_context(|| format!("open memory connector file {}", settings.path.display()))?;
let reader = BufReader::new(file);
let default_session_id = settings let default_session_id = settings
.session_id .session_id
.as_deref() .as_deref()
.map(|value| resolve_session_id(db, value)) .map(|value| resolve_session_id(db, value))
.transpose()?; .transpose()?;
let file = File::open(&settings.path)
.with_context(|| format!("open memory connector file {}", settings.path.display()))?;
let reader = BufReader::new(file);
sync_jsonl_memory_reader(
db,
name,
reader,
default_session_id.as_deref(),
settings.default_entity_type.as_deref(),
settings.default_observation_type.as_deref(),
limit,
)
}
/// Syncs every `.jsonl` file under the connector's configured directory.
///
/// Files are processed in the sorted order returned by `collect_jsonl_paths`.
/// `limit` is a record budget shared by all files: each file is read with
/// whatever budget is left after subtracting the records already read, and
/// no further files are opened once the budget reaches zero.
///
/// # Errors
///
/// Fails when the configured path is empty or is not a directory, and
/// propagates any error from listing the directory, resolving the session,
/// opening a file, or reading its records.
fn sync_jsonl_directory_memory_connector(
    db: &session::store::StateStore,
    name: &str,
    settings: &config::MemoryConnectorJsonlDirectoryConfig,
    limit: usize,
) -> Result<GraphConnectorSyncStats> {
    let root = settings.path.as_path();
    if root.as_os_str().is_empty() {
        anyhow::bail!("memory connector {name} has no path configured");
    }
    if !root.is_dir() {
        anyhow::bail!(
            "memory connector {name} path is not a directory: {}",
            root.display()
        );
    }

    let jsonl_files = collect_jsonl_paths(root, settings.recurse)?;

    // The session reference is resolved once and reused for every file.
    let session_override = match settings.session_id.as_deref() {
        Some(requested) => Some(resolve_session_id(db, requested)?),
        None => None,
    };
    let entity_type_fallback = settings.default_entity_type.as_deref();
    let observation_type_fallback = settings.default_observation_type.as_deref();

    let mut totals = GraphConnectorSyncStats {
        connector_name: name.to_string(),
        ..Default::default()
    };
    let mut budget = limit;

    for jsonl_file in &jsonl_files {
        if budget == 0 {
            break;
        }

        let handle = File::open(jsonl_file)
            .with_context(|| format!("open memory connector file {}", jsonl_file.display()))?;
        let per_file = sync_jsonl_memory_reader(
            db,
            name,
            BufReader::new(handle),
            session_override.as_deref(),
            entity_type_fallback,
            observation_type_fallback,
            budget,
        )?;

        // Charge this file's records against the shared budget, then fold its
        // counters into the connector-wide totals.
        budget = budget.saturating_sub(per_file.records_read);
        totals.records_read += per_file.records_read;
        totals.entities_upserted += per_file.entities_upserted;
        totals.observations_added += per_file.observations_added;
        totals.skipped_records += per_file.skipped_records;
    }

    Ok(totals)
}
fn sync_jsonl_memory_reader<R: BufRead>(
db: &session::store::StateStore,
name: &str,
reader: R,
default_session_id: Option<&str>,
default_entity_type: Option<&str>,
default_observation_type: Option<&str>,
limit: usize,
) -> Result<GraphConnectorSyncStats> {
let default_session_id = default_session_id.map(str::to_string);
let mut stats = GraphConnectorSyncStats { let mut stats = GraphConnectorSyncStats {
connector_name: name.to_string(), connector_name: name.to_string(),
..Default::default() ..Default::default()
@@ -1650,13 +1729,13 @@ fn sync_jsonl_memory_connector(
let entity_type = record let entity_type = record
.entity_type .entity_type
.as_deref() .as_deref()
.or(settings.default_entity_type.as_deref()) .or(default_entity_type)
.map(str::trim) .map(str::trim)
.filter(|value| !value.is_empty()); .filter(|value| !value.is_empty());
let observation_type = record let observation_type = record
.observation_type .observation_type
.as_deref() .as_deref()
.or(settings.default_observation_type.as_deref()) .or(default_observation_type)
.map(str::trim) .map(str::trim)
.filter(|value| !value.is_empty()); .filter(|value| !value.is_empty());
let entity_name = record.entity_name.trim(); let entity_name = record.entity_name.trim();
@@ -1703,6 +1782,36 @@ fn sync_jsonl_memory_connector(
Ok(stats) Ok(stats)
} }
/// Returns the `.jsonl` files found under `root`, sorted by path.
///
/// Subdirectories are only searched when `recurse` is `true`. Any error
/// reported while walking the directory tree is returned unchanged.
fn collect_jsonl_paths(root: &Path, recurse: bool) -> Result<Vec<PathBuf>> {
    let mut jsonl_files: Vec<PathBuf> = Vec::new();
    collect_jsonl_paths_inner(root, recurse, &mut jsonl_files).map(move |()| {
        // Directory listing order is not sorted, so order the result here.
        jsonl_files.sort();
        jsonl_files
    })
}
/// Appends to `paths` every entry of `root` whose extension is `jsonl`,
/// compared case-insensitively.
///
/// Directories are never added themselves; they are descended into only when
/// `recurse` is `true`. Results are pushed in directory-listing order, so the
/// caller is responsible for any sorting.
///
/// # Errors
///
/// Fails when `root` (or a visited subdirectory) cannot be listed, or when an
/// individual directory entry cannot be read.
fn collect_jsonl_paths_inner(root: &Path, recurse: bool, paths: &mut Vec<PathBuf>) -> Result<()> {
    let listing = std::fs::read_dir(root)
        .with_context(|| format!("read memory connector directory {}", root.display()))?;

    for dir_entry in listing {
        let candidate = dir_entry?.path();

        if candidate.is_dir() {
            if recurse {
                collect_jsonl_paths_inner(&candidate, recurse, paths)?;
            }
        } else if matches!(
            candidate.extension().and_then(|ext| ext.to_str()),
            Some(ext) if ext.eq_ignore_ascii_case("jsonl")
        ) {
            paths.push(candidate);
        }
    }

    Ok(())
}
fn build_message( fn build_message(
kind: MessageKindArg, kind: MessageKindArg,
text: String, text: String,
@@ -4947,6 +5056,94 @@ mod tests {
Ok(()) Ok(())
} }
/// End-to-end check of the `jsonl_directory` connector: records from a
/// top-level file and from a nested file are imported, a malformed line is
/// counted as skipped, and a non-`.jsonl` file is left alone.
#[test]
fn sync_memory_connector_imports_jsonl_directory_observations() -> Result<()> {
    // Renders one connector record as a single JSONL line.
    let jsonl_record = |entity_name: &str, summary: &str| {
        serde_json::json!({
            "entity_name": entity_name,
            "summary": summary,
        })
        .to_string()
    };

    let tempdir = TestDir::new("graph-connector-sync-dir")?;
    let db = session::store::StateStore::open(&tempdir.path().join("state.db"))?;

    // The connector below asks for the "latest" session, so one must exist.
    let timestamp = chrono::Utc::now();
    let seeded_session = session::Session {
        id: "session-1".to_string(),
        task: "recovery incident".to_string(),
        project: "ecc-tools".to_string(),
        task_group: "incident".to_string(),
        agent_type: "claude".to_string(),
        working_dir: PathBuf::from("/tmp"),
        state: session::SessionState::Running,
        pid: None,
        worktree: None,
        created_at: timestamp,
        updated_at: timestamp,
        last_heartbeat_at: timestamp,
        metrics: session::SessionMetrics::default(),
    };
    db.insert_session(&seeded_session)?;

    // Layout: two valid records at the top level, one valid plus one broken
    // record in a subdirectory, and a text file that must not be picked up.
    let memory_root = tempdir.path().join("hermes-memory");
    let nested_dir = memory_root.join("nested");
    fs::create_dir_all(&nested_dir)?;

    let top_level_lines = [
        jsonl_record(
            "Auth callback recovery",
            "Customer wiped setup and got charged twice",
        ),
        jsonl_record("Portal routing", "Route existing installs to portal first"),
    ];
    fs::write(memory_root.join("a.jsonl"), top_level_lines.join("\n"))?;

    let nested_lines = [
        jsonl_record(
            "Billing UX note",
            "Warn against buying twice after wiping setup",
        ),
        "{invalid json}".to_string(),
    ];
    fs::write(nested_dir.join("b.jsonl"), nested_lines.join("\n"))?;

    fs::write(memory_root.join("ignore.txt"), "not imported")?;

    let directory_settings = config::MemoryConnectorJsonlDirectoryConfig {
        path: memory_root,
        recurse: true,
        session_id: Some("latest".to_string()),
        default_entity_type: Some("incident".to_string()),
        default_observation_type: Some("external_note".to_string()),
    };
    let mut cfg = config::Config::default();
    cfg.memory_connectors.insert(
        "hermes_dir".to_string(),
        config::MemoryConnectorConfig::JsonlDirectory(directory_settings),
    );

    let stats = sync_memory_connector(&db, &cfg, "hermes_dir", 10)?;
    // Four lines were read in total; the malformed one is the single skip.
    assert_eq!(stats.records_read, 4);
    assert_eq!(stats.entities_upserted, 3);
    assert_eq!(stats.observations_added, 3);
    assert_eq!(stats.skipped_records, 1);

    let recalled = db.recall_context_entities(None, "charged twice portal billing", 10)?;
    assert_eq!(recalled.len(), 3);
    for expected_name in ["Auth callback recovery", "Portal routing", "Billing UX note"] {
        assert!(recalled
            .iter()
            .any(|entry| entry.entity.name == expected_name));
    }

    Ok(())
}
#[test] #[test]
fn format_graph_sync_stats_human_renders_counts() { fn format_graph_sync_stats_human_renders_counts() {
let text = format_graph_sync_stats_human( let text = format_graph_sync_stats_human(