From 406722b5efa096ebe5cf17ccc35c4e3169dee871 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Fri, 10 Apr 2026 06:38:33 -0700 Subject: [PATCH] feat: add ecc2 markdown directory memory connector --- ecc2/src/config/mod.rs | 48 +++++++++ ecc2/src/main.rs | 218 ++++++++++++++++++++++++++++++++++++++--- 2 files changed, 255 insertions(+), 11 deletions(-) diff --git a/ecc2/src/config/mod.rs b/ecc2/src/config/mod.rs index 4ef420be..2f2309d0 100644 --- a/ecc2/src/config/mod.rs +++ b/ecc2/src/config/mod.rs @@ -109,6 +109,7 @@ pub enum MemoryConnectorConfig { JsonlFile(MemoryConnectorJsonlFileConfig), JsonlDirectory(MemoryConnectorJsonlDirectoryConfig), MarkdownFile(MemoryConnectorMarkdownFileConfig), + MarkdownDirectory(MemoryConnectorMarkdownDirectoryConfig), DotenvFile(MemoryConnectorDotenvFileConfig), } @@ -140,6 +141,16 @@ pub struct MemoryConnectorMarkdownFileConfig { pub default_observation_type: Option, } +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(default)] +pub struct MemoryConnectorMarkdownDirectoryConfig { + pub path: PathBuf, + pub recurse: bool, + pub session_id: Option, + pub default_entity_type: Option, + pub default_observation_type: Option, +} + #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] #[serde(default)] pub struct MemoryConnectorDotenvFileConfig { @@ -1382,6 +1393,43 @@ default_observation_type = "external_note" } } + #[test] + fn memory_markdown_directory_connectors_deserialize_from_toml() { + let config: Config = toml::from_str( + r#" +[memory_connectors.workspace_notes] +kind = "markdown_directory" +path = "/tmp/hermes-memory" +recurse = true +session_id = "latest" +default_entity_type = "note_section" +default_observation_type = "external_note" +"#, + ) + .unwrap(); + + let connector = config + .memory_connectors + .get("workspace_notes") + .expect("connector should deserialize"); + match connector { + crate::config::MemoryConnectorConfig::MarkdownDirectory(settings) => { + assert_eq!(settings.path, PathBuf::from("/tmp/hermes-memory")); + assert!(settings.recurse); + assert_eq!(settings.session_id.as_deref(), Some("latest")); + assert_eq!( + settings.default_entity_type.as_deref(), + Some("note_section") + ); + assert_eq!( + settings.default_observation_type.as_deref(), + Some("external_note") + ); + } + _ => panic!("expected markdown_directory connector"), + } + } + #[test] fn memory_dotenv_file_connectors_deserialize_from_toml() { let config: Config = toml::from_str( diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index b85ba625..e0b4ea36 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -1649,6 +1649,9 @@ fn sync_memory_connector( config::MemoryConnectorConfig::MarkdownFile(settings) => { sync_markdown_memory_connector(db, name, settings, limit) } + config::MemoryConnectorConfig::MarkdownDirectory(settings) => { + sync_markdown_directory_memory_connector(db, name, settings, limit) + } config::MemoryConnectorConfig::DotenvFile(settings) => { sync_dotenv_memory_connector(db, name, settings, limit) } @@ -1817,14 +1820,89 @@ fn sync_markdown_memory_connector( anyhow::bail!("memory connector {name} has no path configured"); } - let body = std::fs::read_to_string(&settings.path) - .with_context(|| format!("read memory connector file {}", settings.path.display()))?; let default_session_id = settings .session_id .as_deref() .map(|value| resolve_session_id(db, value)) .transpose()?; - let sections = parse_markdown_memory_sections(&settings.path, &body, limit); + sync_markdown_memory_path( + db, + name, + "markdown_file", + &settings.path, + default_session_id.as_deref(), + settings.default_entity_type.as_deref(), + settings.default_observation_type.as_deref(), + limit, + ) +} + +fn sync_markdown_directory_memory_connector( + db: &session::store::StateStore, + name: &str, + settings: &config::MemoryConnectorMarkdownDirectoryConfig, + limit: usize, +) -> Result { + if settings.path.as_os_str().is_empty() { + anyhow::bail!("memory connector {name} has no path configured"); + } + if !settings.path.is_dir() { + anyhow::bail!( + "memory connector {name} path is not a directory: {}", + settings.path.display() + ); + } + + let paths = collect_markdown_paths(&settings.path, settings.recurse)?; + let default_session_id = settings + .session_id + .as_deref() + .map(|value| resolve_session_id(db, value)) + .transpose()?; + + let mut stats = GraphConnectorSyncStats { + connector_name: name.to_string(), + ..Default::default() + }; + + let mut remaining = limit; + for path in paths { + if remaining == 0 { + break; + } + let file_stats = sync_markdown_memory_path( + db, + name, + "markdown_directory", + &path, + default_session_id.as_deref(), + settings.default_entity_type.as_deref(), + settings.default_observation_type.as_deref(), + remaining, + )?; + remaining = remaining.saturating_sub(file_stats.records_read); + stats.records_read += file_stats.records_read; + stats.entities_upserted += file_stats.entities_upserted; + stats.observations_added += file_stats.observations_added; + stats.skipped_records += file_stats.skipped_records; + } + + Ok(stats) +} + +fn sync_markdown_memory_path( + db: &session::store::StateStore, + name: &str, + connector_kind: &str, + path: &Path, + default_session_id: Option<&str>, + default_entity_type: Option<&str>, + default_observation_type: Option<&str>, + limit: usize, +) -> Result { + let body = std::fs::read_to_string(path) + .with_context(|| format!("read memory connector file {}", path.display()))?; + let sections = parse_markdown_memory_sections(path, &body, limit); let mut stats = GraphConnectorSyncStats { connector_name: name.to_string(), ..Default::default() @@ -1836,21 +1914,18 @@ fn sync_markdown_memory_connector( if !section.body.is_empty() { details.insert("body".to_string(), section.body.clone()); } - details.insert( - "source_path".to_string(), - settings.path.display().to_string(), - ); + details.insert("source_path".to_string(), path.display().to_string()); details.insert("line".to_string(), section.line_number.to_string()); let mut metadata = BTreeMap::new(); - metadata.insert("connector".to_string(), "markdown_file".to_string()); + metadata.insert("connector".to_string(), connector_kind.to_string()); import_memory_connector_record( db, &mut stats, - default_session_id.as_deref(), - settings.default_entity_type.as_deref(), - settings.default_observation_type.as_deref(), + default_session_id, + default_entity_type, + default_observation_type, JsonlMemoryConnectorRecord { session_id: None, entity_type: None, @@ -1995,6 +2070,13 @@ fn collect_jsonl_paths(root: &Path, recurse: bool) -> Result> { Ok(paths) } +fn collect_markdown_paths(root: &Path, recurse: bool) -> Result> { + let mut paths = Vec::new(); + collect_markdown_paths_inner(root, recurse, &mut paths)?; + paths.sort(); + Ok(paths) +} + fn collect_jsonl_paths_inner(root: &Path, recurse: bool, paths: &mut Vec) -> Result<()> { for entry in std::fs::read_dir(root) .with_context(|| format!("read memory connector directory {}", root.display()))? @@ -2018,6 +2100,35 @@ fn collect_jsonl_paths_inner(root: &Path, recurse: bool, paths: &mut Vec, +) -> Result<()> { + for entry in std::fs::read_dir(root) + .with_context(|| format!("read memory connector directory {}", root.display()))? + { + let entry = entry?; + let path = entry.path(); + if path.is_dir() { + if recurse { + collect_markdown_paths_inner(&path, recurse, paths)?; + } + continue; + } + let is_markdown = path + .extension() + .and_then(|value| value.to_str()) + .is_some_and(|value| { + value.eq_ignore_ascii_case("md") || value.eq_ignore_ascii_case("markdown") + }); + if is_markdown { + paths.push(path); + } + } + Ok(()) +} + fn parse_dotenv_memory_entries( path: &Path, body: &str, @@ -5820,6 +5931,91 @@ Guide users to repair before reinstall so wiped setups do not buy twice. Ok(()) } + #[test] + fn sync_memory_connector_imports_markdown_directory_sections() -> Result<()> { + let tempdir = TestDir::new("graph-connector-sync-markdown-dir")?; + let db = session::store::StateStore::open(&tempdir.path().join("state.db"))?; + let now = chrono::Utc::now(); + db.insert_session(&session::Session { + id: "session-1".to_string(), + task: "knowledge import".to_string(), + project: "everything-claude-code".to_string(), + task_group: "memory".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: session::SessionState::Running, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + last_heartbeat_at: now, + metrics: session::SessionMetrics::default(), + })?; + + let connector_dir = tempdir.path().join("workspace-notes"); + fs::create_dir_all(connector_dir.join("nested"))?; + fs::write( + connector_dir.join("incident.md"), + r#"# Billing incident +Customer wiped setup and got charged twice after reinstalling. + +## Portal routing +Route existing installs to portal first before presenting checkout again. +"#, + )?; + fs::write( + connector_dir.join("nested").join("docs.markdown"), + r#"# Docs fix +Guide users to repair before reinstall so wiped setups do not buy twice. +"#, + )?; + fs::write(connector_dir.join("ignore.txt"), "not imported")?; + + let mut cfg = config::Config::default(); + cfg.memory_connectors.insert( + "workspace_notes".to_string(), + config::MemoryConnectorConfig::MarkdownDirectory( + config::MemoryConnectorMarkdownDirectoryConfig { + path: connector_dir.clone(), + recurse: true, + session_id: Some("latest".to_string()), + default_entity_type: Some("note_section".to_string()), + default_observation_type: Some("external_note".to_string()), + }, + ), + ); + + let stats = sync_memory_connector(&db, &cfg, "workspace_notes", 10)?; + assert_eq!(stats.records_read, 3); + assert_eq!(stats.entities_upserted, 3); + assert_eq!(stats.observations_added, 3); + assert_eq!(stats.skipped_records, 0); + + let recalled = db.recall_context_entities(None, "charged twice portal docs", 10)?; + assert!(recalled + .iter() + .any(|entry| entry.entity.name == "Billing incident")); + assert!(recalled + .iter() + .any(|entry| entry.entity.name == "Portal routing")); + assert!(recalled.iter().any(|entry| entry.entity.name == "Docs fix")); + + let docs_fix = recalled + .iter() + .find(|entry| entry.entity.name == "Docs fix") + .expect("docs section should exist"); + let expected_anchor_path = format!( + "{}#docs-fix", + connector_dir.join("nested").join("docs.markdown").display() + ); + assert_eq!( + docs_fix.entity.path.as_deref(), + Some(expected_anchor_path.as_str()) + ); + + Ok(()) + } + #[test] fn sync_memory_connector_imports_dotenv_entries_safely() -> Result<()> { let tempdir = TestDir::new("graph-connector-sync-dotenv")?;