feat: add ecc2 legacy schedule migration import

This commit is contained in:
Affaan Mustafa
2026-04-10 11:06:14 -07:00
parent 046af44065
commit 790cb0205c
3 changed files with 699 additions and 6 deletions

View File

@@ -186,6 +186,9 @@ It is mostly:
ECC 2.0 now ships a bounded set of migration entrypoints:
- `ecc migrate audit --source ~/.hermes`
- `ecc migrate plan --source ~/.hermes --output migration-plan.md`
- `ecc migrate scaffold --source ~/.hermes --output-dir migration-artifacts`
- `ecc migrate import-schedules --source ~/.hermes --dry-run`
Use that first to inventory the legacy workspace and map detected surfaces onto the current ECC2 scheduler, remote dispatch, memory graph, templates, and manual-translation lanes.

View File

@@ -83,6 +83,7 @@ These stay local and should be configured per operator:
## Suggested Bring-Up Order
0. Run `ecc migrate audit --source ~/.hermes` first to inventory the legacy workspace and see which parts already map onto ECC2.
0.5. Generate and review artifacts with `ecc migrate plan` / `ecc migrate scaffold`, then preview recurring jobs with `ecc migrate import-schedules --dry-run`.
1. Install ECC and verify the baseline harness setup.
2. Install Hermes and point it at ECC-imported skills.
3. Register the MCP servers you actually use every day.

View File

@@ -608,6 +608,18 @@ enum MigrationCommands {
#[arg(long)]
json: bool,
},
/// Import recurring jobs from a legacy cron/jobs.json into ECC2 schedules
ImportSchedules {
/// Path to the legacy Hermes/OpenClaw workspace root
#[arg(long)]
source: PathBuf,
/// Preview detected jobs without creating ECC2 schedules
#[arg(long)]
dry_run: bool,
/// Emit machine-readable JSON instead of the human summary
#[arg(long)]
json: bool,
},
}
#[derive(clap::Subcommand, Debug)]
@@ -966,6 +978,47 @@ struct LegacyMigrationScaffoldReport {
steps_scaffolded: usize,
}
/// Outcome classification for one legacy job during schedule import.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
enum LegacyScheduleImportJobStatus {
    // Passed all validation during a dry-run preview; would be imported.
    Ready,
    // A real ECC2 schedule was created for this job (non-dry-run only).
    Imported,
    // Marked disabled/inactive in the legacy workspace; never imported.
    Disabled,
    // Missing cron expression or task text, or the cron failed validation.
    Invalid,
    // Valid job, but its agent profile does not resolve in the local config.
    Skipped,
}
/// Per-job outcome row inside a `LegacyScheduleImportReport`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct LegacyScheduleImportJobReport {
    // Workspace-relative path of the legacy jobs file this entry came from.
    source_path: String,
    // Name from the legacy entry, or a generated `legacy-job-N` fallback.
    job_name: String,
    // Cron expression as found in the legacy entry (unvalidated here).
    cron_expr: Option<String>,
    // Task/prompt text the schedule should run.
    task: Option<String>,
    // Optional agent override from the legacy entry.
    agent: Option<String>,
    // Optional agent profile name from the legacy entry.
    profile: Option<String>,
    // Optional project grouping for the created session.
    project: Option<String>,
    // Optional task-group grouping for the created session.
    task_group: Option<String>,
    // Legacy worktree preference; None means "use the configured default".
    use_worktree: Option<bool>,
    // Final classification after validation / import.
    status: LegacyScheduleImportJobStatus,
    // Human-readable explanation for non-Ready/Imported statuses.
    reason: Option<String>,
    // Equivalent `ecc schedule add` command; None when cron or task is missing.
    command_snippet: Option<String>,
    // Set only when a live (non-dry-run) import created a schedule.
    imported_schedule_id: Option<i64>,
}
/// Aggregate result of `ecc migrate import-schedules`, serialized for
/// `--json` output and rendered by the human formatter.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct LegacyScheduleImportReport {
    // Canonicalized legacy workspace root.
    source: String,
    // Workspace-relative path of the jobs file (typically "cron/jobs.json").
    source_path: String,
    // True when the run only previewed jobs without creating schedules.
    dry_run: bool,
    // Total entries parsed out of the legacy jobs file.
    jobs_detected: usize,
    // Jobs that passed validation (imported or previewable).
    ready_jobs: usize,
    // Jobs for which a real ECC2 schedule was created.
    imported_jobs: usize,
    // Jobs disabled in the legacy workspace.
    disabled_jobs: usize,
    // Jobs missing cron/task fields or with an unparseable cron.
    invalid_jobs: usize,
    // Jobs skipped because their agent profile did not resolve.
    skipped_jobs: usize,
    // One row per detected job, in file order.
    jobs: Vec<LegacyScheduleImportJobReport>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
