feat: add ecc2 memory connector status reporting

This commit is contained in:
Affaan Mustafa
2026-04-10 07:16:41 -07:00
parent 9c294f7815
commit 9c525009d7
2 changed files with 374 additions and 0 deletions

View File

@@ -543,6 +543,12 @@ enum GraphCommands {
#[arg(long)] #[arg(long)]
json: bool, json: bool,
}, },
/// Show configured memory connectors plus checkpoint status
Connectors {
/// Emit machine-readable JSON instead of the human summary
#[arg(long)]
json: bool,
},
/// Recall relevant context graph entities for a query /// Recall relevant context graph entities for a query
Recall { Recall {
/// Filter by source session ID or alias /// Filter by source session ID or alias
@@ -633,6 +639,25 @@ struct GraphConnectorSyncReport {
connectors: Vec<GraphConnectorSyncStats>, connectors: Vec<GraphConnectorSyncStats>,
} }
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
struct GraphConnectorStatus {
connector_name: String,
connector_kind: String,
source_path: String,
recurse: bool,
default_session_id: Option<String>,
default_entity_type: Option<String>,
default_observation_type: Option<String>,
synced_sources: usize,
last_synced_at: Option<chrono::DateTime<chrono::Utc>>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
struct GraphConnectorStatusReport {
configured_connectors: usize,
connectors: Vec<GraphConnectorStatus>,
}
#[derive(Debug, Clone, Default, Deserialize)] #[derive(Debug, Clone, Default, Deserialize)]
#[serde(default)] #[serde(default)]
struct JsonlMemoryConnectorRecord { struct JsonlMemoryConnectorRecord {
@@ -1529,6 +1554,14 @@ async fn main() -> Result<()> {
} }
} }
} }
GraphCommands::Connectors { json } => {
let report = memory_connector_status_report(&db, &cfg)?;
if json {
println!("{}", serde_json::to_string_pretty(&report)?);
} else {
println!("{}", format_graph_connector_status_report_human(&report));
}
}
GraphCommands::Recall { GraphCommands::Recall {
session_id, session_id,
query, query,
@@ -1760,6 +1793,95 @@ fn sync_all_memory_connectors(
Ok(report) Ok(report)
} }
fn memory_connector_status_report(
db: &session::store::StateStore,
cfg: &config::Config,
) -> Result<GraphConnectorStatusReport> {
let mut report = GraphConnectorStatusReport {
configured_connectors: cfg.memory_connectors.len(),
connectors: Vec::with_capacity(cfg.memory_connectors.len()),
};
for (name, connector) in &cfg.memory_connectors {
let checkpoint = db.connector_checkpoint_summary(name)?;
let (
connector_kind,
source_path,
recurse,
default_session_id,
default_entity_type,
default_observation_type,
) = describe_memory_connector(connector);
report.connectors.push(GraphConnectorStatus {
connector_name: name.to_string(),
connector_kind,
source_path,
recurse,
default_session_id,
default_entity_type,
default_observation_type,
synced_sources: checkpoint.synced_sources,
last_synced_at: checkpoint.last_synced_at,
});
}
Ok(report)
}
fn describe_memory_connector(
connector: &config::MemoryConnectorConfig,
) -> (
String,
String,
bool,
Option<String>,
Option<String>,
Option<String>,
) {
match connector {
config::MemoryConnectorConfig::JsonlFile(settings) => (
"jsonl_file".to_string(),
settings.path.display().to_string(),
false,
settings.session_id.clone(),
settings.default_entity_type.clone(),
settings.default_observation_type.clone(),
),
config::MemoryConnectorConfig::JsonlDirectory(settings) => (
"jsonl_directory".to_string(),
settings.path.display().to_string(),
settings.recurse,
settings.session_id.clone(),
settings.default_entity_type.clone(),
settings.default_observation_type.clone(),
),
config::MemoryConnectorConfig::MarkdownFile(settings) => (
"markdown_file".to_string(),
settings.path.display().to_string(),
false,
settings.session_id.clone(),
settings.default_entity_type.clone(),
settings.default_observation_type.clone(),
),
config::MemoryConnectorConfig::MarkdownDirectory(settings) => (
"markdown_directory".to_string(),
settings.path.display().to_string(),
settings.recurse,
settings.session_id.clone(),
settings.default_entity_type.clone(),
settings.default_observation_type.clone(),
),
config::MemoryConnectorConfig::DotenvFile(settings) => (
"dotenv_file".to_string(),
settings.path.display().to_string(),
false,
settings.session_id.clone(),
settings.default_entity_type.clone(),
settings.default_observation_type.clone(),
),
}
}
fn sync_jsonl_memory_connector( fn sync_jsonl_memory_connector(
db: &session::store::StateStore, db: &session::store::StateStore,
name: &str, name: &str,
@@ -3578,6 +3700,48 @@ fn format_graph_connector_sync_report_human(report: &GraphConnectorSyncReport) -
lines.join("\n") lines.join("\n")
} }
fn format_graph_connector_status_report_human(report: &GraphConnectorStatusReport) -> String {
let mut lines = vec![format!(
"Memory connectors: {} configured",
report.configured_connectors
)];
if report.connectors.is_empty() {
lines.push("- none".to_string());
return lines.join("\n");
}
for connector in &report.connectors {
lines.push(format!(
"- {} [{}]",
connector.connector_name, connector.connector_kind
));
lines.push(format!(" source {}", connector.source_path));
if connector.recurse {
lines.push(" recurse true".to_string());
}
lines.push(format!(" synced sources {}", connector.synced_sources));
lines.push(format!(
" last synced {}",
connector
.last_synced_at
.map(|value| value.to_rfc3339())
.unwrap_or_else(|| "never".to_string())
));
if let Some(session_id) = &connector.default_session_id {
lines.push(format!(" default session {}", session_id));
}
if let Some(entity_type) = &connector.default_entity_type {
lines.push(format!(" default entity type {}", entity_type));
}
if let Some(observation_type) = &connector.default_observation_type {
lines.push(format!(" default observation type {}", observation_type));
}
}
lines.join("\n")
}
fn format_graph_entity_detail_human(detail: &session::ContextGraphEntityDetail) -> String { fn format_graph_entity_detail_human(detail: &session::ContextGraphEntityDetail) -> String {
let mut lines = vec![format_graph_entity_human(&detail.entity)]; let mut lines = vec![format_graph_entity_human(&detail.entity)];
lines.push(String::new()); lines.push(String::new());
@@ -5709,6 +5873,21 @@ mod tests {
} }
} }
#[test]
fn cli_parses_graph_connectors_command() {
let cli = Cli::try_parse_from(["ecc", "graph", "connectors", "--json"])
.expect("graph connectors should parse");
match cli.command {
Some(Commands::Graph {
command: GraphCommands::Connectors { json },
}) => {
assert!(json);
}
_ => panic!("expected graph connectors subcommand"),
}
}
#[test] #[test]
fn format_decisions_human_renders_details() { fn format_decisions_human_renders_details() {
let text = format_decisions_human( let text = format_decisions_human(
@@ -5934,6 +6113,143 @@ mod tests {
assert!(text.contains(" skipped unchanged sources 2")); assert!(text.contains(" skipped unchanged sources 2"));
} }
#[test]
fn format_graph_connector_status_report_human_renders_connector_details() {
let text = format_graph_connector_status_report_human(&GraphConnectorStatusReport {
configured_connectors: 2,
connectors: vec![
GraphConnectorStatus {
connector_name: "hermes_notes".to_string(),
connector_kind: "jsonl_directory".to_string(),
source_path: "/tmp/hermes-notes".to_string(),
recurse: true,
default_session_id: Some("latest".to_string()),
default_entity_type: Some("incident".to_string()),
default_observation_type: Some("external_note".to_string()),
synced_sources: 3,
last_synced_at: Some(
chrono::DateTime::parse_from_rfc3339("2026-04-10T12:34:56Z")
.unwrap()
.with_timezone(&chrono::Utc),
),
},
GraphConnectorStatus {
connector_name: "workspace_env".to_string(),
connector_kind: "dotenv_file".to_string(),
source_path: "/tmp/.env".to_string(),
recurse: false,
default_session_id: None,
default_entity_type: None,
default_observation_type: None,
synced_sources: 0,
last_synced_at: None,
},
],
});
assert!(text.contains("Memory connectors: 2 configured"));
assert!(text.contains("- hermes_notes [jsonl_directory]"));
assert!(text.contains(" source /tmp/hermes-notes"));
assert!(text.contains(" recurse true"));
assert!(text.contains(" synced sources 3"));
assert!(text.contains(" last synced 2026-04-10T12:34:56+00:00"));
assert!(text.contains(" default session latest"));
assert!(text.contains(" default entity type incident"));
assert!(text.contains(" default observation type external_note"));
assert!(text.contains("- workspace_env [dotenv_file]"));
assert!(text.contains(" last synced never"));
}
#[test]
fn memory_connector_status_report_includes_checkpoint_state() -> Result<()> {
let tempdir = TestDir::new("graph-connector-status-report")?;
let db = session::store::StateStore::open(&tempdir.path().join("state.db"))?;
let markdown_path = tempdir.path().join("workspace-memory.md");
fs::write(
&markdown_path,
r#"# Billing incident
Customer wiped setup and got charged twice after reinstalling.
"#,
)?;
let mut cfg = config::Config::default();
cfg.memory_connectors.insert(
"workspace_note".to_string(),
config::MemoryConnectorConfig::MarkdownFile(
config::MemoryConnectorMarkdownFileConfig {
path: markdown_path.clone(),
session_id: Some("latest".to_string()),
default_entity_type: Some("note_section".to_string()),
default_observation_type: Some("external_note".to_string()),
},
),
);
cfg.memory_connectors.insert(
"workspace_env".to_string(),
config::MemoryConnectorConfig::DotenvFile(config::MemoryConnectorDotenvFileConfig {
path: tempdir.path().join(".env"),
session_id: None,
default_entity_type: Some("service_config".to_string()),
default_observation_type: Some("external_config".to_string()),
key_prefixes: vec!["PUBLIC_".to_string()],
include_keys: Vec::new(),
exclude_keys: Vec::new(),
include_safe_values: true,
}),
);
db.upsert_connector_source_checkpoint(
"workspace_note",
&markdown_path.display().to_string(),
"sig-a",
)?;
let report = memory_connector_status_report(&db, &cfg)?;
assert_eq!(report.configured_connectors, 2);
assert_eq!(
report
.connectors
.iter()
.map(|connector| connector.connector_name.as_str())
.collect::<Vec<_>>(),
vec!["workspace_env", "workspace_note"]
);
let workspace_env = report
.connectors
.iter()
.find(|connector| connector.connector_name == "workspace_env")
.expect("workspace_env connector should exist");
assert_eq!(workspace_env.connector_kind, "dotenv_file");
assert_eq!(workspace_env.synced_sources, 0);
assert!(workspace_env.last_synced_at.is_none());
let workspace_note = report
.connectors
.iter()
.find(|connector| connector.connector_name == "workspace_note")
.expect("workspace_note connector should exist");
assert_eq!(workspace_note.connector_kind, "markdown_file");
assert_eq!(
workspace_note.source_path,
markdown_path.display().to_string()
);
assert_eq!(workspace_note.default_session_id.as_deref(), Some("latest"));
assert_eq!(
workspace_note.default_entity_type.as_deref(),
Some("note_section")
);
assert_eq!(
workspace_note.default_observation_type.as_deref(),
Some("external_note")
);
assert_eq!(workspace_note.synced_sources, 1);
assert!(workspace_note.last_synced_at.is_some());
Ok(())
}
#[test] #[test]
fn sync_memory_connector_imports_jsonl_observations() -> Result<()> { fn sync_memory_connector_imports_jsonl_observations() -> Result<()> {
let tempdir = TestDir::new("graph-connector-sync")?; let tempdir = TestDir::new("graph-connector-sync")?;

View File

@@ -43,6 +43,13 @@ pub struct FileActivityOverlap {
pub timestamp: chrono::DateTime<chrono::Utc>, pub timestamp: chrono::DateTime<chrono::Utc>,
} }
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct ConnectorCheckpointSummary {
pub connector_name: String,
pub synced_sources: usize,
pub last_synced_at: Option<chrono::DateTime<chrono::Utc>>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct ConflictIncident { pub struct ConflictIncident {
pub id: i64, pub id: i64,
@@ -2441,6 +2448,32 @@ impl StateStore {
Ok(()) Ok(())
} }
pub fn connector_checkpoint_summary(
&self,
connector_name: &str,
) -> Result<ConnectorCheckpointSummary> {
self.conn
.query_row(
"SELECT COUNT(*), MAX(updated_at)
FROM context_graph_connector_checkpoints
WHERE connector_name = ?1",
rusqlite::params![connector_name],
|row| {
let synced_sources = row.get::<_, i64>(0)? as usize;
let last_synced_at = row
.get::<_, Option<String>>(1)?
.map(|raw| parse_store_timestamp(raw, 1))
.transpose()?;
Ok(ConnectorCheckpointSummary {
connector_name: connector_name.to_string(),
synced_sources,
last_synced_at,
})
},
)
.map_err(Into::into)
}
fn compact_context_graph_observations( fn compact_context_graph_observations(
&self, &self,
session_id: Option<&str>, session_id: Option<&str>,
@@ -4809,6 +4842,31 @@ mod tests {
Ok(()) Ok(())
} }
#[test]
fn connector_checkpoint_summary_reports_synced_sources_and_timestamp() -> Result<()> {
let tempdir = TestDir::new("store-connector-checkpoint-summary")?;
let db = StateStore::open(&tempdir.path().join("state.db"))?;
let empty = db.connector_checkpoint_summary("workspace_notes")?;
assert_eq!(empty.connector_name, "workspace_notes");
assert_eq!(empty.synced_sources, 0);
assert!(empty.last_synced_at.is_none());
db.upsert_connector_source_checkpoint(
"workspace_notes",
"/tmp/notes/incident.md",
"sig-a",
)?;
db.upsert_connector_source_checkpoint("workspace_notes", "/tmp/notes/docs.md", "sig-b")?;
let summary = db.connector_checkpoint_summary("workspace_notes")?;
assert_eq!(summary.connector_name, "workspace_notes");
assert_eq!(summary.synced_sources, 2);
assert!(summary.last_synced_at.is_some());
Ok(())
}
#[test] #[test]
fn context_graph_detail_includes_incoming_and_outgoing_relations() -> Result<()> { fn context_graph_detail_includes_incoming_and_outgoing_relations() -> Result<()> {
let tempdir = TestDir::new("store-context-relations")?; let tempdir = TestDir::new("store-context-relations")?;