feat: add ecc2 markdown directory memory connector

This commit is contained in:
Affaan Mustafa
2026-04-10 06:38:33 -07:00
parent 5258a75382
commit 406722b5ef
2 changed files with 255 additions and 11 deletions

View File

@@ -109,6 +109,7 @@ pub enum MemoryConnectorConfig {
JsonlFile(MemoryConnectorJsonlFileConfig),
JsonlDirectory(MemoryConnectorJsonlDirectoryConfig),
MarkdownFile(MemoryConnectorMarkdownFileConfig),
MarkdownDirectory(MemoryConnectorMarkdownDirectoryConfig),
DotenvFile(MemoryConnectorDotenvFileConfig),
}
@@ -140,6 +141,16 @@ pub struct MemoryConnectorMarkdownFileConfig {
pub default_observation_type: Option<String>,
}
/// Settings for a memory connector that imports sections from the Markdown
/// files found inside a directory.
///
/// Because of `#[serde(default)]`, any field missing from the serialized form
/// falls back to its `Default` value.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct MemoryConnectorMarkdownDirectoryConfig {
/// Directory to scan. Syncing the connector fails when this is empty or is
/// not a directory.
pub path: PathBuf,
/// When `true`, subdirectories of `path` are scanned as well.
pub recurse: bool,
/// Optional session reference; it is resolved through `resolve_session_id`
/// when the connector is synced.
pub session_id: Option<String>,
/// Optional entity type forwarded to the importer as the default for
/// imported records.
pub default_entity_type: Option<String>,
/// Optional observation type forwarded to the importer as the default for
/// imported records.
pub default_observation_type: Option<String>,
}
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct MemoryConnectorDotenvFileConfig {
@@ -1382,6 +1393,43 @@ default_observation_type = "external_note"
}
}
/// A `markdown_directory` connector table written in TOML must deserialize
/// into the `MarkdownDirectory` variant with every configured field preserved.
#[test]
fn memory_markdown_directory_connectors_deserialize_from_toml() {
    let source = r#"
[memory_connectors.workspace_notes]
kind = "markdown_directory"
path = "/tmp/hermes-memory"
recurse = true
session_id = "latest"
default_entity_type = "note_section"
default_observation_type = "external_note"
"#;
    let parsed: Config = toml::from_str(source).unwrap();

    let entry = parsed
        .memory_connectors
        .get("workspace_notes")
        .expect("connector should deserialize");

    // Any other variant means the `kind` tag was routed to the wrong config type.
    let crate::config::MemoryConnectorConfig::MarkdownDirectory(settings) = entry else {
        panic!("expected markdown_directory connector");
    };

    let expected = crate::config::MemoryConnectorMarkdownDirectoryConfig {
        path: PathBuf::from("/tmp/hermes-memory"),
        recurse: true,
        session_id: Some("latest".to_string()),
        default_entity_type: Some("note_section".to_string()),
        default_observation_type: Some("external_note".to_string()),
    };
    assert_eq!(settings, &expected);
}
#[test]
fn memory_dotenv_file_connectors_deserialize_from_toml() {
let config: Config = toml::from_str(

View File

@@ -1649,6 +1649,9 @@ fn sync_memory_connector(
config::MemoryConnectorConfig::MarkdownFile(settings) => {
sync_markdown_memory_connector(db, name, settings, limit)
}
config::MemoryConnectorConfig::MarkdownDirectory(settings) => {
sync_markdown_directory_memory_connector(db, name, settings, limit)
}
config::MemoryConnectorConfig::DotenvFile(settings) => {
sync_dotenv_memory_connector(db, name, settings, limit)
}
@@ -1817,14 +1820,89 @@ fn sync_markdown_memory_connector(
anyhow::bail!("memory connector {name} has no path configured");
}
let body = std::fs::read_to_string(&settings.path)
.with_context(|| format!("read memory connector file {}", settings.path.display()))?;
let default_session_id = settings
.session_id
.as_deref()
.map(|value| resolve_session_id(db, value))
.transpose()?;
let sections = parse_markdown_memory_sections(&settings.path, &body, limit);
sync_markdown_memory_path(
db,
name,
"markdown_file",
&settings.path,
default_session_id.as_deref(),
settings.default_entity_type.as_deref(),
settings.default_observation_type.as_deref(),
limit,
)
}
/// Imports Markdown sections from every Markdown file under the connector's
/// configured directory and returns the combined sync statistics.
///
/// Files are visited in the order produced by `collect_markdown_paths`.
/// `limit` is a budget shared across files: each file is synced with whatever
/// budget is left, and the number of records that file reports as read is
/// deducted before moving on. Once the budget reaches zero the remaining files
/// are not opened.
///
/// # Errors
///
/// Fails when the configured path is empty or is not a directory, when the
/// directory listing or session resolution fails, or when syncing any single
/// file fails.
fn sync_markdown_directory_memory_connector(
    db: &session::store::StateStore,
    name: &str,
    settings: &config::MemoryConnectorMarkdownDirectoryConfig,
    limit: usize,
) -> Result<GraphConnectorSyncStats> {
    let root = settings.path.as_path();
    if root.as_os_str().is_empty() {
        anyhow::bail!("memory connector {name} has no path configured");
    }
    if !root.is_dir() {
        anyhow::bail!(
            "memory connector {name} path is not a directory: {}",
            root.display()
        );
    }

    let markdown_files = collect_markdown_paths(root, settings.recurse)?;

    // Resolve the configured session reference once, up front, so every file
    // is imported against the same session.
    let session_override = match settings.session_id.as_deref() {
        Some(requested) => Some(resolve_session_id(db, requested)?),
        None => None,
    };

    let mut totals = GraphConnectorSyncStats {
        connector_name: name.to_owned(),
        ..Default::default()
    };

    let mut budget = limit;
    let mut queue = markdown_files.into_iter();
    while budget > 0 {
        let Some(markdown_path) = queue.next() else {
            break;
        };

        let per_file = sync_markdown_memory_path(
            db,
            name,
            "markdown_directory",
            &markdown_path,
            session_override.as_deref(),
            settings.default_entity_type.as_deref(),
            settings.default_observation_type.as_deref(),
            budget,
        )?;

        totals.records_read += per_file.records_read;
        totals.entities_upserted += per_file.entities_upserted;
        totals.observations_added += per_file.observations_added;
        totals.skipped_records += per_file.skipped_records;

        // Saturating: a file reporting more records than the budget must not underflow.
        budget = budget.saturating_sub(per_file.records_read);
    }

    Ok(totals)
}
fn sync_markdown_memory_path(
db: &session::store::StateStore,
name: &str,
connector_kind: &str,
path: &Path,
default_session_id: Option<&str>,
default_entity_type: Option<&str>,
default_observation_type: Option<&str>,
limit: usize,
) -> Result<GraphConnectorSyncStats> {
let body = std::fs::read_to_string(path)
.with_context(|| format!("read memory connector file {}", path.display()))?;
let sections = parse_markdown_memory_sections(path, &body, limit);
let mut stats = GraphConnectorSyncStats {
connector_name: name.to_string(),
..Default::default()
@@ -1836,21 +1914,18 @@ fn sync_markdown_memory_connector(
if !section.body.is_empty() {
details.insert("body".to_string(), section.body.clone());
}
details.insert(
"source_path".to_string(),
settings.path.display().to_string(),
);
details.insert("source_path".to_string(), path.display().to_string());
details.insert("line".to_string(), section.line_number.to_string());
let mut metadata = BTreeMap::new();
metadata.insert("connector".to_string(), "markdown_file".to_string());
metadata.insert("connector".to_string(), connector_kind.to_string());
import_memory_connector_record(
db,
&mut stats,
default_session_id.as_deref(),
settings.default_entity_type.as_deref(),
settings.default_observation_type.as_deref(),
default_session_id,
default_entity_type,
default_observation_type,
JsonlMemoryConnectorRecord {
session_id: None,
entity_type: None,
@@ -1995,6 +2070,13 @@ fn collect_jsonl_paths(root: &Path, recurse: bool) -> Result<Vec<PathBuf>> {
Ok(paths)
}
/// Gathers the Markdown files under `root` (descending into subdirectories
/// only when `recurse` is set) and returns them sorted, so callers see a
/// stable order regardless of how the filesystem lists entries.
fn collect_markdown_paths(root: &Path, recurse: bool) -> Result<Vec<PathBuf>> {
    let mut discovered: Vec<PathBuf> = Vec::new();
    collect_markdown_paths_inner(root, recurse, &mut discovered)?;
    discovered.sort();
    Ok(discovered)
}
fn collect_jsonl_paths_inner(root: &Path, recurse: bool, paths: &mut Vec<PathBuf>) -> Result<()> {
for entry in std::fs::read_dir(root)
.with_context(|| format!("read memory connector directory {}", root.display()))?
@@ -2018,6 +2100,35 @@ fn collect_jsonl_paths_inner(root: &Path, recurse: bool, paths: &mut Vec<PathBuf
Ok(())
}
/// Recursive worker for `collect_markdown_paths`.
///
/// Appends to `paths` every non-directory entry of `root` whose extension is
/// `md` or `markdown` (compared ASCII case-insensitively). Subdirectories are
/// entered only when `recurse` is set; otherwise they are ignored.
///
/// NOTE(review): `Path::is_dir` follows symbolic links, so a symlink cycle
/// under `root` would recurse without bound when `recurse` is set — confirm
/// whether that needs guarding.
fn collect_markdown_paths_inner(
    root: &Path,
    recurse: bool,
    paths: &mut Vec<PathBuf>,
) -> Result<()> {
    let listing = std::fs::read_dir(root)
        .with_context(|| format!("read memory connector directory {}", root.display()))?;

    for item in listing {
        let candidate = item?.path();

        // Directories are never collected themselves, whatever their name.
        if candidate.is_dir() {
            if recurse {
                collect_markdown_paths_inner(&candidate, recurse, paths)?;
            }
            continue;
        }

        // Entries without an extension, or with a non-UTF-8 one, are not Markdown.
        let has_markdown_extension = match candidate.extension().and_then(|ext| ext.to_str()) {
            Some(ext) => ["md", "markdown"]
                .iter()
                .any(|known| ext.eq_ignore_ascii_case(known)),
            None => false,
        };
        if has_markdown_extension {
            paths.push(candidate);
        }
    }

    Ok(())
}
fn parse_dotenv_memory_entries(
path: &Path,
body: &str,
@@ -5820,6 +5931,91 @@ Guide users to repair before reinstall so wiped setups do not buy twice.
Ok(())
}
/// End-to-end check of the `markdown_directory` connector: a top-level `.md`
/// file and a nested `.markdown` file are imported, a `.txt` file is ignored,
/// and the imported sections can be recalled with their file-anchored paths.
#[test]
fn sync_memory_connector_imports_markdown_directory_sections() -> Result<()> {
    let tempdir = TestDir::new("graph-connector-sync-markdown-dir")?;
    let db = session::store::StateStore::open(&tempdir.path().join("state.db"))?;

    // A session must exist so the connector's "latest" session reference resolves.
    let timestamp = chrono::Utc::now();
    let seed_session = session::Session {
        id: "session-1".to_string(),
        task: "knowledge import".to_string(),
        project: "everything-claude-code".to_string(),
        task_group: "memory".to_string(),
        agent_type: "claude".to_string(),
        working_dir: PathBuf::from("/tmp"),
        state: session::SessionState::Running,
        pid: None,
        worktree: None,
        created_at: timestamp,
        updated_at: timestamp,
        last_heartbeat_at: timestamp,
        metrics: session::SessionMetrics::default(),
    };
    db.insert_session(&seed_session)?;

    // Fixture layout: one Markdown file at the root, one nested, one non-Markdown.
    let notes_root = tempdir.path().join("workspace-notes");
    let nested_dir = notes_root.join("nested");
    let nested_doc = nested_dir.join("docs.markdown");
    fs::create_dir_all(&nested_dir)?;
    fs::write(
        notes_root.join("incident.md"),
        r#"# Billing incident
Customer wiped setup and got charged twice after reinstalling.
## Portal routing
Route existing installs to portal first before presenting checkout again.
"#,
    )?;
    fs::write(
        &nested_doc,
        r#"# Docs fix
Guide users to repair before reinstall so wiped setups do not buy twice.
"#,
    )?;
    fs::write(notes_root.join("ignore.txt"), "not imported")?;

    let connector = config::MemoryConnectorConfig::MarkdownDirectory(
        config::MemoryConnectorMarkdownDirectoryConfig {
            path: notes_root,
            recurse: true,
            session_id: Some("latest".to_string()),
            default_entity_type: Some("note_section".to_string()),
            default_observation_type: Some("external_note".to_string()),
        },
    );
    let mut cfg = config::Config::default();
    cfg.memory_connectors
        .insert("workspace_notes".to_string(), connector);

    let stats = sync_memory_connector(&db, &cfg, "workspace_notes", 10)?;
    assert_eq!(
        (
            stats.records_read,
            stats.entities_upserted,
            stats.observations_added,
            stats.skipped_records,
        ),
        (3, 3, 3, 0)
    );

    let recalled = db.recall_context_entities(None, "charged twice portal docs", 10)?;
    for expected_name in ["Billing incident", "Portal routing", "Docs fix"] {
        assert!(recalled
            .iter()
            .any(|entry| entry.entity.name == expected_name));
    }

    // The nested file's section must carry a path anchored to that exact file.
    let docs_fix = recalled
        .iter()
        .find(|entry| entry.entity.name == "Docs fix")
        .expect("docs section should exist");
    let expected_anchor_path = format!("{}#docs-fix", nested_doc.display());
    assert_eq!(
        docs_fix.entity.path.as_deref(),
        Some(expected_anchor_path.as_str())
    );

    Ok(())
}
#[test]
fn sync_memory_connector_imports_dotenv_entries_safely() -> Result<()> {
let tempdir = TestDir::new("graph-connector-sync-dotenv")?;