struct RemoteDispatchHttpRequest {
task: String,
@@ -1735,6 +1788,18 @@ async fn main() -> Result<()> {
println!("{}", format_legacy_migration_scaffold_human(&report));
}
}
MigrationCommands::ImportSchedules {
source,
dry_run,
json,
} => {
let report = import_legacy_schedules(&db, &cfg, &source, dry_run)?;
if json {
println!("{}", serde_json::to_string_pretty(&report)?);
} else {
println!("{}", format_legacy_schedule_import_human(&report));
}
}
},
Some(Commands::Graph { command }) => match command {
GraphCommands::AddEntity {
@@ -4882,10 +4947,362 @@ fn build_legacy_migration_next_steps(artifacts: &[LegacyMigrationArtifact]) -> V
steps
}
/// Intermediate representation of one legacy job, parsed from cron/jobs.json
/// before any validation or import is attempted.
#[derive(Debug, Clone, PartialEq, Eq)]
struct LegacyScheduleDraft {
    // Workspace-relative path of the jobs file the entry came from.
    source_path: String,
    // Resolved job name (or generated `legacy-job-N` fallback).
    job_name: String,
    // Raw cron expression, if any candidate field was present.
    cron_expr: Option<String>,
    // Raw task/prompt text, if any candidate field was present.
    task: Option<String>,
    // Optional agent/runner name.
    agent: Option<String>,
    // Optional agent profile name.
    profile: Option<String>,
    // Optional project grouping.
    project: Option<String>,
    // Optional task-group grouping.
    task_group: Option<String>,
    // Worktree preference; None defers to the configured default.
    use_worktree: Option<bool>,
    // Derived from legacy `disabled` / `enabled` / `active` flags.
    enabled: bool,
}
/// Parse `cron/jobs.json` under the legacy workspace into schedule drafts.
///
/// Returns an empty list when the file is absent. The file may be a bare
/// array of jobs, an object wrapping a `jobs`/`schedules`/`tasks` array, or a
/// single job object; any other JSON shape is an error.
fn load_legacy_schedule_drafts(source: &Path) -> Result<Vec<LegacyScheduleDraft>> {
    let jobs_path = source.join("cron/jobs.json");
    if !jobs_path.is_file() {
        // No legacy scheduler file: nothing to import, but not an error.
        return Ok(Vec::new());
    }
    let raw = fs::read_to_string(&jobs_path)
        .with_context(|| format!("read legacy scheduler jobs: {}", jobs_path.display()))?;
    let parsed: serde_json::Value = serde_json::from_str(&raw)
        .with_context(|| format!("parse legacy scheduler jobs JSON: {}", jobs_path.display()))?;
    // Report paths relative to the workspace root whenever possible.
    let relative = jobs_path
        .strip_prefix(source)
        .unwrap_or(&jobs_path)
        .display()
        .to_string();
    let job_values: Vec<&serde_json::Value> = if let serde_json::Value::Array(items) = &parsed {
        items.iter().collect()
    } else if let serde_json::Value::Object(map) = &parsed {
        // Accept the common wrapper keys; otherwise treat the whole object
        // as a single job definition.
        match ["jobs", "schedules", "tasks"]
            .iter()
            .find_map(|key| map.get(*key).and_then(serde_json::Value::as_array))
        {
            Some(items) => items.iter().collect(),
            None => vec![&parsed],
        }
    } else {
        anyhow::bail!(
            "legacy scheduler jobs file must be a JSON object or array: {}",
            jobs_path.display()
        );
    };
    Ok(job_values
        .into_iter()
        .enumerate()
        .map(|(position, entry)| build_legacy_schedule_draft(entry, position, &relative))
        .collect())
}
/// Convert one raw JSON job entry into a `LegacyScheduleDraft`.
///
/// Legacy exports disagree on field names, so each field is resolved from a
/// prioritized list of candidate JSON paths; `index` only feeds the fallback
/// name used for anonymous entries.
fn build_legacy_schedule_draft(
    value: &serde_json::Value,
    index: usize,
    source_path: &str,
) -> LegacyScheduleDraft {
    // Name: first identifier-ish field present, else a positional fallback.
    let job_name = json_string_candidates(
        value,
        &[
            &["name"],
            &["id"],
            &["title"],
            &["job_name"],
            &["task_name"],
        ],
    )
    .unwrap_or_else(|| format!("legacy-job-{}", index + 1));
    let cron_expr = json_string_candidates(
        value,
        &[
            &["cron"],
            &["schedule"],
            &["cron_expr"],
            &["trigger", "cron"],
            &["timing", "cron"],
        ],
    );
    let task = json_string_candidates(
        value,
        &[
            &["task"],
            &["prompt"],
            &["goal"],
            &["description"],
            &["command"],
            &["task", "prompt"],
            &["task", "description"],
        ],
    );
    // A job runs unless explicitly disabled, or an `enabled`/`active` flag
    // explicitly turns it off (both default to "keep it running").
    let explicitly_disabled = json_bool_candidates(value, &[&["disabled"]]).unwrap_or(false);
    let explicitly_enabled =
        json_bool_candidates(value, &[&["enabled"], &["active"]]).unwrap_or(true);
    LegacyScheduleDraft {
        source_path: source_path.to_string(),
        job_name,
        cron_expr,
        task,
        agent: json_string_candidates(value, &[&["agent"], &["runner"]]),
        profile: json_string_candidates(value, &[&["profile"], &["agent_profile"]]),
        project: json_string_candidates(value, &[&["project"]]),
        task_group: json_string_candidates(value, &[&["task_group"], &["group"]]),
        use_worktree: json_bool_candidates(value, &[&["use_worktree"], &["worktree"]]),
        enabled: !explicitly_disabled && explicitly_enabled,
    }
}
/// Resolve the first candidate JSON path that yields a usable string.
///
/// Paths are tried in order; a path only wins if the value it points at
/// converts to a non-empty string (strings are trimmed, numbers stringified).
/// A path that exists but holds an object/array/bool/null is skipped so later
/// candidates still get a chance.
fn json_string_candidates(value: &serde_json::Value, paths: &[&[&str]]) -> Option<String> {
    // Bug fix: the conversion must happen inside `find_map`. Previously the
    // first *existing* path was taken even when it held a non-string value
    // (e.g. `"task": {"prompt": "..."}`), returning None instead of falling
    // through to a later nested candidate like `["task", "prompt"]`.
    // This also makes the shape consistent with `json_bool_candidates`.
    paths
        .iter()
        .find_map(|path| json_lookup(value, path).and_then(json_to_string))
}
/// Resolve the first candidate JSON path that yields a boolean-like value.
///
/// Accepts real JSON booleans plus common textual spellings
/// ("true"/"1"/"yes"/"on" and their negatives, case-insensitive). A path
/// whose value cannot be interpreted is skipped in favor of later candidates.
fn json_bool_candidates(value: &serde_json::Value, paths: &[&[&str]]) -> Option<bool> {
    for path in paths {
        let Some(found) = json_lookup(value, path) else {
            continue;
        };
        let interpreted = match found {
            serde_json::Value::Bool(flag) => Some(*flag),
            serde_json::Value::String(text) => {
                match text.trim().to_ascii_lowercase().as_str() {
                    "true" | "1" | "yes" | "on" => Some(true),
                    "false" | "0" | "no" | "off" => Some(false),
                    _ => None,
                }
            }
            _ => None,
        };
        if interpreted.is_some() {
            return interpreted;
        }
    }
    None
}
/// Walk `path` segment by segment into nested JSON objects.
///
/// Returns `None` as soon as any segment is missing or the current node is
/// not an object (`Value::get` handles both cases).
fn json_lookup<'a>(value: &'a serde_json::Value, path: &[&str]) -> Option<&'a serde_json::Value> {
    path.iter()
        .try_fold(value, |node, segment| node.get(*segment))
}
/// Convert a scalar JSON value into a trimmed, non-empty string.
///
/// Strings are trimmed (and dropped when empty after trimming); numbers are
/// rendered with their canonical formatting; every other JSON type yields
/// `None`.
fn json_to_string(value: &serde_json::Value) -> Option<String> {
    if let serde_json::Value::String(text) = value {
        let trimmed = text.trim();
        (!trimmed.is_empty()).then(|| trimmed.to_string())
    } else if let serde_json::Value::Number(number) = value {
        Some(number.to_string())
    } else {
        None
    }
}
/// Wrap `value` in double quotes, escaping backslashes, embedded double
/// quotes, and newlines so the result is safe to paste into a shell command.
fn shell_quote_double(value: &str) -> String {
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('"');
    for ch in value.chars() {
        match ch {
            // Escape the escape character itself first conceptually; the
            // per-char walk makes the ordering concern moot.
            '\\' => quoted.push_str("\\\\"),
            '"' => quoted.push_str("\\\""),
            '\n' => quoted.push_str("\\n"),
            other => quoted.push(other),
        }
    }
    quoted.push('"');
    quoted
}
/// Validate a legacy cron expression against the `cron` crate's parser.
///
/// Classic 5-field (minute-first) crontab expressions are accepted by
/// prepending an explicit seconds column of `0`; 6- and 7-field expressions
/// are passed through unchanged. Anything else is rejected up front.
fn validate_schedule_cron_expr(expr: &str) -> Result<()> {
    let trimmed = expr.trim();
    let fields = trimmed.split_whitespace().count();
    let normalized = if fields == 5 {
        // Classic crontab shape: add the seconds field the parser expects.
        format!("0 {trimmed}")
    } else if fields == 6 || fields == 7 {
        trimmed.to_string()
    } else {
        anyhow::bail!(
            "invalid cron expression `{trimmed}`: expected 5, 6, or 7 fields but found {fields}"
        )
    };
    <cron::Schedule as std::str::FromStr>::from_str(&normalized)
        .with_context(|| format!("invalid cron expression `{trimmed}`"))?;
    Ok(())
}
/// Render the `ecc schedule add` invocation equivalent to a legacy draft.
///
/// Returns `None` when the draft lacks either a cron expression or a task,
/// since neither flag can be omitted from the generated command.
fn build_legacy_schedule_add_command(draft: &LegacyScheduleDraft) -> Option<String> {
    let (cron_expr, task) = match (draft.cron_expr.as_deref(), draft.task.as_deref()) {
        (Some(cron_expr), Some(task)) => (cron_expr, task),
        _ => return None,
    };
    let mut arguments = vec![
        "ecc schedule add".to_string(),
        format!("--cron {}", shell_quote_double(cron_expr)),
        format!("--task {}", shell_quote_double(task)),
    ];
    // Optional string flags preceding the worktree toggle, in CLI order.
    for (flag, field) in [
        ("--agent", draft.agent.as_deref()),
        ("--profile", draft.profile.as_deref()),
    ] {
        if let Some(value) = field {
            arguments.push(format!("{flag} {}", shell_quote_double(value)));
        }
    }
    if let Some(worktree) = draft.use_worktree {
        let toggle = if worktree { "--worktree" } else { "--no-worktree" };
        arguments.push(toggle.to_string());
    }
    // Grouping flags come last, matching the original command layout.
    for (flag, field) in [
        ("--project", draft.project.as_deref()),
        ("--task-group", draft.task_group.as_deref()),
    ] {
        if let Some(value) = field {
            arguments.push(format!("{flag} {}", shell_quote_double(value)));
        }
    }
    Some(arguments.join(" "))
}
/// Import recurring jobs from a legacy `cron/jobs.json` into ECC2 schedules.
///
/// Each detected job is validated (enabled flag, cron expression, task text,
/// agent profile) and classified into the report's status buckets. When
/// `dry_run` is false, every job that passes validation is turned into a real
/// scheduled task via `session::manager::create_scheduled_task`.
///
/// # Errors
/// Fails when `source` cannot be canonicalized or is not a directory, when
/// the jobs file cannot be read or parsed, or when creating a schedule fails.
fn import_legacy_schedules(
    db: &session::store::StateStore,
    cfg: &config::Config,
    source: &Path,
    dry_run: bool,
) -> Result<LegacyScheduleImportReport> {
    // Canonicalize up front so the report always shows an absolute, real path.
    let source = source
        .canonicalize()
        .with_context(|| format!("Legacy workspace not found: {}", source.display()))?;
    if !source.is_dir() {
        anyhow::bail!(
            "Legacy workspace source must be a directory: {}",
            source.display()
        );
    }
    let drafts = load_legacy_schedule_drafts(&source)?;
    // Workspace-relative location of the jobs file ("cron/jobs.json").
    let source_path = source.join("cron/jobs.json");
    let source_path = source_path
        .strip_prefix(&source)
        .unwrap_or(&source_path)
        .display()
        .to_string();
    let mut report = LegacyScheduleImportReport {
        source: source.display().to_string(),
        source_path,
        dry_run,
        jobs_detected: drafts.len(),
        ready_jobs: 0,
        imported_jobs: 0,
        disabled_jobs: 0,
        invalid_jobs: 0,
        skipped_jobs: 0,
        jobs: Vec::new(),
    };
    for draft in drafts {
        // Start optimistic (Ready); downgrade as validation failures surface.
        let mut item = LegacyScheduleImportJobReport {
            source_path: draft.source_path.clone(),
            job_name: draft.job_name.clone(),
            cron_expr: draft.cron_expr.clone(),
            task: draft.task.clone(),
            agent: draft.agent.clone(),
            profile: draft.profile.clone(),
            project: draft.project.clone(),
            task_group: draft.task_group.clone(),
            use_worktree: draft.use_worktree,
            status: LegacyScheduleImportJobStatus::Ready,
            reason: None,
            command_snippet: build_legacy_schedule_add_command(&draft),
            imported_schedule_id: None,
        };
        // Disabled jobs are reported but never imported.
        if !draft.enabled {
            item.status = LegacyScheduleImportJobStatus::Disabled;
            item.reason = Some("disabled in legacy workspace".to_string());
            report.disabled_jobs += 1;
            report.jobs.push(item);
            continue;
        }
        // A cron expression is mandatory for an importable job.
        let cron_expr = match draft.cron_expr.as_deref() {
            Some(value) => value,
            None => {
                item.status = LegacyScheduleImportJobStatus::Invalid;
                item.reason = Some("missing cron expression".to_string());
                report.invalid_jobs += 1;
                report.jobs.push(item);
                continue;
            }
        };
        // As is the task/prompt text the schedule should run.
        let task = match draft.task.as_deref() {
            Some(value) => value,
            None => {
                item.status = LegacyScheduleImportJobStatus::Invalid;
                item.reason = Some("missing task/prompt".to_string());
                report.invalid_jobs += 1;
                report.jobs.push(item);
                continue;
            }
        };
        // Reject cron expressions the ECC2 scheduler cannot parse.
        if let Err(error) = validate_schedule_cron_expr(cron_expr) {
            item.status = LegacyScheduleImportJobStatus::Invalid;
            item.reason = Some(error.to_string());
            report.invalid_jobs += 1;
            report.jobs.push(item);
            continue;
        }
        // A named profile must resolve against the local config; otherwise
        // the job is skipped rather than imported with a broken profile.
        if let Some(profile) = draft.profile.as_deref() {
            if let Err(error) = cfg.resolve_agent_profile(profile) {
                item.status = LegacyScheduleImportJobStatus::Skipped;
                item.reason = Some(format!("profile `{profile}` is not usable here: {error}"));
                report.skipped_jobs += 1;
                report.jobs.push(item);
                continue;
            }
        }
        report.ready_jobs += 1;
        if dry_run {
            // Preview only: count as ready but do not touch the store.
            report.jobs.push(item);
            continue;
        }
        // Live import: create the ECC2 scheduled task, falling back to the
        // configured defaults for agent and worktree behavior.
        let schedule = session::manager::create_scheduled_task(
            db,
            cfg,
            cron_expr,
            task,
            draft.agent.as_deref().unwrap_or(&cfg.default_agent),
            draft.profile.as_deref(),
            draft.use_worktree.unwrap_or(cfg.auto_create_worktrees),
            session::SessionGrouping {
                project: draft.project.clone(),
                task_group: draft.task_group.clone(),
            },
        )?;
        item.status = LegacyScheduleImportJobStatus::Imported;
        item.imported_schedule_id = Some(schedule.id);
        report.imported_jobs += 1;
        report.jobs.push(item);
    }
    Ok(report)
}
fn build_legacy_migration_plan_report(
audit: &LegacyMigrationAuditReport,
) -> LegacyMigrationPlanReport {
let mut steps = Vec::new();
let legacy_schedule_drafts =
load_legacy_schedule_drafts(Path::new(&audit.source)).unwrap_or_default();
let schedule_commands = legacy_schedule_drafts
.iter()
.filter(|draft| draft.enabled)
.filter_map(build_legacy_schedule_add_command)
.collect::<Vec<_>>();
let disabled_schedule_jobs = legacy_schedule_drafts
.iter()
.filter(|draft| !draft.enabled)
.count();
let invalid_schedule_jobs = legacy_schedule_drafts
.iter()
.filter(|draft| draft.enabled && (draft.cron_expr.is_none() || draft.task.is_none()))
.count();
for artifact in &audit.artifacts {
let step = match artifact.category.as_str() {
@@ -4895,13 +5312,39 @@ fn build_legacy_migration_plan_report(
title: "Recreate Hermes/OpenClaw recurring jobs in ECC2 scheduler".to_string(),
target_surface: "ECC2 scheduler".to_string(),
source_paths: artifact.source_paths.clone(),
command_snippets: vec![
"ecc schedule add --cron \"<legacy-cron>\" --task \"Translate legacy recurring job from cron/scheduler.py\"".to_string(),
"ecc schedule list".to_string(),
"ecc daemon".to_string(),
],
command_snippets: if schedule_commands.is_empty() {
vec![
"ecc schedule add --cron \"<legacy-cron>\" --task \"Translate legacy recurring job from cron/scheduler.py\"".to_string(),
"ecc schedule list".to_string(),
"ecc daemon".to_string(),
]
} else {
let mut commands = schedule_commands.clone();
commands.push("ecc schedule list".to_string());
commands.push("ecc daemon".to_string());
commands
},
config_snippets: Vec::new(),
notes: artifact.notes.clone(),
notes: {
let mut notes = artifact.notes.clone();
if !schedule_commands.is_empty() {
notes.push(format!(
"Recovered {} concrete recurring job(s) from cron/jobs.json.",
schedule_commands.len()
));
}
if disabled_schedule_jobs > 0 {
notes.push(format!(
"{disabled_schedule_jobs} legacy recurring job(s) are disabled and were left out of generated ECC2 commands."
));
}
if invalid_schedule_jobs > 0 {
notes.push(format!(
"{invalid_schedule_jobs} legacy recurring job(s) were missing cron/task fields and still need manual translation."
));
}
notes
},
},
"gateway_dispatch" => LegacyMigrationPlanStep {
category: artifact.category.clone(),
@@ -5195,6 +5638,64 @@ fn format_legacy_migration_scaffold_human(report: &LegacyMigrationScaffoldReport
lines.join("\n")
}
/// Render a human-readable summary of a legacy schedule import run.
///
/// Emits the header and count lines, then one entry per job with optional
/// detail lines (cron, task, generated command, schedule id, reason).
fn format_legacy_schedule_import_human(report: &LegacyScheduleImportReport) -> String {
    let mode = if report.dry_run { "preview" } else { "complete" };
    let mut output = Vec::new();
    output.push(format!(
        "Legacy schedule import {} for {}",
        mode, report.source
    ));
    output.push(format!("- source path {}", report.source_path));
    output.push(format!("- jobs detected {}", report.jobs_detected));
    output.push(format!("- ready jobs {}", report.ready_jobs));
    output.push(format!("- imported jobs {}", report.imported_jobs));
    output.push(format!("- disabled jobs {}", report.disabled_jobs));
    output.push(format!("- invalid jobs {}", report.invalid_jobs));
    output.push(format!("- skipped jobs {}", report.skipped_jobs));
    if report.jobs.is_empty() {
        output.push("- no importable cron/jobs.json entries were found".to_string());
        return output.join("\n");
    }
    output.push("Jobs".to_string());
    for job in &report.jobs {
        let status_label = match job.status {
            LegacyScheduleImportJobStatus::Ready => "ready",
            LegacyScheduleImportJobStatus::Imported => "imported",
            LegacyScheduleImportJobStatus::Disabled => "disabled",
            LegacyScheduleImportJobStatus::Invalid => "invalid",
            LegacyScheduleImportJobStatus::Skipped => "skipped",
        };
        output.push(format!("- {} [{}]", job.job_name, status_label));
        // Optional per-job detail lines, emitted only when present.
        if let Some(cron_expr) = job.cron_expr.as_deref() {
            output.push(format!(" cron {}", cron_expr));
        }
        if let Some(task) = job.task.as_deref() {
            output.push(format!(" task {}", task));
        }
        if let Some(command) = job.command_snippet.as_deref() {
            output.push(format!(" command {}", command));
        }
        if let Some(schedule_id) = job.imported_schedule_id {
            output.push(format!(" schedule {}", schedule_id));
        }
        if let Some(reason) = job.reason.as_deref() {
            output.push(format!(" note {}", reason));
        }
    }
    output.join("\n")
}
fn format_graph_recall_human(
entries: &[session::ContextGraphRecallEntry],
session_id: Option<&str>,
@@ -7702,6 +8203,36 @@ mod tests {
}
}
#[test]
// End-to-end clap parse of the new `migrate import-schedules` subcommand,
// exercising all three flags (--source, --dry-run, --json) at once.
fn cli_parses_migrate_import_schedules_command() {
    let cli = Cli::try_parse_from([
        "ecc",
        "migrate",
        "import-schedules",
        "--source",
        "/tmp/hermes",
        "--dry-run",
        "--json",
    ])
    .expect("migrate import-schedules should parse");
    // Destructure down to the subcommand to verify every parsed field.
    match cli.command {
        Some(Commands::Migrate {
            command:
                MigrationCommands::ImportSchedules {
                    source,
                    dry_run,
                    json,
                },
        }) => {
            assert_eq!(source, PathBuf::from("/tmp/hermes"));
            assert!(dry_run);
            assert!(json);
        }
        _ => panic!("expected migrate import-schedules subcommand"),
    }
}
#[test]
fn legacy_migration_audit_report_maps_detected_artifacts() -> Result<()> {
let tempdir = TestDir::new("legacy-migration-audit")?;
@@ -7773,8 +8304,32 @@ mod tests {
fn legacy_migration_plan_report_generates_workspace_connector_step() -> Result<()> {
let tempdir = TestDir::new("legacy-migration-plan")?;
let root = tempdir.path();
fs::create_dir_all(root.join("cron"))?;
fs::create_dir_all(root.join("workspace/notes"))?;
fs::write(root.join("config.yaml"), "model: claude\n")?;
fs::write(
root.join("cron/jobs.json"),
serde_json::json!({
"jobs": [
{
"name": "portal-recovery",
"cron": "*/15 * * * *",
"prompt": "Check portal-first recovery flow",
"agent": "codex",
"project": "billing-web",
"task_group": "recovery",
"use_worktree": false
},
{
"name": "paused-job",
"cron": "0 12 * * *",
"prompt": "This one stays paused",
"disabled": true
}
]
})
.to_string(),
)?;
fs::write(root.join("workspace/notes/recovery.md"), "# recovery\n")?;
let audit = build_legacy_migration_audit_report(root)?;
@@ -7794,6 +8349,24 @@ mod tests {
.command_snippets
.contains(&"ecc graph connector-sync hermes_workspace".to_string()));
let scheduler_step = plan
.steps
.iter()
.find(|step| step.category == "scheduler")
.expect("scheduler step");
assert!(scheduler_step
.command_snippets
.iter()
.any(|command| command.contains("ecc schedule add --cron \"*/15 * * * *\"")));
assert!(!scheduler_step
.command_snippets
.iter()
.any(|command| command.contains("<legacy-cron>")));
assert!(scheduler_step
.notes
.iter()
.any(|note| note.contains("disabled")));
let rendered = format_legacy_migration_plan_human(&plan);
assert!(rendered.contains("Legacy migration plan"));
assert!(rendered.contains("Import sanitized workspace memory through ECC2 connectors"));
@@ -7801,6 +8374,122 @@ mod tests {
Ok(())
}
#[test]
// Dry-run classification: one ready job, one disabled job, one job missing
// its cron expression. Verifies per-status counts and the generated
// `ecc schedule add` snippet, and that nothing is actually imported.
fn import_legacy_schedules_dry_run_reports_ready_disabled_and_invalid_jobs() -> Result<()> {
    let tempdir = TestDir::new("legacy-schedule-import-dry-run")?;
    let root = tempdir.path();
    fs::create_dir_all(root.join("cron"))?;
    fs::write(
        root.join("cron/jobs.json"),
        serde_json::json!({
            "jobs": [
                {
                    "name": "portal-recovery",
                    "cron": "*/15 * * * *",
                    "prompt": "Check portal-first recovery flow",
                    "agent": "codex",
                    "project": "billing-web",
                    "task_group": "recovery",
                    "use_worktree": false
                },
                {
                    "name": "paused-job",
                    "cron": "0 12 * * *",
                    "prompt": "This one stays paused",
                    "disabled": true
                },
                {
                    "name": "broken-job",
                    "prompt": "Missing cron"
                }
            ]
        })
        .to_string(),
    )?;
    // Fresh store in its own tempdir; dry-run must leave it untouched.
    let tempdb = TestDir::new("legacy-schedule-import-dry-run-db")?;
    let db = StateStore::open(&tempdb.path().join("state.db"))?;
    let report = import_legacy_schedules(&db, &config::Config::default(), root, true)?;
    assert!(report.dry_run);
    assert_eq!(report.jobs_detected, 3);
    assert_eq!(report.ready_jobs, 1);
    assert_eq!(report.imported_jobs, 0);
    assert_eq!(report.disabled_jobs, 1);
    assert_eq!(report.invalid_jobs, 1);
    assert_eq!(report.skipped_jobs, 0);
    assert_eq!(report.jobs.len(), 3);
    // The ready job's snippet must carry every legacy field through, in the
    // fixed flag order produced by build_legacy_schedule_add_command.
    assert!(report
        .jobs
        .iter()
        .any(|job| job.command_snippet.as_deref() == Some("ecc schedule add --cron \"*/15 * * * *\" --task \"Check portal-first recovery flow\" --agent \"codex\" --no-worktree --project \"billing-web\" --task-group \"recovery\"")));
    Ok(())
}
#[test]
// Live (non-dry-run) import: a single valid legacy job must produce exactly
// one persisted scheduled task whose fields mirror the legacy entry, with the
// working dir taken from the process cwd at import time.
fn import_legacy_schedules_creates_real_ecc2_schedules() -> Result<()> {
    let tempdir = TestDir::new("legacy-schedule-import-live")?;
    let root = tempdir.path();
    fs::create_dir_all(root.join("cron"))?;
    fs::write(
        root.join("cron/jobs.json"),
        serde_json::json!({
            "jobs": [
                {
                    "name": "portal-recovery",
                    "cron": "*/15 * * * *",
                    "prompt": "Check portal-first recovery flow",
                    "agent": "codex",
                    "project": "billing-web",
                    "task_group": "recovery",
                    "use_worktree": false
                }
            ]
        })
        .to_string(),
    )?;
    // A stand-in repo directory to act as the schedule's working dir.
    let target_repo = tempdir.path().join("target");
    fs::create_dir_all(&target_repo)?;
    fs::write(target_repo.join(".gitignore"), "target\n")?;
    let tempdb = TestDir::new("legacy-schedule-import-live-db")?;
    let db = StateStore::open(&tempdb.path().join("state.db"))?;
    // Restore the original cwd even if the test panics mid-way.
    struct CurrentDirGuard(PathBuf);
    impl Drop for CurrentDirGuard {
        fn drop(&mut self) {
            let _ = std::env::set_current_dir(&self.0);
        }
    }
    let _cwd_guard = CurrentDirGuard(std::env::current_dir()?);
    // NOTE(review): set_current_dir is process-global; Rust's default test
    // runner is multi-threaded, so any other test that reads or sets the cwd
    // can race with this one — confirm the harness serializes cwd-touching
    // tests (or uses --test-threads=1).
    std::env::set_current_dir(&target_repo)?;
    let report = import_legacy_schedules(&db, &config::Config::default(), root, false)?;
    assert!(!report.dry_run);
    assert_eq!(report.ready_jobs, 1);
    assert_eq!(report.imported_jobs, 1);
    assert_eq!(
        report.jobs[0].status,
        LegacyScheduleImportJobStatus::Imported
    );
    assert!(report.jobs[0].imported_schedule_id.is_some());
    // Verify the persisted schedule row field-by-field against the fixture.
    let schedules = db.list_scheduled_tasks()?;
    assert_eq!(schedules.len(), 1);
    assert_eq!(schedules[0].task, "Check portal-first recovery flow");
    assert_eq!(schedules[0].agent_type, "codex");
    assert_eq!(schedules[0].project, "billing-web");
    assert_eq!(schedules[0].task_group, "recovery");
    assert!(!schedules[0].use_worktree);
    assert_eq!(
        schedules[0].working_dir.canonicalize()?,
        target_repo.canonicalize()?
    );
    Ok(())
}
#[test]
fn legacy_migration_scaffold_writes_plan_and_config_files() -> Result<()> {
let tempdir = TestDir::new("legacy-migration-scaffold")?;