From 22a5a8de6d81855372b06603d87529c784e645dc Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Fri, 10 Apr 2026 06:26:42 -0700 Subject: [PATCH] feat: add ecc2 markdown memory connectors --- ecc2/src/config/mod.rs | 45 ++++ ecc2/src/main.rs | 453 +++++++++++++++++++++++++++++++++++------ 2 files changed, 438 insertions(+), 60 deletions(-) diff --git a/ecc2/src/config/mod.rs b/ecc2/src/config/mod.rs index 0443384f..e058ba78 100644 --- a/ecc2/src/config/mod.rs +++ b/ecc2/src/config/mod.rs @@ -108,6 +108,7 @@ pub struct OrchestrationTemplateStepConfig { pub enum MemoryConnectorConfig { JsonlFile(MemoryConnectorJsonlFileConfig), JsonlDirectory(MemoryConnectorJsonlDirectoryConfig), + MarkdownFile(MemoryConnectorMarkdownFileConfig), } #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] @@ -129,6 +130,15 @@ pub struct MemoryConnectorJsonlDirectoryConfig { pub default_observation_type: Option, } +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(default)] +pub struct MemoryConnectorMarkdownFileConfig { + pub path: PathBuf, + pub session_id: Option, + pub default_entity_type: Option, + pub default_observation_type: Option, +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ResolvedOrchestrationTemplate { pub template_name: String, @@ -1323,6 +1333,41 @@ default_observation_type = "external_note" } } + #[test] + fn memory_markdown_file_connectors_deserialize_from_toml() { + let config: Config = toml::from_str( + r#" +[memory_connectors.workspace_note] +kind = "markdown_file" +path = "/tmp/hermes-memory.md" +session_id = "latest" +default_entity_type = "note_section" +default_observation_type = "external_note" +"#, + ) + .unwrap(); + + let connector = config + .memory_connectors + .get("workspace_note") + .expect("connector should deserialize"); + match connector { + crate::config::MemoryConnectorConfig::MarkdownFile(settings) => { + assert_eq!(settings.path, PathBuf::from("/tmp/hermes-memory.md")); + assert_eq!(settings.session_id.as_deref(), Some("latest")); + assert_eq!( + settings.default_entity_type.as_deref(), + Some("note_section") + ); + assert_eq!( + settings.default_observation_type.as_deref(), + Some("external_note") + ); + } + _ => panic!("expected markdown_file connector"), + } + } + #[test] fn completion_summary_notifications_deserialize_from_toml() { let config: Config = toml::from_str( diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 8ba123ab..acf3015c 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -588,6 +588,18 @@ struct JsonlMemoryConnectorRecord { details: BTreeMap, } +const MARKDOWN_CONNECTOR_SUMMARY_LIMIT: usize = 160; +const MARKDOWN_CONNECTOR_BODY_LIMIT: usize = 4000; + +#[derive(Debug, Clone)] +struct MarkdownMemorySection { + heading: String, + path: String, + summary: String, + body: String, + line_number: usize, +} + #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt() @@ -1594,6 +1606,9 @@ fn sync_memory_connector( config::MemoryConnectorConfig::JsonlDirectory(settings) => { sync_jsonl_directory_memory_connector(db, name, settings, limit) } + config::MemoryConnectorConfig::MarkdownFile(settings) => { + sync_markdown_memory_connector(db, name, settings, limit) + } } } @@ -1716,72 +1731,152 @@ fn sync_jsonl_memory_reader( } }; - let session_id = match record.session_id.as_deref() { - Some(value) => match resolve_session_id(db, value) { - Ok(resolved) => Some(resolved), - Err(_) => { - stats.skipped_records += 1; - continue; - } - }, - None => default_session_id.clone(), - }; - let entity_type = record - .entity_type - .as_deref() - .or(default_entity_type) - .map(str::trim) - .filter(|value| !value.is_empty()); - let observation_type = record - .observation_type - .as_deref() - .or(default_observation_type) - .map(str::trim) - .filter(|value| !value.is_empty()); - let entity_name = record.entity_name.trim(); - let summary = record.summary.trim(); - - let Some(entity_type) = entity_type else { - stats.skipped_records += 1; - continue; - }; - let Some(observation_type) = observation_type else { - stats.skipped_records += 1; - continue; - }; - if entity_name.is_empty() || summary.is_empty() { - stats.skipped_records += 1; - continue; - } - - let entity_summary = record - .entity_summary - .as_deref() - .map(str::trim) - .filter(|value| !value.is_empty()) - .unwrap_or(summary); - let entity = db.upsert_context_entity( - session_id.as_deref(), - entity_type, - entity_name, - record.path.as_deref(), - entity_summary, - &record.metadata, + import_memory_connector_record( + db, + &mut stats, + default_session_id.as_deref(), + default_entity_type, + default_observation_type, + record, )?; - db.add_context_observation( - session_id.as_deref(), - entity.id, - observation_type, - summary, - &record.details, - )?; - stats.entities_upserted += 1; - stats.observations_added += 1; } Ok(stats) } +fn sync_markdown_memory_connector( + db: &session::store::StateStore, + name: &str, + settings: &config::MemoryConnectorMarkdownFileConfig, + limit: usize, +) -> Result { + if settings.path.as_os_str().is_empty() { + anyhow::bail!("memory connector {name} has no path configured"); + } + + let body = std::fs::read_to_string(&settings.path) + .with_context(|| format!("read memory connector file {}", settings.path.display()))?; + let default_session_id = settings + .session_id + .as_deref() + .map(|value| resolve_session_id(db, value)) + .transpose()?; + let sections = parse_markdown_memory_sections(&settings.path, &body, limit); + let mut stats = GraphConnectorSyncStats { + connector_name: name.to_string(), + ..Default::default() + }; + + for section in sections { + stats.records_read += 1; + let mut details = BTreeMap::new(); + if !section.body.is_empty() { + details.insert("body".to_string(), section.body.clone()); + } + details.insert( + "source_path".to_string(), + settings.path.display().to_string(), + ); + details.insert("line".to_string(), section.line_number.to_string()); + + let mut metadata = BTreeMap::new(); + metadata.insert("connector".to_string(), "markdown_file".to_string()); + + import_memory_connector_record( + db, + &mut stats, + default_session_id.as_deref(), + settings.default_entity_type.as_deref(), + settings.default_observation_type.as_deref(), + JsonlMemoryConnectorRecord { + session_id: None, + entity_type: None, + entity_name: section.heading, + path: Some(section.path), + entity_summary: Some(section.summary.clone()), + metadata, + observation_type: None, + summary: section.summary, + details, + }, + )?; + } + + Ok(stats) +} + +fn import_memory_connector_record( + db: &session::store::StateStore, + stats: &mut GraphConnectorSyncStats, + default_session_id: Option<&str>, + default_entity_type: Option<&str>, + default_observation_type: Option<&str>, + record: JsonlMemoryConnectorRecord, +) -> Result<()> { + let session_id = match record.session_id.as_deref() { + Some(value) => match resolve_session_id(db, value) { + Ok(resolved) => Some(resolved), + Err(_) => { + stats.skipped_records += 1; + return Ok(()); + } + }, + None => default_session_id.map(str::to_string), + }; + let entity_type = record + .entity_type + .as_deref() + .or(default_entity_type) + .map(str::trim) + .filter(|value| !value.is_empty()); + let observation_type = record + .observation_type + .as_deref() + .or(default_observation_type) + .map(str::trim) + .filter(|value| !value.is_empty()); + let entity_name = record.entity_name.trim(); + let summary = record.summary.trim(); + + let Some(entity_type) = entity_type else { + stats.skipped_records += 1; + return Ok(()); + }; + let Some(observation_type) = observation_type else { + stats.skipped_records += 1; + return Ok(()); + }; + if entity_name.is_empty() || summary.is_empty() { + stats.skipped_records += 1; + return Ok(()); + } + + let entity_summary = record + .entity_summary + .as_deref() + .map(str::trim) + .filter(|value| !value.is_empty()) + .unwrap_or(summary); + let entity = db.upsert_context_entity( + session_id.as_deref(), + entity_type, + entity_name, + record.path.as_deref(), + entity_summary, + &record.metadata, + )?; + db.add_context_observation( + session_id.as_deref(), + entity.id, + observation_type, + summary, + &record.details, + )?; + stats.entities_upserted += 1; + stats.observations_added += 1; + Ok(()) +} + fn collect_jsonl_paths(root: &Path, recurse: bool) -> Result> { let mut paths = Vec::new(); collect_jsonl_paths_inner(root, recurse, &mut paths)?; @@ -1812,6 +1907,157 @@ fn collect_jsonl_paths_inner(root: &Path, recurse: bool, paths: &mut Vec Vec { + if limit == 0 { + return Vec::new(); + } + + let source_path = path.display().to_string(); + let fallback_heading = path + .file_stem() + .and_then(|value| value.to_str()) + .filter(|value| !value.trim().is_empty()) + .unwrap_or("note") + .trim() + .to_string(); + + let mut sections = Vec::new(); + let mut preamble = Vec::new(); + let mut current_heading: Option<(String, usize)> = None; + let mut current_body = Vec::new(); + + for (index, line) in body.lines().enumerate() { + let line_number = index + 1; + if let Some(heading) = markdown_heading_title(line) { + if let Some((title, start_line)) = current_heading.take() { + if let Some(section) = markdown_memory_section( + &source_path, + &title, + start_line, + ¤t_body.join("\n"), + ) { + sections.push(section); + } + } else if !preamble.join("\n").trim().is_empty() { + if let Some(section) = markdown_memory_section( + &source_path, + &fallback_heading, + 1, + &preamble.join("\n"), + ) { + sections.push(section); + } + } + + current_heading = Some((heading.to_string(), line_number)); + current_body.clear(); + continue; + } + + if current_heading.is_some() { + current_body.push(line.to_string()); + } else { + preamble.push(line.to_string()); + } + } + + if let Some((title, start_line)) = current_heading { + if let Some(section) = + markdown_memory_section(&source_path, &title, start_line, ¤t_body.join("\n")) + { + sections.push(section); + } + } else if let Some(section) = + markdown_memory_section(&source_path, &fallback_heading, 1, &preamble.join("\n")) + { + sections.push(section); + } + + sections.truncate(limit); + sections +} + +fn markdown_heading_title(line: &str) -> Option<&str> { + let trimmed = line.trim_start(); + let hashes = trimmed.chars().take_while(|ch| *ch == '#').count(); + if hashes == 0 || hashes > 6 { + return None; + } + let title = trimmed[hashes..].trim_start(); + if title.is_empty() { + return None; + } + Some(title.trim()) +} + +fn markdown_memory_section( + source_path: &str, + heading: &str, + line_number: usize, + body: &str, +) -> Option { + let heading = heading.trim(); + if heading.is_empty() { + return None; + } + let normalized_body = body.trim(); + let summary = markdown_section_summary(heading, normalized_body); + if summary.is_empty() { + return None; + } + let slug = markdown_heading_slug(heading); + let path = if slug.is_empty() { + source_path.to_string() + } else { + format!("{source_path}#{slug}") + }; + + Some(MarkdownMemorySection { + heading: truncate_connector_text(heading, MARKDOWN_CONNECTOR_SUMMARY_LIMIT), + path, + summary, + body: truncate_connector_text(normalized_body, MARKDOWN_CONNECTOR_BODY_LIMIT), + line_number, + }) +} + +fn markdown_section_summary(heading: &str, body: &str) -> String { + let candidate = body + .lines() + .map(str::trim) + .find(|line| !line.is_empty()) + .unwrap_or(heading); + truncate_connector_text(candidate, MARKDOWN_CONNECTOR_SUMMARY_LIMIT) +} + +fn markdown_heading_slug(value: &str) -> String { + let mut slug = String::new(); + let mut last_dash = false; + for ch in value.chars() { + if ch.is_ascii_alphanumeric() { + slug.push(ch.to_ascii_lowercase()); + last_dash = false; + } else if !last_dash { + slug.push('-'); + last_dash = true; + } + } + slug.trim_matches('-').to_string() +} + +fn truncate_connector_text(value: &str, max_chars: usize) -> String { + let trimmed = value.trim(); + if trimmed.chars().count() <= max_chars { + return trimmed.to_string(); + } + let truncated: String = trimmed.chars().take(max_chars.saturating_sub(1)).collect(); + format!("{truncated}…") +} + fn build_message( kind: MessageKindArg, text: String, @@ -5144,6 +5390,93 @@ mod tests { Ok(()) } + #[test] + fn sync_memory_connector_imports_markdown_file_sections() -> Result<()> { + let tempdir = TestDir::new("graph-connector-sync-markdown")?; + let db = session::store::StateStore::open(&tempdir.path().join("state.db"))?; + let now = chrono::Utc::now(); + db.insert_session(&session::Session { + id: "session-1".to_string(), + task: "knowledge import".to_string(), + project: "everything-claude-code".to_string(), + task_group: "memory".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: session::SessionState::Running, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + last_heartbeat_at: now, + metrics: session::SessionMetrics::default(), + })?; + + let connector_path = tempdir.path().join("workspace-memory.md"); + fs::write( + &connector_path, + r#"# Billing incident +Customer wiped setup and got charged twice after reinstalling. + +## Portal routing +Route existing installs to portal first before presenting checkout again. + +## Docs fix +Guide users to repair before reinstall so wiped setups do not buy twice. +"#, + )?; + + let mut cfg = config::Config::default(); + cfg.memory_connectors.insert( + "workspace_note".to_string(), + config::MemoryConnectorConfig::MarkdownFile( + config::MemoryConnectorMarkdownFileConfig { + path: connector_path.clone(), + session_id: Some("latest".to_string()), + default_entity_type: Some("note_section".to_string()), + default_observation_type: Some("external_note".to_string()), + }, + ), + ); + + let stats = sync_memory_connector(&db, &cfg, "workspace_note", 10)?; + assert_eq!(stats.records_read, 3); + assert_eq!(stats.entities_upserted, 3); + assert_eq!(stats.observations_added, 3); + assert_eq!(stats.skipped_records, 0); + + let recalled = db.recall_context_entities(None, "charged twice reinstall", 10)?; + assert!(recalled + .iter() + .any(|entry| entry.entity.name == "Billing incident")); + assert!(recalled.iter().any(|entry| entry.entity.name == "Docs fix")); + + let billing = recalled + .iter() + .find(|entry| entry.entity.name == "Billing incident") + .expect("billing section should exist"); + let expected_anchor_path = format!("{}#billing-incident", connector_path.display()); + assert_eq!( + billing.entity.path.as_deref(), + Some(expected_anchor_path.as_str()) + ); + let observations = db.list_context_observations(Some(billing.entity.id), 5)?; + assert_eq!(observations.len(), 1); + let expected_source_path = connector_path.display().to_string(); + assert_eq!( + observations[0] + .details + .get("source_path") + .map(String::as_str), + Some(expected_source_path.as_str()) + ); + assert!(observations[0] + .details + .get("body") + .is_some_and(|value: &String| value.contains("charged twice"))); + + Ok(()) + } + #[test] fn format_graph_sync_stats_human_renders_counts() { let text = format_graph_sync_stats_human(