From 9c525009d75af9e66286d86e8261985e6c8d52cb Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Fri, 10 Apr 2026 07:16:41 -0700 Subject: [PATCH] feat: add ecc2 memory connector status reporting --- ecc2/src/main.rs | 316 ++++++++++++++++++++++++++++++++++++++ ecc2/src/session/store.rs | 58 +++++++ 2 files changed, 374 insertions(+) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index fa145fba..273ed1b1 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -543,6 +543,12 @@ enum GraphCommands { #[arg(long)] json: bool, }, + /// Show configured memory connectors plus checkpoint status + Connectors { + /// Emit machine-readable JSON instead of the human summary + #[arg(long)] + json: bool, + }, /// Recall relevant context graph entities for a query Recall { /// Filter by source session ID or alias @@ -633,6 +639,25 @@ struct GraphConnectorSyncReport { connectors: Vec, } +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] +struct GraphConnectorStatus { + connector_name: String, + connector_kind: String, + source_path: String, + recurse: bool, + default_session_id: Option, + default_entity_type: Option, + default_observation_type: Option, + synced_sources: usize, + last_synced_at: Option>, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] +struct GraphConnectorStatusReport { + configured_connectors: usize, + connectors: Vec, +} + #[derive(Debug, Clone, Default, Deserialize)] #[serde(default)] struct JsonlMemoryConnectorRecord { @@ -1529,6 +1554,14 @@ async fn main() -> Result<()> { } } } + GraphCommands::Connectors { json } => { + let report = memory_connector_status_report(&db, &cfg)?; + if json { + println!("{}", serde_json::to_string_pretty(&report)?); + } else { + println!("{}", format_graph_connector_status_report_human(&report)); + } + } GraphCommands::Recall { session_id, query, @@ -1760,6 +1793,95 @@ fn sync_all_memory_connectors( Ok(report) } +fn memory_connector_status_report( + db: &session::store::StateStore, + cfg: &config::Config, +) -> Result { + let mut report = GraphConnectorStatusReport { + configured_connectors: cfg.memory_connectors.len(), + connectors: Vec::with_capacity(cfg.memory_connectors.len()), + }; + + for (name, connector) in &cfg.memory_connectors { + let checkpoint = db.connector_checkpoint_summary(name)?; + let ( + connector_kind, + source_path, + recurse, + default_session_id, + default_entity_type, + default_observation_type, + ) = describe_memory_connector(connector); + report.connectors.push(GraphConnectorStatus { + connector_name: name.to_string(), + connector_kind, + source_path, + recurse, + default_session_id, + default_entity_type, + default_observation_type, + synced_sources: checkpoint.synced_sources, + last_synced_at: checkpoint.last_synced_at, + }); + } + + Ok(report) +} + +fn describe_memory_connector( + connector: &config::MemoryConnectorConfig, +) -> ( + String, + String, + bool, + Option, + Option, + Option, +) { + match connector { + config::MemoryConnectorConfig::JsonlFile(settings) => ( + "jsonl_file".to_string(), + settings.path.display().to_string(), + false, + settings.session_id.clone(), + settings.default_entity_type.clone(), + settings.default_observation_type.clone(), + ), + config::MemoryConnectorConfig::JsonlDirectory(settings) => ( + "jsonl_directory".to_string(), + settings.path.display().to_string(), + settings.recurse, + settings.session_id.clone(), + settings.default_entity_type.clone(), + settings.default_observation_type.clone(), + ), + config::MemoryConnectorConfig::MarkdownFile(settings) => ( + "markdown_file".to_string(), + settings.path.display().to_string(), + false, + settings.session_id.clone(), + settings.default_entity_type.clone(), + settings.default_observation_type.clone(), + ), + config::MemoryConnectorConfig::MarkdownDirectory(settings) => ( + "markdown_directory".to_string(), + settings.path.display().to_string(), + settings.recurse, + settings.session_id.clone(), + settings.default_entity_type.clone(), + settings.default_observation_type.clone(), + ), + config::MemoryConnectorConfig::DotenvFile(settings) => ( + "dotenv_file".to_string(), + settings.path.display().to_string(), + false, + settings.session_id.clone(), + settings.default_entity_type.clone(), + settings.default_observation_type.clone(), + ), + } +} + fn sync_jsonl_memory_connector( db: &session::store::StateStore, name: &str, @@ -3578,6 +3700,48 @@ fn format_graph_connector_sync_report_human(report: &GraphConnectorSyncReport) - lines.join("\n") } +fn format_graph_connector_status_report_human(report: &GraphConnectorStatusReport) -> String { + let mut lines = vec![format!( + "Memory connectors: {} configured", + report.configured_connectors + )]; + + if report.connectors.is_empty() { + lines.push("- none".to_string()); + return lines.join("\n"); + } + + for connector in &report.connectors { + lines.push(format!( + "- {} [{}]", + connector.connector_name, connector.connector_kind + )); + lines.push(format!(" source {}", connector.source_path)); + if connector.recurse { + lines.push(" recurse true".to_string()); + } + lines.push(format!(" synced sources {}", connector.synced_sources)); + lines.push(format!( + " last synced {}", + connector + .last_synced_at + .map(|value| value.to_rfc3339()) + .unwrap_or_else(|| "never".to_string()) + )); + if let Some(session_id) = &connector.default_session_id { + lines.push(format!(" default session {}", session_id)); + } + if let Some(entity_type) = &connector.default_entity_type { + lines.push(format!(" default entity type {}", entity_type)); + } + if let Some(observation_type) = &connector.default_observation_type { + lines.push(format!(" default observation type {}", observation_type)); + } + } + + lines.join("\n") +} + fn format_graph_entity_detail_human(detail: &session::ContextGraphEntityDetail) -> String { let mut lines = vec![format_graph_entity_human(&detail.entity)]; lines.push(String::new()); @@ -5709,6 +5873,21 @@ mod tests { } } + #[test] + fn cli_parses_graph_connectors_command() { + let cli = Cli::try_parse_from(["ecc", "graph", "connectors", "--json"]) + .expect("graph connectors should parse"); + + match cli.command { + Some(Commands::Graph { + command: GraphCommands::Connectors { json }, + }) => { + assert!(json); + } + _ => panic!("expected graph connectors subcommand"), + } + } + #[test] fn format_decisions_human_renders_details() { let text = format_decisions_human( @@ -5934,6 +6113,143 @@ mod tests { assert!(text.contains(" skipped unchanged sources 2")); } + #[test] + fn format_graph_connector_status_report_human_renders_connector_details() { + let text = format_graph_connector_status_report_human(&GraphConnectorStatusReport { + configured_connectors: 2, + connectors: vec![ + GraphConnectorStatus { + connector_name: "hermes_notes".to_string(), + connector_kind: "jsonl_directory".to_string(), + source_path: "/tmp/hermes-notes".to_string(), + recurse: true, + default_session_id: Some("latest".to_string()), + default_entity_type: Some("incident".to_string()), + default_observation_type: Some("external_note".to_string()), + synced_sources: 3, + last_synced_at: Some( + chrono::DateTime::parse_from_rfc3339("2026-04-10T12:34:56Z") + .unwrap() + .with_timezone(&chrono::Utc), + ), + }, + GraphConnectorStatus { + connector_name: "workspace_env".to_string(), + connector_kind: "dotenv_file".to_string(), + source_path: "/tmp/.env".to_string(), + recurse: false, + default_session_id: None, + default_entity_type: None, + default_observation_type: None, + synced_sources: 0, + last_synced_at: None, + }, + ], + }); + + assert!(text.contains("Memory connectors: 2 configured")); + assert!(text.contains("- hermes_notes [jsonl_directory]")); + assert!(text.contains(" source /tmp/hermes-notes")); + assert!(text.contains(" recurse true")); + assert!(text.contains(" synced sources 3")); + assert!(text.contains(" last synced 2026-04-10T12:34:56+00:00")); + assert!(text.contains(" default session latest")); + assert!(text.contains(" default entity type incident")); + assert!(text.contains(" default observation type external_note")); + assert!(text.contains("- workspace_env [dotenv_file]")); + assert!(text.contains(" last synced never")); + } + + #[test] + fn memory_connector_status_report_includes_checkpoint_state() -> Result<()> { + let tempdir = TestDir::new("graph-connector-status-report")?; + let db = session::store::StateStore::open(&tempdir.path().join("state.db"))?; + + let markdown_path = tempdir.path().join("workspace-memory.md"); + fs::write( + &markdown_path, + r#"# Billing incident +Customer wiped setup and got charged twice after reinstalling. +"#, + )?; + + let mut cfg = config::Config::default(); + cfg.memory_connectors.insert( + "workspace_note".to_string(), + config::MemoryConnectorConfig::MarkdownFile( + config::MemoryConnectorMarkdownFileConfig { + path: markdown_path.clone(), + session_id: Some("latest".to_string()), + default_entity_type: Some("note_section".to_string()), + default_observation_type: Some("external_note".to_string()), + }, + ), + ); + cfg.memory_connectors.insert( + "workspace_env".to_string(), + config::MemoryConnectorConfig::DotenvFile(config::MemoryConnectorDotenvFileConfig { + path: tempdir.path().join(".env"), + session_id: None, + default_entity_type: Some("service_config".to_string()), + default_observation_type: Some("external_config".to_string()), + key_prefixes: vec!["PUBLIC_".to_string()], + include_keys: Vec::new(), + exclude_keys: Vec::new(), + include_safe_values: true, + }), + ); + + db.upsert_connector_source_checkpoint( + "workspace_note", + &markdown_path.display().to_string(), + "sig-a", + )?; + + let report = memory_connector_status_report(&db, &cfg)?; + assert_eq!(report.configured_connectors, 2); + assert_eq!( + report + .connectors + .iter() + .map(|connector| connector.connector_name.as_str()) + .collect::>(), + vec!["workspace_env", "workspace_note"] + ); + + let workspace_env = report + .connectors + .iter() + .find(|connector| connector.connector_name == "workspace_env") + .expect("workspace_env connector should exist"); + assert_eq!(workspace_env.connector_kind, "dotenv_file"); + assert_eq!(workspace_env.synced_sources, 0); + assert!(workspace_env.last_synced_at.is_none()); + + let workspace_note = report + .connectors + .iter() + .find(|connector| connector.connector_name == "workspace_note") + .expect("workspace_note connector should exist"); + assert_eq!(workspace_note.connector_kind, "markdown_file"); + assert_eq!( + workspace_note.source_path, + markdown_path.display().to_string() + ); + assert_eq!(workspace_note.default_session_id.as_deref(), Some("latest")); + assert_eq!( + workspace_note.default_entity_type.as_deref(), + Some("note_section") + ); + assert_eq!( + workspace_note.default_observation_type.as_deref(), + Some("external_note") + ); + assert_eq!(workspace_note.synced_sources, 1); + assert!(workspace_note.last_synced_at.is_some()); + + Ok(()) + } + #[test] fn sync_memory_connector_imports_jsonl_observations() -> Result<()> { let tempdir = TestDir::new("graph-connector-sync")?; diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index b8465f62..4ec306be 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -43,6 +43,13 @@ pub struct FileActivityOverlap { pub timestamp: chrono::DateTime, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct ConnectorCheckpointSummary { + pub connector_name: String, + pub synced_sources: usize, + pub last_synced_at: Option>, +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize)] pub struct ConflictIncident { pub id: i64, @@ -2441,6 +2448,32 @@ impl StateStore { Ok(()) } + pub fn connector_checkpoint_summary( + &self, + connector_name: &str, + ) -> Result { + self.conn + .query_row( + "SELECT COUNT(*), MAX(updated_at) + FROM context_graph_connector_checkpoints + WHERE connector_name = ?1", + rusqlite::params![connector_name], + |row| { + let synced_sources = row.get::<_, i64>(0)? as usize; + let last_synced_at = row + .get::<_, Option>(1)? + .map(|raw| parse_store_timestamp(raw, 1)) + .transpose()?; + Ok(ConnectorCheckpointSummary { + connector_name: connector_name.to_string(), + synced_sources, + last_synced_at, + }) + }, + ) + .map_err(Into::into) + } + fn compact_context_graph_observations( &self, session_id: Option<&str>, @@ -4809,6 +4842,31 @@ mod tests { Ok(()) } + #[test] + fn connector_checkpoint_summary_reports_synced_sources_and_timestamp() -> Result<()> { + let tempdir = TestDir::new("store-connector-checkpoint-summary")?; + let db = StateStore::open(&tempdir.path().join("state.db"))?; + + let empty = db.connector_checkpoint_summary("workspace_notes")?; + assert_eq!(empty.connector_name, "workspace_notes"); + assert_eq!(empty.synced_sources, 0); + assert!(empty.last_synced_at.is_none()); + + db.upsert_connector_source_checkpoint( + "workspace_notes", + "/tmp/notes/incident.md", + "sig-a", + )?; + db.upsert_connector_source_checkpoint("workspace_notes", "/tmp/notes/docs.md", "sig-b")?; + + let summary = db.connector_checkpoint_summary("workspace_notes")?; + assert_eq!(summary.connector_name, "workspace_notes"); + assert_eq!(summary.synced_sources, 2); + assert!(summary.last_synced_at.is_some()); + + Ok(()) + } + #[test] fn context_graph_detail_includes_incoming_and_outgoing_relations() -> Result<()> { let tempdir = TestDir::new("store-context-relations")?;