feat: add ecc2 conflict resolution protocol

This commit is contained in:
Affaan Mustafa
2026-04-09 22:20:35 -07:00
parent ea0fb3c0fc
commit e48468a9e7
4 changed files with 961 additions and 15 deletions

View File

@@ -1,6 +1,6 @@
use anyhow::{Context, Result};
use serde::Serialize;
use std::collections::{BTreeMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt;
use std::path::{Path, PathBuf};
use std::process::Stdio;
@@ -510,6 +510,224 @@ pub fn enforce_budget_hard_limits(
Ok(outcome)
}
#[derive(Debug, Clone, Default, Serialize, PartialEq)]
pub struct ConflictEnforcementOutcome {
pub strategy: crate::config::ConflictResolutionStrategy,
pub created_incidents: usize,
pub resolved_incidents: usize,
pub paused_sessions: Vec<String>,
}
pub fn enforce_conflict_resolution(
db: &StateStore,
cfg: &Config,
) -> Result<ConflictEnforcementOutcome> {
let mut outcome = ConflictEnforcementOutcome {
strategy: cfg.conflict_resolution.strategy,
created_incidents: 0,
resolved_incidents: 0,
paused_sessions: Vec::new(),
};
if !cfg.conflict_resolution.enabled {
return Ok(outcome);
}
let sessions = db.list_sessions()?;
let sessions_by_id: HashMap<_, _> = sessions
.iter()
.cloned()
.map(|session| (session.id.clone(), session))
.collect();
let active_sessions: Vec<_> = sessions
.into_iter()
.filter(|session| {
matches!(
session.state,
SessionState::Pending
| SessionState::Running
| SessionState::Idle
| SessionState::Stale
)
})
.collect();
let mut latest_activity_by_path: BTreeMap<String, Vec<super::FileActivityEntry>> =
BTreeMap::new();
for session in &active_sessions {
let mut seen_paths = HashSet::new();
for entry in db.list_file_activity(&session.id, 64)? {
if seen_paths.insert(entry.path.clone()) {
latest_activity_by_path
.entry(entry.path.clone())
.or_default()
.push(entry);
}
}
}
let mut paused_once = HashSet::new();
for (path, mut entries) in latest_activity_by_path {
entries.retain(|entry| !matches!(entry.action, super::FileActivityAction::Read));
if entries.len() < 2 {
continue;
}
entries.sort_by_key(|entry| (entry.timestamp, entry.session_id.clone()));
let latest = entries.last().cloned().expect("entries is not empty");
for other in entries[..entries.len() - 1].iter() {
let conflict_key = conflict_incident_key(&path, &latest.session_id, &other.session_id);
if db.has_open_conflict_incident(&conflict_key)? {
continue;
}
let (active_session_id, paused_session_id, summary) =
choose_conflict_resolution(&path, &latest, other, cfg.conflict_resolution.strategy);
let (first_session_id, second_session_id, first_action, second_action) =
if latest.session_id <= other.session_id {
(
latest.session_id.clone(),
other.session_id.clone(),
latest.action.clone(),
other.action.clone(),
)
} else {
(
other.session_id.clone(),
latest.session_id.clone(),
other.action.clone(),
latest.action.clone(),
)
};
db.upsert_conflict_incident(
&conflict_key,
&path,
&first_session_id,
&second_session_id,
&active_session_id,
&paused_session_id,
&first_action,
&second_action,
conflict_strategy_label(cfg.conflict_resolution.strategy),
&summary,
)?;
if paused_once.insert(paused_session_id.clone()) {
if let Some(session) = sessions_by_id.get(&paused_session_id) {
if matches!(
session.state,
SessionState::Pending
| SessionState::Running
| SessionState::Idle
| SessionState::Stale
) {
stop_session_recorded(db, session, false)?;
outcome.paused_sessions.push(paused_session_id.clone());
}
}
}
comms::send(
db,
&active_session_id,
&paused_session_id,
&MessageType::Conflict {
file: path.clone(),
description: summary.clone(),
},
)?;
db.insert_decision(
&paused_session_id,
&format!("Pause work due to conflict on {path}"),
&[
format!("Keep {active_session_id} active"),
"Continue concurrently".to_string(),
],
&summary,
)?;
if cfg.conflict_resolution.notify_lead {
if let Some(lead_session_id) = db.latest_task_handoff_source(&paused_session_id)? {
if lead_session_id != paused_session_id && lead_session_id != active_session_id
{
comms::send(
db,
&paused_session_id,
&lead_session_id,
&MessageType::Conflict {
file: path.clone(),
description: format!(
"{} | delegate {} paused",
summary, paused_session_id
),
},
)?;
}
}
}
outcome.created_incidents += 1;
}
}
Ok(outcome)
}
fn conflict_incident_key(path: &str, session_a: &str, session_b: &str) -> String {
let (first, second) = if session_a <= session_b {
(session_a, session_b)
} else {
(session_b, session_a)
};
format!("{path}::{first}::{second}")
}
fn conflict_strategy_label(strategy: crate::config::ConflictResolutionStrategy) -> &'static str {
match strategy {
crate::config::ConflictResolutionStrategy::Escalate => "escalate",
crate::config::ConflictResolutionStrategy::LastWriteWins => "last_write_wins",
crate::config::ConflictResolutionStrategy::Merge => "merge",
}
}
fn choose_conflict_resolution(
path: &str,
latest: &super::FileActivityEntry,
other: &super::FileActivityEntry,
strategy: crate::config::ConflictResolutionStrategy,
) -> (String, String, String) {
match strategy {
crate::config::ConflictResolutionStrategy::Escalate => (
other.session_id.clone(),
latest.session_id.clone(),
format!(
"Escalated overlap on {path}; paused later session {} while {} stays active",
latest.session_id, other.session_id
),
),
crate::config::ConflictResolutionStrategy::LastWriteWins => (
latest.session_id.clone(),
other.session_id.clone(),
format!(
"Applied last-write-wins on {path}; kept later session {} active and paused {}",
latest.session_id, other.session_id
),
),
crate::config::ConflictResolutionStrategy::Merge => (
other.session_id.clone(),
latest.session_id.clone(),
format!(
"Queued manual merge on {path}; paused later session {} until merge review against {}",
latest.session_id, other.session_id
),
),
}
}
pub fn record_tool_call(
db: &StateStore,
session_id: &str,
@@ -2428,6 +2646,7 @@ mod tests {
cost_budget_usd: 10.0,
token_budget: 500_000,
budget_alert_thresholds: Config::BUDGET_ALERT_THRESHOLDS,
conflict_resolution: crate::config::ConflictResolutionConfig::default(),
theme: Theme::Dark,
pane_layout: PaneLayout::Horizontal,
pane_navigation: Default::default(),
@@ -4821,4 +5040,148 @@ mod tests {
Ok(())
}
#[test]
fn enforce_conflict_resolution_pauses_later_session_and_notifies_lead() -> Result<()> {
let tempdir = TestDir::new("manager-conflict-escalate")?;
let cfg = build_config(tempdir.path());
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
db.insert_session(&build_session("lead", SessionState::Running, now))?;
db.insert_session(&build_session(
"session-a",
SessionState::Running,
now - Duration::minutes(2),
))?;
db.insert_session(&build_session(
"session-b",
SessionState::Running,
now - Duration::minutes(1),
))?;
crate::comms::send(
&db,
"lead",
"session-b",
&crate::comms::MessageType::TaskHandoff {
task: "Review src/lib.rs".to_string(),
context: "Lead delegated follow-up".to_string(),
},
)?;
let metrics_dir = tempdir.path().join("metrics");
std::fs::create_dir_all(&metrics_dir)?;
let metrics_path = metrics_dir.join("tool-usage.jsonl");
std::fs::write(
&metrics_path,
concat!(
"{\"id\":\"evt-1\",\"session_id\":\"session-a\",\"tool_name\":\"Edit\",\"input_summary\":\"Edit src/lib.rs\",\"output_summary\":\"updated logic\",\"file_events\":[{\"path\":\"src/lib.rs\",\"action\":\"modify\"}],\"timestamp\":\"2026-04-09T00:02:00Z\"}\n",
"{\"id\":\"evt-2\",\"session_id\":\"session-b\",\"tool_name\":\"Write\",\"input_summary\":\"Write src/lib.rs\",\"output_summary\":\"newer change\",\"file_events\":[{\"path\":\"src/lib.rs\",\"action\":\"modify\"}],\"timestamp\":\"2026-04-09T00:03:00Z\"}\n"
),
)?;
db.sync_tool_activity_metrics(&metrics_path)?;
let outcome = enforce_conflict_resolution(&db, &cfg)?;
assert_eq!(outcome.created_incidents, 1);
assert_eq!(outcome.resolved_incidents, 0);
assert_eq!(outcome.paused_sessions, vec!["session-b".to_string()]);
let session_a = db
.get_session("session-a")?
.expect("session-a should still exist");
let session_b = db
.get_session("session-b")?
.expect("session-b should still exist");
assert_eq!(session_a.state, SessionState::Running);
assert_eq!(session_b.state, SessionState::Stopped);
assert!(db.has_open_conflict_incident("src/lib.rs::session-a::session-b")?);
let decisions = db.list_decisions_for_session("session-b", 10)?;
assert!(decisions
.iter()
.any(|entry| entry.decision == "Pause work due to conflict on src/lib.rs"));
let approval_counts = db.unread_approval_counts()?;
assert_eq!(approval_counts.get("session-b"), Some(&1usize));
assert_eq!(approval_counts.get("lead"), Some(&1usize));
let unread_queue = db.unread_approval_queue(10)?;
assert!(unread_queue.iter().any(|msg| {
msg.to_session == "session-b"
&& msg.msg_type == "conflict"
&& msg.content.contains("src/lib.rs")
}));
assert!(unread_queue.iter().any(|msg| {
msg.to_session == "lead"
&& msg.msg_type == "conflict"
&& msg.content.contains("delegate session-b paused")
}));
let second_pass = enforce_conflict_resolution(&db, &cfg)?;
assert_eq!(second_pass.created_incidents, 0);
assert_eq!(second_pass.paused_sessions, Vec::<String>::new());
assert_eq!(
db.list_open_conflict_incidents_for_session("session-b", 10)?
.len(),
1
);
Ok(())
}
#[test]
fn enforce_conflict_resolution_supports_last_write_wins() -> Result<()> {
let tempdir = TestDir::new("manager-conflict-last-write-wins")?;
let mut cfg = build_config(tempdir.path());
cfg.conflict_resolution.strategy = crate::config::ConflictResolutionStrategy::LastWriteWins;
cfg.conflict_resolution.notify_lead = false;
let db = StateStore::open(&cfg.db_path)?;
let now = Utc::now();
db.insert_session(&build_session(
"session-a",
SessionState::Running,
now - Duration::minutes(2),
))?;
db.insert_session(&build_session(
"session-b",
SessionState::Running,
now - Duration::minutes(1),
))?;
let metrics_dir = tempdir.path().join("metrics");
std::fs::create_dir_all(&metrics_dir)?;
let metrics_path = metrics_dir.join("tool-usage.jsonl");
std::fs::write(
&metrics_path,
concat!(
"{\"id\":\"evt-1\",\"session_id\":\"session-a\",\"tool_name\":\"Edit\",\"input_summary\":\"Edit src/lib.rs\",\"output_summary\":\"older change\",\"file_events\":[{\"path\":\"src/lib.rs\",\"action\":\"modify\"}],\"timestamp\":\"2026-04-09T00:02:00Z\"}\n",
"{\"id\":\"evt-2\",\"session_id\":\"session-b\",\"tool_name\":\"Edit\",\"input_summary\":\"Edit src/lib.rs\",\"output_summary\":\"later change\",\"file_events\":[{\"path\":\"src/lib.rs\",\"action\":\"modify\"}],\"timestamp\":\"2026-04-09T00:03:00Z\"}\n"
),
)?;
db.sync_tool_activity_metrics(&metrics_path)?;
let outcome = enforce_conflict_resolution(&db, &cfg)?;
assert_eq!(outcome.created_incidents, 1);
assert_eq!(outcome.paused_sessions, vec!["session-a".to_string()]);
let session_a = db
.get_session("session-a")?
.expect("session-a should still exist");
let session_b = db
.get_session("session-b")?
.expect("session-b should still exist");
assert_eq!(session_a.state, SessionState::Stopped);
assert_eq!(session_b.state, SessionState::Running);
let incidents = db.list_open_conflict_incidents_for_session("session-a", 10)?;
assert_eq!(incidents.len(), 1);
assert_eq!(incidents[0].active_session_id, "session-b");
assert_eq!(incidents[0].paused_session_id, "session-a");
assert_eq!(incidents[0].strategy, "last_write_wins");
Ok(())
}
}

View File

@@ -39,6 +39,24 @@ pub struct FileActivityOverlap {
pub timestamp: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct ConflictIncident {
pub id: i64,
pub conflict_key: String,
pub path: String,
pub first_session_id: String,
pub second_session_id: String,
pub active_session_id: String,
pub paused_session_id: String,
pub first_action: FileActivityAction,
pub second_action: FileActivityAction,
pub strategy: String,
pub summary: String,
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
pub resolved_at: Option<chrono::DateTime<chrono::Utc>>,
}
#[derive(Debug, Clone, Default, Serialize)]
pub struct DaemonActivity {
pub last_dispatch_at: Option<chrono::DateTime<chrono::Utc>>,
@@ -209,6 +227,23 @@ impl StateStore {
requested_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS conflict_incidents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
conflict_key TEXT NOT NULL UNIQUE,
path TEXT NOT NULL,
first_session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
second_session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
active_session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
paused_session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
first_action TEXT NOT NULL,
second_action TEXT NOT NULL,
strategy TEXT NOT NULL,
summary TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
resolved_at TEXT
);
CREATE TABLE IF NOT EXISTS daemon_activity (
id INTEGER PRIMARY KEY CHECK(id = 1),
last_dispatch_at TEXT,
@@ -240,6 +275,8 @@ impl StateStore {
ON session_output(session_id, id);
CREATE INDEX IF NOT EXISTS idx_decision_log_session
ON decision_log(session_id, timestamp, id);
CREATE INDEX IF NOT EXISTS idx_conflict_incidents_sessions
ON conflict_incidents(first_session_id, second_session_id, resolved_at, updated_at);
CREATE INDEX IF NOT EXISTS idx_pending_worktree_queue_requested_at
ON pending_worktree_queue(requested_at, session_id);
@@ -2038,6 +2075,157 @@ impl StateStore {
overlaps.truncate(limit);
Ok(overlaps)
}
pub fn has_open_conflict_incident(&self, conflict_key: &str) -> Result<bool> {
let exists = self
.conn
.query_row(
"SELECT 1
FROM conflict_incidents
WHERE conflict_key = ?1 AND resolved_at IS NULL
LIMIT 1",
rusqlite::params![conflict_key],
|_| Ok(()),
)
.optional()?
.is_some();
Ok(exists)
}
#[allow(clippy::too_many_arguments)]
pub fn upsert_conflict_incident(
&self,
conflict_key: &str,
path: &str,
first_session_id: &str,
second_session_id: &str,
active_session_id: &str,
paused_session_id: &str,
first_action: &FileActivityAction,
second_action: &FileActivityAction,
strategy: &str,
summary: &str,
) -> Result<ConflictIncident> {
let now = chrono::Utc::now().to_rfc3339();
self.conn.execute(
"INSERT INTO conflict_incidents (
conflict_key, path, first_session_id, second_session_id,
active_session_id, paused_session_id, first_action, second_action,
strategy, summary, created_at, updated_at, resolved_at
)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?11, NULL)
ON CONFLICT(conflict_key) DO UPDATE SET
path = excluded.path,
first_session_id = excluded.first_session_id,
second_session_id = excluded.second_session_id,
active_session_id = excluded.active_session_id,
paused_session_id = excluded.paused_session_id,
first_action = excluded.first_action,
second_action = excluded.second_action,
strategy = excluded.strategy,
summary = excluded.summary,
updated_at = excluded.updated_at,
resolved_at = NULL",
rusqlite::params![
conflict_key,
path,
first_session_id,
second_session_id,
active_session_id,
paused_session_id,
file_activity_action_value(first_action),
file_activity_action_value(second_action),
strategy,
summary,
now,
],
)?;
self.conn
.query_row(
"SELECT id, conflict_key, path, first_session_id, second_session_id,
active_session_id, paused_session_id, first_action, second_action,
strategy, summary, created_at, updated_at, resolved_at
FROM conflict_incidents
WHERE conflict_key = ?1",
rusqlite::params![conflict_key],
map_conflict_incident,
)
.map_err(Into::into)
}
pub fn resolve_conflict_incidents_not_in(
&self,
active_keys: &HashSet<String>,
) -> Result<usize> {
let open = self.list_open_conflict_incidents(512)?;
let now = chrono::Utc::now().to_rfc3339();
let mut resolved = 0;
for incident in open {
if active_keys.contains(&incident.conflict_key) {
continue;
}
resolved += self.conn.execute(
"UPDATE conflict_incidents
SET resolved_at = ?2, updated_at = ?2
WHERE conflict_key = ?1 AND resolved_at IS NULL",
rusqlite::params![incident.conflict_key, now],
)?;
}
Ok(resolved)
}
pub fn list_open_conflict_incidents_for_session(
&self,
session_id: &str,
limit: usize,
) -> Result<Vec<ConflictIncident>> {
let mut stmt = self.conn.prepare(
"SELECT id, conflict_key, path, first_session_id, second_session_id,
active_session_id, paused_session_id, first_action, second_action,
strategy, summary, created_at, updated_at, resolved_at
FROM conflict_incidents
WHERE resolved_at IS NULL
AND (
first_session_id = ?1
OR second_session_id = ?1
OR active_session_id = ?1
OR paused_session_id = ?1
)
ORDER BY updated_at DESC, id DESC
LIMIT ?2",
)?;
let incidents = stmt
.query_map(
rusqlite::params![session_id, limit as i64],
map_conflict_incident,
)?
.collect::<Result<Vec<_>, _>>()
.map_err(anyhow::Error::from)?;
Ok(incidents)
}
fn list_open_conflict_incidents(&self, limit: usize) -> Result<Vec<ConflictIncident>> {
let mut stmt = self.conn.prepare(
"SELECT id, conflict_key, path, first_session_id, second_session_id,
active_session_id, paused_session_id, first_action, second_action,
strategy, summary, created_at, updated_at, resolved_at
FROM conflict_incidents
WHERE resolved_at IS NULL
ORDER BY updated_at DESC, id DESC
LIMIT ?1",
)?;
let incidents = stmt
.query_map(rusqlite::params![limit as i64], map_conflict_incident)?
.collect::<Result<Vec<_>, _>>()
.map_err(anyhow::Error::from)?;
Ok(incidents)
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
@@ -2073,6 +2261,70 @@ fn parse_persisted_file_events(value: &str) -> Option<Vec<PersistedFileEvent>> {
Some(events)
}
fn file_activity_action_value(action: &FileActivityAction) -> &'static str {
match action {
FileActivityAction::Read => "read",
FileActivityAction::Create => "create",
FileActivityAction::Modify => "modify",
FileActivityAction::Move => "move",
FileActivityAction::Delete => "delete",
FileActivityAction::Touch => "touch",
}
}
fn map_conflict_incident(row: &rusqlite::Row<'_>) -> rusqlite::Result<ConflictIncident> {
let created_at = parse_timestamp_column(row.get::<_, String>(11)?, 11)?;
let updated_at = parse_timestamp_column(row.get::<_, String>(12)?, 12)?;
let resolved_at = row
.get::<_, Option<String>>(13)?
.map(|value| parse_timestamp_column(value, 13))
.transpose()?;
Ok(ConflictIncident {
id: row.get(0)?,
conflict_key: row.get(1)?,
path: row.get(2)?,
first_session_id: row.get(3)?,
second_session_id: row.get(4)?,
active_session_id: row.get(5)?,
paused_session_id: row.get(6)?,
first_action: parse_file_activity_action(&row.get::<_, String>(7)?).ok_or_else(|| {
rusqlite::Error::InvalidColumnType(
7,
"first_action".into(),
rusqlite::types::Type::Text,
)
})?,
second_action: parse_file_activity_action(&row.get::<_, String>(8)?).ok_or_else(|| {
rusqlite::Error::InvalidColumnType(
8,
"second_action".into(),
rusqlite::types::Type::Text,
)
})?,
strategy: row.get(9)?,
summary: row.get(10)?,
created_at,
updated_at,
resolved_at,
})
}
fn parse_timestamp_column(
value: String,
index: usize,
) -> rusqlite::Result<chrono::DateTime<chrono::Utc>> {
chrono::DateTime::parse_from_rfc3339(&value)
.map(|value| value.with_timezone(&chrono::Utc))
.map_err(|error| {
rusqlite::Error::FromSqlConversionFailure(
index,
rusqlite::types::Type::Text,
Box::new(error),
)
})
}
fn parse_file_activity_action(value: &str) -> Option<FileActivityAction> {
match value.trim().to_ascii_lowercase().as_str() {
"read" => Some(FileActivityAction::Read),
@@ -2582,6 +2834,56 @@ mod tests {
Ok(())
}
#[test]
fn conflict_incidents_upsert_and_resolve() -> Result<()> {
let tempdir = TestDir::new("store-conflict-incidents")?;
let db = StateStore::open(&tempdir.path().join("state.db"))?;
let now = Utc::now();
for id in ["session-a", "session-b"] {
db.insert_session(&Session {
id: id.to_string(),
task: id.to_string(),
project: "workspace".to_string(),
task_group: "general".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: SessionState::Running,
pid: None,
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics::default(),
})?;
}
let incident = db.upsert_conflict_incident(
"src/lib.rs::session-a::session-b",
"src/lib.rs",
"session-a",
"session-b",
"session-a",
"session-b",
&FileActivityAction::Modify,
&FileActivityAction::Modify,
"escalate",
"Paused session-b after overlapping modify on src/lib.rs",
)?;
assert_eq!(incident.paused_session_id, "session-b");
assert!(db.has_open_conflict_incident("src/lib.rs::session-a::session-b")?);
let listed = db.list_open_conflict_incidents_for_session("session-b", 10)?;
assert_eq!(listed.len(), 1);
assert_eq!(listed[0].path, "src/lib.rs");
let resolved = db.resolve_conflict_incidents_not_in(&HashSet::new())?;
assert_eq!(resolved, 1);
assert!(!db.has_open_conflict_incident("src/lib.rs::session-a::session-b")?);
Ok(())
}
#[test]
fn open_migrates_legacy_tool_log_before_creating_hook_event_index() -> Result<()> {
let tempdir = TestDir::new("store-legacy-hook-event")?;