feat: add ecc2 legacy workspace memory import

This commit is contained in:
Affaan Mustafa
2026-04-10 11:10:40 -07:00
parent 790cb0205c
commit b6426ade32
3 changed files with 233 additions and 1 deletion

View File

@@ -620,6 +620,18 @@ enum MigrationCommands {
#[arg(long)]
json: bool,
},
/// Import legacy workspace memory into the ECC2 context graph
ImportMemory {
/// Path to the legacy Hermes/OpenClaw workspace root
#[arg(long)]
source: PathBuf,
/// Maximum imported records across all synthesized connectors
#[arg(long, default_value_t = 100)]
limit: usize,
/// Emit machine-readable JSON instead of the human summary
#[arg(long)]
json: bool,
},
}
#[derive(clap::Subcommand, Debug)]
@@ -1019,6 +1031,13 @@ struct LegacyScheduleImportReport {
jobs: Vec<LegacyScheduleImportJobReport>,
}
/// Summary returned by `import_legacy_memory`: the resolved legacy source
/// path, how many connectors were synthesized from its contents, and the
/// counters produced by the subsequent connector sync.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct LegacyMemoryImportReport {
/// Canonicalized legacy workspace root, rendered via `Path::display`.
source: String,
/// Number of synthesized connectors (0–2: markdown and/or JSONL directory).
connectors_detected: usize,
/// Aggregate sync counters for the synthesized connectors.
report: GraphConnectorSyncReport,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
struct RemoteDispatchHttpRequest {
task: String,
@@ -1800,6 +1819,18 @@ async fn main() -> Result<()> {
println!("{}", format_legacy_schedule_import_human(&report));
}
}
MigrationCommands::ImportMemory {
source,
limit,
json,
} => {
let report = import_legacy_memory(&db, &cfg, &source, limit)?;
if json {
println!("{}", serde_json::to_string_pretty(&report)?);
} else {
println!("{}", format_legacy_memory_import_human(&report));
}
}
},
Some(Commands::Graph { command }) => match command {
GraphCommands::AddEntity {
@@ -5284,6 +5315,65 @@ fn import_legacy_schedules(
Ok(report)
}
/// Import legacy Hermes/OpenClaw workspace memory into the ECC2 context graph.
///
/// Synthesizes throwaway memory connectors for any markdown / JSONL content
/// found under `<source>/workspace`, runs a regular connector sync capped at
/// `limit` records, and reports the detected connector count plus the sync
/// counters. Errors if `source` does not exist or is not a directory.
fn import_legacy_memory(
    db: &session::store::StateStore,
    cfg: &config::Config,
    source: &Path,
    limit: usize,
) -> Result<LegacyMemoryImportReport> {
    // Resolve the root up front so errors and the report show a real path.
    let source = source
        .canonicalize()
        .with_context(|| format!("Legacy workspace not found: {}", source.display()))?;
    anyhow::ensure!(
        source.is_dir(),
        "Legacy workspace source must be a directory: {}",
        source.display()
    );

    // Start from the caller's config but drop its own connectors so the
    // sync below only touches the connectors synthesized here.
    let mut scratch_cfg = cfg.clone();
    scratch_cfg.memory_connectors.clear();

    let workspace = source.join("workspace");
    if workspace.is_dir() {
        // Probe in the same order as before: markdown first, then JSONL.
        let has_markdown = !collect_markdown_paths(&workspace, true)?.is_empty();
        let has_jsonl = !collect_jsonl_paths(&workspace, true)?.is_empty();
        if has_markdown {
            scratch_cfg.memory_connectors.insert(
                "legacy_workspace_markdown".to_string(),
                config::MemoryConnectorConfig::MarkdownDirectory(
                    config::MemoryConnectorMarkdownDirectoryConfig {
                        path: workspace.clone(),
                        recurse: true,
                        session_id: None,
                        default_entity_type: Some("legacy_workspace_note".to_string()),
                        default_observation_type: Some("legacy_workspace_memory".to_string()),
                    },
                ),
            );
        }
        if has_jsonl {
            scratch_cfg.memory_connectors.insert(
                "legacy_workspace_jsonl".to_string(),
                config::MemoryConnectorConfig::JsonlDirectory(
                    config::MemoryConnectorJsonlDirectoryConfig {
                        path: workspace,
                        recurse: true,
                        session_id: None,
                        default_entity_type: Some("legacy_workspace_record".to_string()),
                        default_observation_type: Some("legacy_workspace_memory".to_string()),
                    },
                ),
            );
        }
    }

    let report = sync_all_memory_connectors(db, &scratch_cfg, limit)?;
    Ok(LegacyMemoryImportReport {
        source: source.display().to_string(),
        connectors_detected: scratch_cfg.memory_connectors.len(),
        report,
    })
}
fn build_legacy_migration_plan_report(
audit: &LegacyMigrationAuditReport,
) -> LegacyMigrationPlanReport {
@@ -5696,6 +5786,41 @@ fn format_legacy_schedule_import_human(report: &LegacyScheduleImportReport) -> S
lines.join("\n")
}
/// Render a `LegacyMemoryImportReport` as the human-readable CLI summary:
/// a header, one counter per line, and an optional per-connector breakdown.
fn format_legacy_memory_import_human(report: &LegacyMemoryImportReport) -> String {
    let sync = &report.report;
    let mut out: Vec<String> = Vec::with_capacity(8 + sync.connectors.len());
    out.push(format!(
        "Legacy workspace memory import complete for {}",
        report.source
    ));
    out.push(format!("- connectors detected {}", report.connectors_detected));
    out.push(format!("- connectors synced {}", sync.connectors_synced));
    out.push(format!("- records read {}", sync.records_read));
    out.push(format!("- entities upserted {}", sync.entities_upserted));
    out.push(format!("- observations added {}", sync.observations_added));
    out.push(format!("- skipped records {}", sync.skipped_records));
    out.push(format!(
        "- skipped unchanged sources {}",
        sync.skipped_unchanged_sources
    ));
    // Per-connector breakdown only when at least one connector ran.
    if !sync.connectors.is_empty() {
        out.push("Connectors".to_string());
        out.extend(sync.connectors.iter().map(|connector| {
            format!(
                "- {} | records {} | entities {} | observations {} | skipped unchanged {}",
                connector.connector_name,
                connector.records_read,
                connector.entities_upserted,
                connector.observations_added,
                connector.skipped_unchanged_sources
            )
        }));
    }
    out.join("\n")
}
fn format_graph_recall_human(
entries: &[session::ContextGraphRecallEntry],
session_id: Option<&str>,
@@ -8233,6 +8358,37 @@ mod tests {
}
}
#[test]
fn cli_parses_migrate_import_memory_command() {
    // End-to-end parse of the new `migrate import-memory` subcommand with
    // every flag supplied.
    let cli = Cli::try_parse_from([
        "ecc",
        "migrate",
        "import-memory",
        "--source",
        "/tmp/hermes",
        "--limit",
        "24",
        "--json",
    ])
    .expect("migrate import-memory should parse");
    // Destructure once; any other command shape is a parser regression.
    let (source, limit, json) = match cli.command {
        Some(Commands::Migrate {
            command:
                MigrationCommands::ImportMemory {
                    source,
                    limit,
                    json,
                },
        }) => (source, limit, json),
        _ => panic!("expected migrate import-memory subcommand"),
    };
    assert_eq!(source, PathBuf::from("/tmp/hermes"));
    assert_eq!(limit, 24);
    assert!(json);
}
#[test]
fn legacy_migration_audit_report_maps_detected_artifacts() -> Result<()> {
let tempdir = TestDir::new("legacy-migration-audit")?;
@@ -8490,6 +8646,81 @@ mod tests {
Ok(())
}
// Happy path: a legacy workspace containing both markdown notes and a JSONL
// memory log should synthesize two connectors, import every record, and make
// the imported entities recallable from the context graph.
#[test]
fn import_legacy_memory_imports_workspace_markdown_and_jsonl() -> Result<()> {
let tempdir = TestDir::new("legacy-memory-import")?;
let root = tempdir.path();
// Legacy layout: markdown notes plus a JSONL memory log under `workspace/`.
fs::create_dir_all(root.join("workspace/notes"))?;
fs::create_dir_all(root.join("workspace/memory"))?;
fs::write(
root.join("workspace/notes/recovery.md"),
r#"# Billing incident
Customer wiped setup and got charged twice after reinstalling.
## Portal routing
Route existing installs to portal first before checkout.
"#,
)?;
// Two JSONL records, one JSON object per line.
fs::write(
root.join("workspace/memory/hermes.jsonl"),
[
serde_json::json!({
"entity_name": "Billing recovery checklist",
"summary": "Use portal-first routing before offering checkout again"
})
.to_string(),
serde_json::json!({
"entity_name": "Repair before reinstall",
"summary": "Recommend ecc repair before purchase flows"
})
.to_string(),
]
.join("\n"),
)?;
let tempdb = TestDir::new("legacy-memory-import-db")?;
let db = StateStore::open(&tempdb.path().join("state.db"))?;
let report = import_legacy_memory(&db, &config::Config::default(), root, 10)?;
// Expect both synthesized connectors to sync. 4 records / entities /
// observations — apparently 2 per source file; TODO confirm how the
// markdown connector splits headings into records.
assert_eq!(report.connectors_detected, 2);
assert_eq!(report.report.connectors_synced, 2);
assert_eq!(report.report.records_read, 4);
assert_eq!(report.report.entities_upserted, 4);
assert_eq!(report.report.observations_added, 4);
// Imported entities must be recallable through the normal recall path.
let recalled = db.recall_context_entities(None, "charged twice portal reinstall", 10)?;
assert!(recalled
.iter()
.any(|entry| entry.entity.name == "Billing incident"));
assert!(recalled
.iter()
.any(|entry| entry.entity.name == "Billing recovery checklist"));
assert!(recalled
.iter()
.any(|entry| entry.entity.name == "Repair before reinstall"));
Ok(())
}
#[test]
fn import_legacy_memory_reports_no_workspace_connectors_when_absent() -> Result<()> {
    // A legacy root with no `workspace/` directory should synthesize zero
    // connectors and leave every sync counter at zero.
    let legacy_root = TestDir::new("legacy-memory-import-empty")?;
    fs::create_dir_all(legacy_root.path().join("skills"))?;

    let db_dir = TestDir::new("legacy-memory-import-empty-db")?;
    let store = StateStore::open(&db_dir.path().join("state.db"))?;

    let report =
        import_legacy_memory(&store, &config::Config::default(), legacy_root.path(), 10)?;
    assert_eq!(report.connectors_detected, 0);
    assert_eq!(report.report.connectors_synced, 0);
    assert_eq!(report.report.records_read, 0);
    assert_eq!(report.report.entities_upserted, 0);
    assert_eq!(report.report.observations_added, 0);
    Ok(())
}
#[test]
fn legacy_migration_scaffold_writes_plan_and_config_files() -> Result<()> {
let tempdir = TestDir::new("legacy-migration-scaffold")?;