From e48468a9e79a70c0e80ae85948280af1405f2084 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Thu, 9 Apr 2026 22:20:35 -0700 Subject: [PATCH] feat: add ecc2 conflict resolution protocol --- ecc2/src/config/mod.rs | 69 ++++++- ecc2/src/session/manager.rs | 365 +++++++++++++++++++++++++++++++++++- ecc2/src/session/store.rs | 302 +++++++++++++++++++++++++++++ ecc2/src/tui/dashboard.rs | 240 ++++++++++++++++++++++-- 4 files changed, 961 insertions(+), 15 deletions(-) diff --git a/ecc2/src/config/mod.rs b/ecc2/src/config/mod.rs index 8d8bbe62..ffe9cdbc 100644 --- a/ecc2/src/config/mod.rs +++ b/ecc2/src/config/mod.rs @@ -32,6 +32,22 @@ pub struct BudgetAlertThresholds { pub critical: f64, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ConflictResolutionStrategy { + Escalate, + LastWriteWins, + Merge, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(default)] +pub struct ConflictResolutionConfig { + pub enabled: bool, + pub strategy: ConflictResolutionStrategy, + pub notify_lead: bool, +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct Config { @@ -55,6 +71,7 @@ pub struct Config { pub cost_budget_usd: f64, pub token_budget: u64, pub budget_alert_thresholds: BudgetAlertThresholds, + pub conflict_resolution: ConflictResolutionConfig, pub theme: Theme, pub pane_layout: PaneLayout, pub pane_navigation: PaneNavigationConfig, @@ -115,6 +132,7 @@ impl Default for Config { cost_budget_usd: 10.0, token_budget: 500_000, budget_alert_thresholds: Self::BUDGET_ALERT_THRESHOLDS, + conflict_resolution: ConflictResolutionConfig::default(), theme: Theme::Dark, pane_layout: PaneLayout::Horizontal, pane_navigation: PaneNavigationConfig::default(), @@ -403,6 +421,22 @@ impl Default for BudgetAlertThresholds { } } +impl Default for ConflictResolutionStrategy { + fn default() -> Self { + Self::Escalate + } +} + +impl Default for ConflictResolutionConfig { + fn default() -> Self { + Self { + enabled: true, + strategy: ConflictResolutionStrategy::Escalate, + notify_lead: true, + } + } +} + impl BudgetAlertThresholds { pub fn sanitized(self) -> Self { let values = [self.advisory, self.warning, self.critical]; @@ -422,7 +456,10 @@ impl BudgetAlertThresholds { #[cfg(test)] mod tests { - use super::{BudgetAlertThresholds, Config, PaneLayout}; + use super::{ + BudgetAlertThresholds, Config, ConflictResolutionConfig, ConflictResolutionStrategy, + PaneLayout, + }; use crossterm::event::{KeyCode, KeyEvent, KeyModifiers}; use uuid::Uuid; @@ -466,6 +503,7 @@ theme = "Dark" config.budget_alert_thresholds, defaults.budget_alert_thresholds ); + assert_eq!(config.conflict_resolution, defaults.conflict_resolution); assert_eq!(config.pane_layout, defaults.pane_layout); assert_eq!(config.pane_navigation, defaults.pane_navigation); assert_eq!( @@ -746,6 +784,28 @@ end_hour = 7 assert_eq!(config.desktop_notifications.quiet_hours.end_hour, 7); } + #[test] + fn conflict_resolution_deserializes_from_toml() { + let config: Config = toml::from_str( + r#" +[conflict_resolution] +enabled = true +strategy = "last_write_wins" +notify_lead = false +"#, + ) + .unwrap(); + + assert_eq!( + config.conflict_resolution, + ConflictResolutionConfig { + enabled: true, + strategy: ConflictResolutionStrategy::LastWriteWins, + notify_lead: false, + } + ); + } + #[test] fn completion_summary_notifications_deserialize_from_toml() { let config: Config = toml::from_str( @@ -843,6 +903,8 @@ critical = 1.10 warning: 0.70, critical: 0.88, }; + config.conflict_resolution.strategy = ConflictResolutionStrategy::Merge; + config.conflict_resolution.notify_lead = false; config.pane_navigation.focus_metrics = "e".to_string(); config.pane_navigation.move_right = "d".to_string(); config.linear_pane_size_percent = 42; @@ -879,6 +941,11 @@ critical = 1.10 critical: 0.88, } ); + assert_eq!( + loaded.conflict_resolution.strategy, + ConflictResolutionStrategy::Merge + ); + assert!(!loaded.conflict_resolution.notify_lead); assert_eq!(loaded.pane_navigation.focus_metrics, "e"); assert_eq!(loaded.pane_navigation.move_right, "d"); assert_eq!(loaded.linear_pane_size_percent, 42); diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index ef96d26b..8311a4f2 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -1,6 +1,6 @@ use anyhow::{Context, Result}; use serde::Serialize; -use std::collections::{BTreeMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt; use std::path::{Path, PathBuf}; use std::process::Stdio; @@ -510,6 +510,224 @@ pub fn enforce_budget_hard_limits( Ok(outcome) } +#[derive(Debug, Clone, Default, Serialize, PartialEq)] +pub struct ConflictEnforcementOutcome { + pub strategy: crate::config::ConflictResolutionStrategy, + pub created_incidents: usize, + pub resolved_incidents: usize, + pub paused_sessions: Vec, +} + +pub fn enforce_conflict_resolution( + db: &StateStore, + cfg: &Config, +) -> Result { + let mut outcome = ConflictEnforcementOutcome { + strategy: cfg.conflict_resolution.strategy, + created_incidents: 0, + resolved_incidents: 0, + paused_sessions: Vec::new(), + }; + + if !cfg.conflict_resolution.enabled { + return Ok(outcome); + } + + let sessions = db.list_sessions()?; + let sessions_by_id: HashMap<_, _> = sessions + .iter() + .cloned() + .map(|session| (session.id.clone(), session)) + .collect(); + + let active_sessions: Vec<_> = sessions + .into_iter() + .filter(|session| { + matches!( + session.state, + SessionState::Pending + | SessionState::Running + | SessionState::Idle + | SessionState::Stale + ) + }) + .collect(); + + let mut latest_activity_by_path: BTreeMap> = + BTreeMap::new(); + for session in &active_sessions { + let mut seen_paths = HashSet::new(); + for entry in db.list_file_activity(&session.id, 64)? { + if seen_paths.insert(entry.path.clone()) { + latest_activity_by_path + .entry(entry.path.clone()) + .or_default() + .push(entry); + } + } + } + + let mut paused_once = HashSet::new(); + + for (path, mut entries) in latest_activity_by_path { + entries.retain(|entry| !matches!(entry.action, super::FileActivityAction::Read)); + if entries.len() < 2 { + continue; + } + + entries.sort_by_key(|entry| (entry.timestamp, entry.session_id.clone())); + let latest = entries.last().cloned().expect("entries is not empty"); + for other in entries[..entries.len() - 1].iter() { + let conflict_key = conflict_incident_key(&path, &latest.session_id, &other.session_id); + if db.has_open_conflict_incident(&conflict_key)? { + continue; + } + + let (active_session_id, paused_session_id, summary) = + choose_conflict_resolution(&path, &latest, other, cfg.conflict_resolution.strategy); + let (first_session_id, second_session_id, first_action, second_action) = + if latest.session_id <= other.session_id { + ( + latest.session_id.clone(), + other.session_id.clone(), + latest.action.clone(), + other.action.clone(), + ) + } else { + ( + other.session_id.clone(), + latest.session_id.clone(), + other.action.clone(), + latest.action.clone(), + ) + }; + + db.upsert_conflict_incident( + &conflict_key, + &path, + &first_session_id, + &second_session_id, + &active_session_id, + &paused_session_id, + &first_action, + &second_action, + conflict_strategy_label(cfg.conflict_resolution.strategy), + &summary, + )?; + + if paused_once.insert(paused_session_id.clone()) { + if let Some(session) = sessions_by_id.get(&paused_session_id) { + if matches!( + session.state, + SessionState::Pending + | SessionState::Running + | SessionState::Idle + | SessionState::Stale + ) { + stop_session_recorded(db, session, false)?; + outcome.paused_sessions.push(paused_session_id.clone()); + } + } + } + + comms::send( + db, + &active_session_id, + &paused_session_id, + &MessageType::Conflict { + file: path.clone(), + description: summary.clone(), + }, + )?; + + db.insert_decision( + &paused_session_id, + &format!("Pause work due to conflict on {path}"), + &[ + format!("Keep {active_session_id} active"), + "Continue concurrently".to_string(), + ], + &summary, + )?; + + if cfg.conflict_resolution.notify_lead { + if let Some(lead_session_id) = db.latest_task_handoff_source(&paused_session_id)? { + if lead_session_id != paused_session_id && lead_session_id != active_session_id + { + comms::send( + db, + &paused_session_id, + &lead_session_id, + &MessageType::Conflict { + file: path.clone(), + description: format!( + "{} | delegate {} paused", + summary, paused_session_id + ), + }, + )?; + } + } + } + + outcome.created_incidents += 1; + } + } + + Ok(outcome) +} + +fn conflict_incident_key(path: &str, session_a: &str, session_b: &str) -> String { + let (first, second) = if session_a <= session_b { + (session_a, session_b) + } else { + (session_b, session_a) + }; + format!("{path}::{first}::{second}") +} + +fn conflict_strategy_label(strategy: crate::config::ConflictResolutionStrategy) -> &'static str { + match strategy { + crate::config::ConflictResolutionStrategy::Escalate => "escalate", + crate::config::ConflictResolutionStrategy::LastWriteWins => "last_write_wins", + crate::config::ConflictResolutionStrategy::Merge => "merge", + } +} + +fn choose_conflict_resolution( + path: &str, + latest: &super::FileActivityEntry, + other: &super::FileActivityEntry, + strategy: crate::config::ConflictResolutionStrategy, +) -> (String, String, String) { + match strategy { + crate::config::ConflictResolutionStrategy::Escalate => ( + other.session_id.clone(), + latest.session_id.clone(), + format!( + "Escalated overlap on {path}; paused later session {} while {} stays active", + latest.session_id, other.session_id + ), + ), + crate::config::ConflictResolutionStrategy::LastWriteWins => ( + latest.session_id.clone(), + other.session_id.clone(), + format!( + "Applied last-write-wins on {path}; kept later session {} active and paused {}", + latest.session_id, other.session_id + ), + ), + crate::config::ConflictResolutionStrategy::Merge => ( + other.session_id.clone(), + latest.session_id.clone(), + format!( + "Queued manual merge on {path}; paused later session {} until merge review against {}", + latest.session_id, other.session_id + ), + ), + } +} + pub fn record_tool_call( db: &StateStore, session_id: &str, @@ -2428,6 +2646,7 @@ mod tests { cost_budget_usd: 10.0, token_budget: 500_000, budget_alert_thresholds: Config::BUDGET_ALERT_THRESHOLDS, + conflict_resolution: crate::config::ConflictResolutionConfig::default(), theme: Theme::Dark, pane_layout: PaneLayout::Horizontal, pane_navigation: Default::default(), @@ -4821,4 +5040,148 @@ mod tests { Ok(()) } + + #[test] + fn enforce_conflict_resolution_pauses_later_session_and_notifies_lead() -> Result<()> { + let tempdir = TestDir::new("manager-conflict-escalate")?; + let cfg = build_config(tempdir.path()); + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + db.insert_session(&build_session("lead", SessionState::Running, now))?; + db.insert_session(&build_session( + "session-a", + SessionState::Running, + now - Duration::minutes(2), + ))?; + db.insert_session(&build_session( + "session-b", + SessionState::Running, + now - Duration::minutes(1), + ))?; + + crate::comms::send( + &db, + "lead", + "session-b", + &crate::comms::MessageType::TaskHandoff { + task: "Review src/lib.rs".to_string(), + context: "Lead delegated follow-up".to_string(), + }, + )?; + + let metrics_dir = tempdir.path().join("metrics"); + std::fs::create_dir_all(&metrics_dir)?; + let metrics_path = metrics_dir.join("tool-usage.jsonl"); + std::fs::write( + &metrics_path, + concat!( + "{\"id\":\"evt-1\",\"session_id\":\"session-a\",\"tool_name\":\"Edit\",\"input_summary\":\"Edit src/lib.rs\",\"output_summary\":\"updated logic\",\"file_events\":[{\"path\":\"src/lib.rs\",\"action\":\"modify\"}],\"timestamp\":\"2026-04-09T00:02:00Z\"}\n", + "{\"id\":\"evt-2\",\"session_id\":\"session-b\",\"tool_name\":\"Write\",\"input_summary\":\"Write src/lib.rs\",\"output_summary\":\"newer change\",\"file_events\":[{\"path\":\"src/lib.rs\",\"action\":\"modify\"}],\"timestamp\":\"2026-04-09T00:03:00Z\"}\n" + ), + )?; + db.sync_tool_activity_metrics(&metrics_path)?; + + let outcome = enforce_conflict_resolution(&db, &cfg)?; + assert_eq!(outcome.created_incidents, 1); + assert_eq!(outcome.resolved_incidents, 0); + assert_eq!(outcome.paused_sessions, vec!["session-b".to_string()]); + + let session_a = db + .get_session("session-a")? + .expect("session-a should still exist"); + let session_b = db + .get_session("session-b")? + .expect("session-b should still exist"); + assert_eq!(session_a.state, SessionState::Running); + assert_eq!(session_b.state, SessionState::Stopped); + + assert!(db.has_open_conflict_incident("src/lib.rs::session-a::session-b")?); + + let decisions = db.list_decisions_for_session("session-b", 10)?; + assert!(decisions + .iter() + .any(|entry| entry.decision == "Pause work due to conflict on src/lib.rs")); + + let approval_counts = db.unread_approval_counts()?; + assert_eq!(approval_counts.get("session-b"), Some(&1usize)); + assert_eq!(approval_counts.get("lead"), Some(&1usize)); + + let unread_queue = db.unread_approval_queue(10)?; + assert!(unread_queue.iter().any(|msg| { + msg.to_session == "session-b" + && msg.msg_type == "conflict" + && msg.content.contains("src/lib.rs") + })); + assert!(unread_queue.iter().any(|msg| { + msg.to_session == "lead" + && msg.msg_type == "conflict" + && msg.content.contains("delegate session-b paused") + })); + + let second_pass = enforce_conflict_resolution(&db, &cfg)?; + assert_eq!(second_pass.created_incidents, 0); + assert_eq!(second_pass.paused_sessions, Vec::::new()); + assert_eq!( + db.list_open_conflict_incidents_for_session("session-b", 10)? + .len(), + 1 + ); + + Ok(()) + } + + #[test] + fn enforce_conflict_resolution_supports_last_write_wins() -> Result<()> { + let tempdir = TestDir::new("manager-conflict-last-write-wins")?; + let mut cfg = build_config(tempdir.path()); + cfg.conflict_resolution.strategy = crate::config::ConflictResolutionStrategy::LastWriteWins; + cfg.conflict_resolution.notify_lead = false; + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + db.insert_session(&build_session( + "session-a", + SessionState::Running, + now - Duration::minutes(2), + ))?; + db.insert_session(&build_session( + "session-b", + SessionState::Running, + now - Duration::minutes(1), + ))?; + + let metrics_dir = tempdir.path().join("metrics"); + std::fs::create_dir_all(&metrics_dir)?; + let metrics_path = metrics_dir.join("tool-usage.jsonl"); + std::fs::write( + &metrics_path, + concat!( + "{\"id\":\"evt-1\",\"session_id\":\"session-a\",\"tool_name\":\"Edit\",\"input_summary\":\"Edit src/lib.rs\",\"output_summary\":\"older change\",\"file_events\":[{\"path\":\"src/lib.rs\",\"action\":\"modify\"}],\"timestamp\":\"2026-04-09T00:02:00Z\"}\n", + "{\"id\":\"evt-2\",\"session_id\":\"session-b\",\"tool_name\":\"Edit\",\"input_summary\":\"Edit src/lib.rs\",\"output_summary\":\"later change\",\"file_events\":[{\"path\":\"src/lib.rs\",\"action\":\"modify\"}],\"timestamp\":\"2026-04-09T00:03:00Z\"}\n" + ), + )?; + db.sync_tool_activity_metrics(&metrics_path)?; + + let outcome = enforce_conflict_resolution(&db, &cfg)?; + assert_eq!(outcome.created_incidents, 1); + assert_eq!(outcome.paused_sessions, vec!["session-a".to_string()]); + + let session_a = db + .get_session("session-a")? + .expect("session-a should still exist"); + let session_b = db + .get_session("session-b")? + .expect("session-b should still exist"); + assert_eq!(session_a.state, SessionState::Stopped); + assert_eq!(session_b.state, SessionState::Running); + + let incidents = db.list_open_conflict_incidents_for_session("session-a", 10)?; + assert_eq!(incidents.len(), 1); + assert_eq!(incidents[0].active_session_id, "session-b"); + assert_eq!(incidents[0].paused_session_id, "session-a"); + assert_eq!(incidents[0].strategy, "last_write_wins"); + + Ok(()) + } } diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index af5ff0e2..b7029b57 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -39,6 +39,24 @@ pub struct FileActivityOverlap { pub timestamp: chrono::DateTime, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct ConflictIncident { + pub id: i64, + pub conflict_key: String, + pub path: String, + pub first_session_id: String, + pub second_session_id: String, + pub active_session_id: String, + pub paused_session_id: String, + pub first_action: FileActivityAction, + pub second_action: FileActivityAction, + pub strategy: String, + pub summary: String, + pub created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, + pub resolved_at: Option>, +} + #[derive(Debug, Clone, Default, Serialize)] pub struct DaemonActivity { pub last_dispatch_at: Option>, @@ -209,6 +227,23 @@ impl StateStore { requested_at TEXT NOT NULL ); + CREATE TABLE IF NOT EXISTS conflict_incidents ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + conflict_key TEXT NOT NULL UNIQUE, + path TEXT NOT NULL, + first_session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE, + second_session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE, + active_session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE, + paused_session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE, + first_action TEXT NOT NULL, + second_action TEXT NOT NULL, + strategy TEXT NOT NULL, + summary TEXT NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + resolved_at TEXT + ); + CREATE TABLE IF NOT EXISTS daemon_activity ( id INTEGER PRIMARY KEY CHECK(id = 1), last_dispatch_at TEXT, @@ -240,6 +275,8 @@ impl StateStore { ON session_output(session_id, id); CREATE INDEX IF NOT EXISTS idx_decision_log_session ON decision_log(session_id, timestamp, id); + CREATE INDEX IF NOT EXISTS idx_conflict_incidents_sessions + ON conflict_incidents(first_session_id, second_session_id, resolved_at, updated_at); CREATE INDEX IF NOT EXISTS idx_pending_worktree_queue_requested_at ON pending_worktree_queue(requested_at, session_id); @@ -2038,6 +2075,157 @@ impl StateStore { overlaps.truncate(limit); Ok(overlaps) } + + pub fn has_open_conflict_incident(&self, conflict_key: &str) -> Result { + let exists = self + .conn + .query_row( + "SELECT 1 + FROM conflict_incidents + WHERE conflict_key = ?1 AND resolved_at IS NULL + LIMIT 1", + rusqlite::params![conflict_key], + |_| Ok(()), + ) + .optional()? + .is_some(); + Ok(exists) + } + + #[allow(clippy::too_many_arguments)] + pub fn upsert_conflict_incident( + &self, + conflict_key: &str, + path: &str, + first_session_id: &str, + second_session_id: &str, + active_session_id: &str, + paused_session_id: &str, + first_action: &FileActivityAction, + second_action: &FileActivityAction, + strategy: &str, + summary: &str, + ) -> Result { + let now = chrono::Utc::now().to_rfc3339(); + self.conn.execute( + "INSERT INTO conflict_incidents ( + conflict_key, path, first_session_id, second_session_id, + active_session_id, paused_session_id, first_action, second_action, + strategy, summary, created_at, updated_at, resolved_at + ) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?11, NULL) + ON CONFLICT(conflict_key) DO UPDATE SET + path = excluded.path, + first_session_id = excluded.first_session_id, + second_session_id = excluded.second_session_id, + active_session_id = excluded.active_session_id, + paused_session_id = excluded.paused_session_id, + first_action = excluded.first_action, + second_action = excluded.second_action, + strategy = excluded.strategy, + summary = excluded.summary, + updated_at = excluded.updated_at, + resolved_at = NULL", + rusqlite::params![ + conflict_key, + path, + first_session_id, + second_session_id, + active_session_id, + paused_session_id, + file_activity_action_value(first_action), + file_activity_action_value(second_action), + strategy, + summary, + now, + ], + )?; + + self.conn + .query_row( + "SELECT id, conflict_key, path, first_session_id, second_session_id, + active_session_id, paused_session_id, first_action, second_action, + strategy, summary, created_at, updated_at, resolved_at + FROM conflict_incidents + WHERE conflict_key = ?1", + rusqlite::params![conflict_key], + map_conflict_incident, + ) + .map_err(Into::into) + } + + pub fn resolve_conflict_incidents_not_in( + &self, + active_keys: &HashSet, + ) -> Result { + let open = self.list_open_conflict_incidents(512)?; + let now = chrono::Utc::now().to_rfc3339(); + let mut resolved = 0; + + for incident in open { + if active_keys.contains(&incident.conflict_key) { + continue; + } + + resolved += self.conn.execute( + "UPDATE conflict_incidents + SET resolved_at = ?2, updated_at = ?2 + WHERE conflict_key = ?1 AND resolved_at IS NULL", + rusqlite::params![incident.conflict_key, now], + )?; + } + + Ok(resolved) + } + + pub fn list_open_conflict_incidents_for_session( + &self, + session_id: &str, + limit: usize, + ) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT id, conflict_key, path, first_session_id, second_session_id, + active_session_id, paused_session_id, first_action, second_action, + strategy, summary, created_at, updated_at, resolved_at + FROM conflict_incidents + WHERE resolved_at IS NULL + AND ( + first_session_id = ?1 + OR second_session_id = ?1 + OR active_session_id = ?1 + OR paused_session_id = ?1 + ) + ORDER BY updated_at DESC, id DESC + LIMIT ?2", + )?; + + let incidents = stmt + .query_map( + rusqlite::params![session_id, limit as i64], + map_conflict_incident, + )? + .collect::, _>>() + .map_err(anyhow::Error::from)?; + Ok(incidents) + } + + fn list_open_conflict_incidents(&self, limit: usize) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT id, conflict_key, path, first_session_id, second_session_id, + active_session_id, paused_session_id, first_action, second_action, + strategy, summary, created_at, updated_at, resolved_at + FROM conflict_incidents + WHERE resolved_at IS NULL + ORDER BY updated_at DESC, id DESC + LIMIT ?1", + )?; + + let incidents = stmt + .query_map(rusqlite::params![limit as i64], map_conflict_incident)? + .collect::, _>>() + .map_err(anyhow::Error::from)?; + Ok(incidents) + } } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -2073,6 +2261,70 @@ fn parse_persisted_file_events(value: &str) -> Option> { Some(events) } +fn file_activity_action_value(action: &FileActivityAction) -> &'static str { + match action { + FileActivityAction::Read => "read", + FileActivityAction::Create => "create", + FileActivityAction::Modify => "modify", + FileActivityAction::Move => "move", + FileActivityAction::Delete => "delete", + FileActivityAction::Touch => "touch", + } +} + +fn map_conflict_incident(row: &rusqlite::Row<'_>) -> rusqlite::Result { + let created_at = parse_timestamp_column(row.get::<_, String>(11)?, 11)?; + let updated_at = parse_timestamp_column(row.get::<_, String>(12)?, 12)?; + let resolved_at = row + .get::<_, Option>(13)? + .map(|value| parse_timestamp_column(value, 13)) + .transpose()?; + + Ok(ConflictIncident { + id: row.get(0)?, + conflict_key: row.get(1)?, + path: row.get(2)?, + first_session_id: row.get(3)?, + second_session_id: row.get(4)?, + active_session_id: row.get(5)?, + paused_session_id: row.get(6)?, + first_action: parse_file_activity_action(&row.get::<_, String>(7)?).ok_or_else(|| { + rusqlite::Error::InvalidColumnType( + 7, + "first_action".into(), + rusqlite::types::Type::Text, + ) + })?, + second_action: parse_file_activity_action(&row.get::<_, String>(8)?).ok_or_else(|| { + rusqlite::Error::InvalidColumnType( + 8, + "second_action".into(), + rusqlite::types::Type::Text, + ) + })?, + strategy: row.get(9)?, + summary: row.get(10)?, + created_at, + updated_at, + resolved_at, + }) +} + +fn parse_timestamp_column( + value: String, + index: usize, +) -> rusqlite::Result> { + chrono::DateTime::parse_from_rfc3339(&value) + .map(|value| value.with_timezone(&chrono::Utc)) + .map_err(|error| { + rusqlite::Error::FromSqlConversionFailure( + index, + rusqlite::types::Type::Text, + Box::new(error), + ) + }) +} + fn parse_file_activity_action(value: &str) -> Option { match value.trim().to_ascii_lowercase().as_str() { "read" => Some(FileActivityAction::Read), @@ -2582,6 +2834,56 @@ mod tests { Ok(()) } + #[test] + fn conflict_incidents_upsert_and_resolve() -> Result<()> { + let tempdir = TestDir::new("store-conflict-incidents")?; + let db = StateStore::open(&tempdir.path().join("state.db"))?; + let now = Utc::now(); + + for id in ["session-a", "session-b"] { + db.insert_session(&Session { + id: id.to_string(), + task: id.to_string(), + project: "workspace".to_string(), + task_group: "general".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: SessionState::Running, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + last_heartbeat_at: now, + metrics: SessionMetrics::default(), + })?; + } + + let incident = db.upsert_conflict_incident( + "src/lib.rs::session-a::session-b", + "src/lib.rs", + "session-a", + "session-b", + "session-a", + "session-b", + &FileActivityAction::Modify, + &FileActivityAction::Modify, + "escalate", + "Paused session-b after overlapping modify on src/lib.rs", + )?; + assert_eq!(incident.paused_session_id, "session-b"); + assert!(db.has_open_conflict_incident("src/lib.rs::session-a::session-b")?); + + let listed = db.list_open_conflict_incidents_for_session("session-b", 10)?; + assert_eq!(listed.len(), 1); + assert_eq!(listed[0].path, "src/lib.rs"); + + let resolved = db.resolve_conflict_incidents_not_in(&HashSet::new())?; + assert_eq!(resolved, 1); + assert!(!db.has_open_conflict_incident("src/lib.rs::session-a::session-b")?); + + Ok(()) + } + #[test] fn open_migrates_legacy_tool_log_before_creating_hook_event_index() -> Result<()> { let tempdir = TestDir::new("store-legacy-hook-event")?; diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index d67869d9..68603fb9 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -3707,6 +3707,7 @@ impl Dashboard { ) -> ( Option, Option, + Option, ) { if let Err(error) = self.db.refresh_session_durations() { tracing::warn!("Failed to refresh session durations: {error}"); @@ -3750,11 +3751,24 @@ impl Dashboard { } }; - (heartbeat_enforcement, budget_enforcement) + let conflict_enforcement = match manager::enforce_conflict_resolution(&self.db, &self.cfg) { + Ok(outcome) => Some(outcome), + Err(error) => { + tracing::warn!("Failed to enforce conflict resolution: {error}"); + None + } + }; + + ( + heartbeat_enforcement, + budget_enforcement, + conflict_enforcement, + ) } fn sync_from_store(&mut self) { - let (heartbeat_enforcement, budget_enforcement) = self.sync_runtime_metrics(); + let (heartbeat_enforcement, budget_enforcement, conflict_enforcement) = + self.sync_runtime_metrics(); let selected_id = self.selected_session_id().map(ToOwned::to_owned); self.sessions = match self.db.list_sessions() { Ok(mut sessions) => { @@ -3796,6 +3810,10 @@ impl Dashboard { { self.set_operator_note(budget_auto_pause_note(&outcome)); } + if let Some(outcome) = conflict_enforcement.filter(|outcome| outcome.created_incidents > 0) + { + self.set_operator_note(conflict_enforcement_note(&outcome)); + } if let Some(outcome) = heartbeat_enforcement.filter(|outcome| { !outcome.stale_sessions.is_empty() || !outcome.auto_terminated_sessions.is_empty() }) { @@ -4307,12 +4325,20 @@ impl Dashboard { } self.selected_merge_readiness = worktree.and_then(|worktree| worktree::merge_readiness(worktree).ok()); - self.selected_conflict_protocol = session - .zip(worktree) - .zip(self.selected_merge_readiness.as_ref()) - .and_then(|((session, worktree), merge_readiness)| { - build_conflict_protocol(&session.id, worktree, merge_readiness) - }); + self.selected_conflict_protocol = session.and_then(|selected_session| { + worktree + .zip(self.selected_merge_readiness.as_ref()) + .and_then(|(worktree, merge_readiness)| { + build_conflict_protocol(&selected_session.id, worktree, merge_readiness) + }) + .or_else(|| { + let incidents = self + .db + .list_open_conflict_incidents_for_session(&selected_session.id, 5) + .unwrap_or_default(); + build_session_conflict_protocol(&selected_session.id, &incidents) + }) + }); if self.output_mode == OutputMode::WorktreeDiff && self.selected_diff_patch.is_none() { self.output_mode = OutputMode::SessionOutput; } @@ -5678,6 +5704,22 @@ impl Dashboard { )); } } + let conflict_incidents = self + .db + .list_open_conflict_incidents_for_session(&session.id, 3) + .unwrap_or_default(); + if !conflict_incidents.is_empty() { + lines.push("Active conflicts".to_string()); + for incident in conflict_incidents { + lines.push(format!( + "- {}", + conflict_incident_summary( + &incident, + &self.short_timestamp(&incident.updated_at.to_rfc3339()) + ) + )); + } + } lines.push(format!( "Cost ${:.4} | Duration {}s", metrics.cost_usd, metrics.duration_secs @@ -7386,6 +7428,20 @@ fn file_overlap_summary(entry: &FileActivityOverlap, timestamp: &str) -> String ) } +fn conflict_incident_summary( + incident: &crate::session::store::ConflictIncident, + timestamp: &str, +) -> String { + format!( + "{} {} | active {} | paused {} | {}", + timestamp, + truncate_for_dashboard(&incident.path, 48), + format_session_id(&incident.active_session_id), + format_session_id(&incident.paused_session_id), + incident.strategy.replace('_', "-") + ) +} + fn decision_log_summary(entry: &DecisionLogEntry) -> String { format!("decided {}", truncate_for_dashboard(&entry.decision, 72)) } @@ -7835,6 +7891,21 @@ fn budget_auto_pause_note(outcome: &manager::BudgetEnforcementOutcome) -> String ) } +fn conflict_enforcement_note(outcome: &manager::ConflictEnforcementOutcome) -> String { + let strategy = match outcome.strategy { + crate::config::ConflictResolutionStrategy::Escalate => "escalation", + crate::config::ConflictResolutionStrategy::LastWriteWins => "last-write-wins", + crate::config::ConflictResolutionStrategy::Merge => "merge review", + }; + + format!( + "file conflict detected | opened {} incident(s), auto-paused {} session(s) via {}", + outcome.created_incidents, + outcome.paused_sessions.len(), + strategy + ) +} + fn format_session_id(id: &str) -> String { id.chars().take(8).collect() } @@ -7882,6 +7953,44 @@ fn build_conflict_protocol( Some(lines.join("\n")) } +fn build_session_conflict_protocol( + session_id: &str, + incidents: &[crate::session::store::ConflictIncident], +) -> Option { + if incidents.is_empty() { + return None; + } + + let mut lines = vec![ + format!("Conflict protocol for {}", format_session_id(session_id)), + "Session overlap incidents".to_string(), + ]; + + for incident in incidents { + lines.push(format!( + "- {}", + conflict_incident_summary( + incident, + &incident.updated_at.format("%H:%M:%S").to_string() + ) + )); + lines.push(format!(" {}", incident.summary)); + } + + lines.push("Resolution steps".to_string()); + lines.push("1. Inspect the affected session output and recent file activity".to_string()); + lines.push( + "2. Decide whether to keep the active session, reassign, or merge changes manually" + .to_string(), + ); + lines.push(format!( + "3. Resume the paused session only after reviewing the overlap: ecc resume {}", + session_id + )); + + Some(lines.join("\n")) +} + fn assignment_action_label(action: manager::AssignmentAction) -> &'static str { match action { manager::AssignmentAction::Spawned => "spawned", @@ -9019,7 +9128,7 @@ diff --git a/src/lib.rs b/src/lib.rs\n\ } #[test] - fn metrics_text_surfaces_file_activity_overlaps() -> Result<()> { + fn metrics_text_surfaces_file_activity_conflicts() -> Result<()> { let root = std::env::temp_dir().join(format!("ecc2-file-overlaps-{}", Uuid::new_v4())); fs::create_dir_all(&root)?; let now = Utc::now(); @@ -9061,10 +9170,17 @@ diff --git a/src/lib.rs b/src/lib.rs\n\ dashboard.sync_from_store(); let metrics_text = dashboard.selected_session_metrics_text(); - assert!(metrics_text.contains("Potential overlaps")); - assert!(metrics_text.contains("modify src/lib.rs")); - assert!(metrics_text.contains("idle delegate")); - assert!(metrics_text.contains("as modify")); + assert!(metrics_text.contains("Active conflicts")); + assert!(metrics_text.contains("src/lib.rs")); + assert!(metrics_text.contains("escalate")); + assert_eq!( + dashboard + .db + .get_session("delegate-87654321")? + .expect("delegate should exist") + .state, + SessionState::Stopped + ); let _ = fs::remove_dir_all(root); Ok(()) @@ -10715,6 +10831,103 @@ diff --git a/src/lib.rs b/src/lib.rs ); } + #[test] + fn refresh_enforces_conflicts_and_surfaces_active_incidents() -> Result<()> { + let tempdir = + std::env::temp_dir().join(format!("dashboard-conflict-refresh-{}", Uuid::new_v4())); + fs::create_dir_all(&tempdir)?; + let mut cfg = build_config(&tempdir); + cfg.session_timeout_secs = 3600; + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "session-a".to_string(), + task: "keep active".to_string(), + project: "workspace".to_string(), + task_group: "general".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: SessionState::Running, + pid: Some(1111), + worktree: None, + created_at: now - Duration::minutes(2), + updated_at: now - Duration::minutes(2), + last_heartbeat_at: now - Duration::minutes(2), + metrics: SessionMetrics::default(), + })?; + db.insert_session(&Session { + id: "session-b".to_string(), + task: "later overlap".to_string(), + project: "workspace".to_string(), + task_group: "general".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: SessionState::Running, + pid: Some(2222), + worktree: None, + created_at: now - Duration::minutes(1), + updated_at: now - Duration::minutes(1), + last_heartbeat_at: now - Duration::minutes(1), + metrics: SessionMetrics::default(), + })?; + + fs::create_dir_all( + cfg.tool_activity_metrics_path() + .parent() + .expect("metrics dir"), + )?; + fs::write( + cfg.tool_activity_metrics_path(), + concat!( + "{\"id\":\"evt-1\",\"session_id\":\"session-a\",\"tool_name\":\"Edit\",\"input_summary\":\"Edit src/lib.rs\",\"output_summary\":\"older change\",\"file_events\":[{\"path\":\"src/lib.rs\",\"action\":\"modify\"}],\"timestamp\":\"2026-04-09T00:02:00Z\"}\n", + "{\"id\":\"evt-2\",\"session_id\":\"session-b\",\"tool_name\":\"Write\",\"input_summary\":\"Write src/lib.rs\",\"output_summary\":\"later change\",\"file_events\":[{\"path\":\"src/lib.rs\",\"action\":\"modify\"}],\"timestamp\":\"2026-04-09T00:03:00Z\"}\n" + ), + )?; + + let mut dashboard = Dashboard::new(db, cfg); + dashboard.refresh(); + dashboard.sync_selection_by_id(Some("session-b")); + dashboard.sync_selected_diff(); + + assert_eq!( + dashboard.operator_note.as_deref(), + Some("file conflict detected | opened 1 incident(s), auto-paused 1 session(s) via escalation") + ); + assert_eq!( + dashboard + .db + .get_session("session-b")? + .expect("session-b should exist") + .state, + SessionState::Stopped + ); + + let metrics_text = dashboard.selected_session_metrics_text(); + assert!(metrics_text.contains("Active conflicts")); + assert!(metrics_text.contains("src/lib.rs")); + assert!(metrics_text.contains("escalate")); + + let conflict_protocol = dashboard + .selected_conflict_protocol + .clone() + .expect("conflict protocol should be present"); + assert!(conflict_protocol.contains("Session overlap incidents")); + assert!(conflict_protocol.contains("ecc resume session-b")); + + dashboard.refresh(); + assert_eq!( + dashboard + .db + .list_open_conflict_incidents_for_session("session-b", 10)? + .len(), + 1 + ); + + let _ = fs::remove_dir_all(tempdir); + Ok(()) + } + #[test] fn new_session_task_uses_selected_session_context() { let dashboard = test_dashboard( @@ -12809,6 +13022,7 @@ diff --git a/src/lib.rs b/src/lib.rs cost_budget_usd: 10.0, token_budget: 500_000, budget_alert_thresholds: crate::config::Config::BUDGET_ALERT_THRESHOLDS, + conflict_resolution: crate::config::ConflictResolutionConfig::default(), theme: Theme::Dark, pane_layout: PaneLayout::Horizontal, pane_navigation: Default::default(),