From 27b8272fadb3bd9650e08cf312c96aa37657335a Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Tue, 7 Apr 2026 12:13:47 -0700 Subject: [PATCH] feat: add ecc2 session messaging primitives --- ecc2/src/comms/mod.rs | 58 ++++++++++++- ecc2/src/main.rs | 166 +++++++++++++++++++++++++++++++++++++ ecc2/src/session/mod.rs | 11 +++ ecc2/src/session/store.rs | 99 +++++++++++++++++++++- ecc2/src/tui/dashboard.rs | 170 ++++++++++++++++++++++++++++++++------ 5 files changed, 472 insertions(+), 32 deletions(-) diff --git a/ecc2/src/comms/mod.rs b/ecc2/src/comms/mod.rs index 8be89f2b..24dffa11 100644 --- a/ecc2/src/comms/mod.rs +++ b/ecc2/src/comms/mod.rs @@ -24,13 +24,63 @@ pub enum MessageType { /// Send a structured message between sessions. pub fn send(db: &StateStore, from: &str, to: &str, msg: &MessageType) -> Result<()> { let content = serde_json::to_string(msg)?; - let msg_type = match msg { + let msg_type = message_type_name(msg); + db.send_message(from, to, &content, msg_type)?; + Ok(()) +} + +pub fn message_type_name(msg: &MessageType) -> &'static str { + match msg { MessageType::TaskHandoff { .. } => "task_handoff", MessageType::Query { .. } => "query", MessageType::Response { .. } => "response", MessageType::Completed { .. } => "completed", MessageType::Conflict { .. } => "conflict", - }; - db.send_message(from, to, &content, msg_type)?; - Ok(()) + } +} + +pub fn parse(content: &str) -> Option { + serde_json::from_str(content).ok() +} + +pub fn preview(msg_type: &str, content: &str) -> String { + match parse(content) { + Some(MessageType::TaskHandoff { task, .. }) => { + format!("handoff {}", truncate(&task, 56)) + } + Some(MessageType::Query { question }) => { + format!("query {}", truncate(&question, 56)) + } + Some(MessageType::Response { answer }) => { + format!("response {}", truncate(&answer, 56)) + } + Some(MessageType::Completed { + summary, + files_changed, + }) => { + if files_changed.is_empty() { + format!("completed {}", truncate(&summary, 48)) + } else { + format!( + "completed {} | {} files", + truncate(&summary, 40), + files_changed.len() + ) + } + } + Some(MessageType::Conflict { file, description }) => { + format!("conflict {} | {}", file, truncate(&description, 40)) + } + None => format!("{} {}", msg_type.replace('_', " "), truncate(content, 56)), + } +} + +fn truncate(value: &str, max_chars: usize) -> String { + let trimmed = value.trim(); + if trimmed.chars().count() <= max_chars { + return trimmed.to_string(); + } + + let truncated: String = trimmed.chars().take(max_chars.saturating_sub(1)).collect(); + format!("{truncated}…") } diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index b4c4a4ca..89e57eeb 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -50,6 +50,11 @@ enum Commands { /// Session ID or alias session_id: String, }, + /// Send or inspect inter-session messages + Messages { + #[command(subcommand)] + command: MessageCommands, + }, /// Run as background daemon Daemon, #[command(hide = true)] @@ -65,6 +70,40 @@ enum Commands { }, } +#[derive(clap::Subcommand, Debug)] +enum MessageCommands { + /// Send a structured message between sessions + Send { + #[arg(long)] + from: String, + #[arg(long)] + to: String, + #[arg(long, value_enum)] + kind: MessageKindArg, + #[arg(long)] + text: String, + #[arg(long)] + context: Option, + #[arg(long)] + file: Vec, + }, + /// Show recent messages for a session + Inbox { + session_id: String, + #[arg(long, default_value_t = 10)] + limit: usize, + }, +} + +#[derive(clap::ValueEnum, Clone, Debug)] +enum MessageKindArg { + Handoff, + Query, + Response, + Completed, + Conflict, +} + #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt() @@ -108,6 +147,49 @@ async fn main() -> Result<()> { let resumed_id = session::manager::resume_session(&db, &cfg, &session_id).await?; println!("Session resumed: {resumed_id}"); } + Some(Commands::Messages { command }) => match command { + MessageCommands::Send { + from, + to, + kind, + text, + context, + file, + } => { + let from = resolve_session_id(&db, &from)?; + let to = resolve_session_id(&db, &to)?; + let message = build_message(kind, text, context, file)?; + comms::send(&db, &from, &to, &message)?; + println!("Message sent: {} -> {}", short_session(&from), short_session(&to)); + } + MessageCommands::Inbox { session_id, limit } => { + let session_id = resolve_session_id(&db, &session_id)?; + let messages = db.list_messages_for_session(&session_id, limit)?; + let unread_before = db + .unread_message_counts()? + .get(&session_id) + .copied() + .unwrap_or(0); + if unread_before > 0 { + let _ = db.mark_messages_read(&session_id)?; + } + + if messages.is_empty() { + println!("No messages for {}", short_session(&session_id)); + } else { + println!("Messages for {}", short_session(&session_id)); + for message in messages { + println!( + "{} {} -> {} | {}", + message.timestamp.format("%H:%M:%S"), + short_session(&message.from_session), + short_session(&message.to_session), + comms::preview(&message.msg_type, &message.content) + ); + } + } + } + }, Some(Commands::Daemon) => { println!("Starting ECC daemon..."); session::daemon::run(db, cfg).await?; @@ -125,6 +207,53 @@ async fn main() -> Result<()> { Ok(()) } +fn resolve_session_id(db: &session::store::StateStore, value: &str) -> Result { + if value == "latest" { + return db + .get_latest_session()? + .map(|session| session.id) + .ok_or_else(|| anyhow::anyhow!("No sessions found")); + } + + db.get_session(value)? + .map(|session| session.id) + .ok_or_else(|| anyhow::anyhow!("Session not found: {value}")) +} + +fn build_message( + kind: MessageKindArg, + text: String, + context: Option, + files: Vec, +) -> Result { + Ok(match kind { + MessageKindArg::Handoff => comms::MessageType::TaskHandoff { + task: text, + context: context.unwrap_or_default(), + }, + MessageKindArg::Query => comms::MessageType::Query { question: text }, + MessageKindArg::Response => comms::MessageType::Response { answer: text }, + MessageKindArg::Completed => comms::MessageType::Completed { + summary: text, + files_changed: files, + }, + MessageKindArg::Conflict => { + let file = files + .first() + .cloned() + .ok_or_else(|| anyhow::anyhow!("Conflict messages require at least one --file"))?; + comms::MessageType::Conflict { + file, + description: context.unwrap_or(text), + } + } + }) +} + +fn short_session(session_id: &str) -> String { + session_id.chars().take(8).collect() +} + #[cfg(test)] mod tests { use super::*; @@ -139,4 +268,41 @@ mod tests { _ => panic!("expected resume subcommand"), } } + + #[test] + fn cli_parses_messages_send_command() { + let cli = Cli::try_parse_from([ + "ecc", + "messages", + "send", + "--from", + "planner", + "--to", + "worker", + "--kind", + "query", + "--text", + "Need context", + ]) + .expect("messages send should parse"); + + match cli.command { + Some(Commands::Messages { + command: + MessageCommands::Send { + from, + to, + kind, + text, + .. + }, + }) => { + assert_eq!(from, "planner"); + assert_eq!(to, "worker"); + assert!(matches!(kind, MessageKindArg::Query)); + assert_eq!(text, "Need context"); + } + _ => panic!("expected messages send subcommand"), + } + } } diff --git a/ecc2/src/session/mod.rs b/ecc2/src/session/mod.rs index c12943b8..8ee2668e 100644 --- a/ecc2/src/session/mod.rs +++ b/ecc2/src/session/mod.rs @@ -101,3 +101,14 @@ pub struct SessionMetrics { pub duration_secs: u64, pub cost_usd: f64, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SessionMessage { + pub id: i64, + pub from_session: String, + pub to_session: String, + pub content: String, + pub msg_type: String, + pub read: bool, + pub timestamp: DateTime, +} diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index 194d665a..683726a0 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -1,12 +1,13 @@ use anyhow::{Context, Result}; use rusqlite::{Connection, OptionalExtension}; +use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::time::Duration; use crate::observability::{ToolLogEntry, ToolLogPage}; use super::output::{OutputLine, OutputStream, OUTPUT_BUFFER_LIMIT}; -use super::{Session, SessionMetrics, SessionState}; +use super::{Session, SessionMessage, SessionMetrics, SessionState}; pub struct StateStore { conn: Connection, @@ -349,6 +350,67 @@ impl StateStore { Ok(()) } + pub fn list_messages_for_session( + &self, + session_id: &str, + limit: usize, + ) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT id, from_session, to_session, content, msg_type, read, timestamp + FROM messages + WHERE from_session = ?1 OR to_session = ?1 + ORDER BY id DESC + LIMIT ?2", + )?; + + let mut messages = stmt + .query_map(rusqlite::params![session_id, limit as i64], |row| { + let timestamp: String = row.get(6)?; + + Ok(SessionMessage { + id: row.get(0)?, + from_session: row.get(1)?, + to_session: row.get(2)?, + content: row.get(3)?, + msg_type: row.get(4)?, + read: row.get::<_, i64>(5)? != 0, + timestamp: chrono::DateTime::parse_from_rfc3339(×tamp) + .unwrap_or_default() + .with_timezone(&chrono::Utc), + }) + })? + .collect::, _>>()?; + + messages.reverse(); + Ok(messages) + } + + pub fn unread_message_counts(&self) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT to_session, COUNT(*) + FROM messages + WHERE read = 0 + GROUP BY to_session", + )?; + + let counts = stmt + .query_map([], |row| { + Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)? as usize)) + })? + .collect::, _>>()?; + + Ok(counts) + } + + pub fn mark_messages_read(&self, session_id: &str) -> Result { + let updated = self.conn.execute( + "UPDATE messages SET read = 1 WHERE to_session = ?1 AND read = 0", + rusqlite::params![session_id], + )?; + + Ok(updated) + } + pub fn append_output_line( &self, session_id: &str, @@ -630,4 +692,39 @@ mod tests { Ok(()) } + + #[test] + fn message_round_trip_tracks_unread_counts_and_read_state() -> Result<()> { + let tempdir = TestDir::new("store-messages")?; + let db = StateStore::open(&tempdir.path().join("state.db"))?; + + db.insert_session(&build_session("planner", SessionState::Running))?; + db.insert_session(&build_session("worker", SessionState::Pending))?; + + db.send_message("planner", "worker", "{\"question\":\"Need context\"}", "query")?; + db.send_message( + "worker", + "planner", + "{\"summary\":\"Finished pass\",\"files_changed\":[\"src/app.rs\"]}", + "completed", + )?; + + let unread = db.unread_message_counts()?; + assert_eq!(unread.get("worker"), Some(&1)); + assert_eq!(unread.get("planner"), Some(&1)); + + let worker_messages = db.list_messages_for_session("worker", 10)?; + assert_eq!(worker_messages.len(), 2); + assert_eq!(worker_messages[0].msg_type, "query"); + assert_eq!(worker_messages[1].msg_type, "completed"); + + let updated = db.mark_messages_read("worker")?; + assert_eq!(updated, 1); + + let unread_after = db.unread_message_counts()?; + assert_eq!(unread_after.get("worker"), None); + assert_eq!(unread_after.get("planner"), Some(&1)); + + Ok(()) + } } diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index 312a13d6..6afcea4b 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -10,12 +10,13 @@ use ratatui::{ use tokio::sync::broadcast; use super::widgets::{budget_state, format_currency, format_token_count, BudgetState, TokenMeter}; +use crate::comms; use crate::config::{Config, PaneLayout}; use crate::observability::ToolLogEntry; use crate::session::output::{OutputEvent, OutputLine, SessionOutputStore, OutputStream, OUTPUT_BUFFER_LIMIT}; -use crate::session::store::StateStore; use crate::session::manager; -use crate::session::{Session, SessionMetrics, SessionState, WorktreeInfo}; +use crate::session::store::StateStore; +use crate::session::{Session, SessionMessage, SessionMetrics, SessionState, WorktreeInfo}; use crate::worktree; const DEFAULT_PANE_SIZE_PERCENT: u16 = 35; @@ -33,6 +34,8 @@ pub struct Dashboard { output_rx: broadcast::Receiver, sessions: Vec, session_output_cache: HashMap>, + unread_message_counts: HashMap, + selected_messages: Vec, logs: Vec, selected_diff_summary: Option, selected_pane: Pane, @@ -54,6 +57,8 @@ struct SessionSummary { completed: usize, failed: usize, stopped: usize, + unread_messages: usize, + inbox_sessions: usize, } #[derive(Debug, Clone, Copy, PartialEq)] @@ -105,6 +110,8 @@ impl Dashboard { output_rx, sessions, session_output_cache: HashMap::new(), + unread_message_counts: HashMap::new(), + selected_messages: Vec::new(), logs: Vec::new(), selected_diff_summary: None, selected_pane: Pane::Sessions, @@ -116,8 +123,10 @@ impl Dashboard { pane_size_percent, session_table_state, }; + dashboard.unread_message_counts = dashboard.db.unread_message_counts().unwrap_or_default(); dashboard.sync_selected_output(); dashboard.sync_selected_diff(); + dashboard.sync_selected_messages(); dashboard.refresh_logs(); dashboard } @@ -192,7 +201,7 @@ impl Dashboard { return; } - let summary = SessionSummary::from_sessions(&self.sessions); + let summary = SessionSummary::from_sessions(&self.sessions, &self.unread_message_counts); let chunks = Layout::default() .direction(Direction::Vertical) .constraints([Constraint::Length(2), Constraint::Min(3)]) @@ -203,14 +212,23 @@ impl Dashboard { chunks[0], ); - let rows = self.sessions.iter().map(session_row); - let header = Row::new(["ID", "Agent", "State", "Branch", "Tokens", "Duration"]) + let rows = self.sessions.iter().map(|session| { + session_row( + session, + self.unread_message_counts + .get(&session.id) + .copied() + .unwrap_or(0), + ) + }); + let header = Row::new(["ID", "Agent", "State", "Branch", "Inbox", "Tokens", "Duration"]) .style(Style::default().add_modifier(Modifier::BOLD)); let widths = [ Constraint::Length(8), Constraint::Length(10), Constraint::Length(10), Constraint::Min(12), + Constraint::Length(7), Constraint::Length(8), Constraint::Length(8), ]; @@ -455,6 +473,7 @@ impl Dashboard { self.reset_output_view(); self.sync_selected_output(); self.sync_selected_diff(); + self.sync_selected_messages(); self.refresh_logs(); } Pane::Output => { @@ -487,6 +506,7 @@ impl Dashboard { self.reset_output_view(); self.sync_selected_output(); self.sync_selected_diff(); + self.sync_selected_messages(); self.refresh_logs(); } Pane::Output => { @@ -530,6 +550,7 @@ impl Dashboard { self.reset_output_view(); self.sync_selected_output(); self.sync_selected_diff(); + self.sync_selected_messages(); self.refresh_logs(); } @@ -619,10 +640,18 @@ impl Dashboard { Vec::new() } }; + self.unread_message_counts = match self.db.unread_message_counts() { + Ok(counts) => counts, + Err(error) => { + tracing::warn!("Failed to refresh unread message counts: {error}"); + HashMap::new() + } + }; self.sync_selection_by_id(selected_id.as_deref()); self.ensure_selected_pane_visible(); self.sync_selected_output(); self.sync_selected_diff(); + self.sync_selected_messages(); self.refresh_logs(); } @@ -677,6 +706,36 @@ impl Dashboard { .and_then(|worktree| worktree::diff_summary(worktree).ok().flatten()); } + fn sync_selected_messages(&mut self) { + let Some(session_id) = self.selected_session_id().map(ToOwned::to_owned) else { + self.selected_messages.clear(); + return; + }; + + let unread_count = self.unread_message_counts.get(&session_id).copied().unwrap_or(0); + if unread_count > 0 { + match self.db.mark_messages_read(&session_id) { + Ok(_) => { + self.unread_message_counts.insert(session_id.clone(), 0); + } + Err(error) => { + tracing::warn!( + "Failed to mark session {} messages as read: {error}", + session_id + ); + } + } + } + + self.selected_messages = match self.db.list_messages_for_session(&session_id, 5) { + Ok(messages) => messages, + Err(error) => { + tracing::warn!("Failed to load session messages: {error}"); + Vec::new() + } + }; + } + fn selected_session_id(&self) -> Option<&str> { self.sessions .get(self.selected_session) @@ -791,6 +850,28 @@ impl Dashboard { )); } + lines.push(String::new()); + if self.selected_messages.is_empty() { + lines.push("Inbox clear".to_string()); + } else { + lines.push("Recent messages:".to_string()); + let recent = self + .selected_messages + .iter() + .rev() + .take(3) + .collect::>(); + for message in recent.into_iter().rev() { + lines.push(format!( + "- {} {} -> {} | {}", + self.short_timestamp(&message.timestamp.to_rfc3339()), + format_session_id(&message.from_session), + format_session_id(&message.to_session), + comms::preview(&message.msg_type, &message.content) + )); + } + } + let attention_items = self.attention_queue_items(3); if attention_items.is_empty() { lines.push(String::new()); @@ -832,24 +913,42 @@ impl Dashboard { } fn attention_queue_items(&self, limit: usize) -> Vec { - self.sessions - .iter() - .filter(|session| { - matches!( - session.state, - SessionState::Failed | SessionState::Stopped | SessionState::Pending - ) - }) - .take(limit) - .map(|session| { - format!( + let mut items = Vec::new(); + + for session in &self.sessions { + let unread = self + .unread_message_counts + .get(&session.id) + .copied() + .unwrap_or(0); + if unread > 0 { + items.push(format!( + "- Inbox {} | {} unread | {}", + format_session_id(&session.id), + unread, + truncate_for_dashboard(&session.task, 40) + )); + } + + if matches!( + session.state, + SessionState::Failed | SessionState::Stopped | SessionState::Pending + ) { + items.push(format!( "- {} {} | {}", session_state_label(&session.state), format_session_id(&session.id), truncate_for_dashboard(&session.task, 48) - ) - }) - .collect() + )); + } + + if items.len() >= limit { + break; + } + } + + items.truncate(limit); + items } fn active_session_count(&self) -> usize { @@ -1024,10 +1123,12 @@ impl Pane { } impl SessionSummary { - fn from_sessions(sessions: &[Session]) -> Self { + fn from_sessions(sessions: &[Session], unread_message_counts: &HashMap) -> Self { sessions.iter().fold( Self { total: sessions.len(), + unread_messages: unread_message_counts.values().sum(), + inbox_sessions: unread_message_counts.values().filter(|count| **count > 0).count(), ..Self::default() }, |mut summary, session| { @@ -1045,7 +1146,7 @@ impl SessionSummary { } } -fn session_row(session: &Session) -> Row<'static> { +fn session_row(session: &Session, unread_messages: usize) -> Row<'static> { Row::new(vec![ Cell::from(format_session_id(&session.id)), Cell::from(session.agent_type.clone()), @@ -1055,6 +1156,18 @@ fn session_row(session: &Session) -> Row<'static> { .add_modifier(Modifier::BOLD), ), Cell::from(session_branch(session)), + Cell::from(if unread_messages == 0 { + "-".to_string() + } else { + unread_messages.to_string() + }) + .style(if unread_messages == 0 { + Style::default() + } else { + Style::default() + .fg(Color::Magenta) + .add_modifier(Modifier::BOLD) + }), Cell::from(session.metrics.tokens_used.to_string()), Cell::from(format_duration(session.metrics.duration_secs)), ]) @@ -1083,7 +1196,11 @@ fn summary_span(label: &str, value: usize, color: Color) -> Span<'static> { } fn attention_queue_line(summary: &SessionSummary) -> Line<'static> { - if summary.failed == 0 && summary.stopped == 0 && summary.pending == 0 { + if summary.failed == 0 + && summary.stopped == 0 + && summary.pending == 0 + && summary.unread_messages == 0 + { return Line::from(vec![ Span::styled( "Attention queue clear", @@ -1098,6 +1215,7 @@ fn attention_queue_line(summary: &SessionSummary) -> Line<'static> { "Attention queue ", Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD), ), + summary_span("Inbox", summary.unread_messages, Color::Magenta), summary_span("Failed", summary.failed, Color::Red), summary_span("Stopped", summary.stopped, Color::DarkGray), summary_span("Pending", summary.pending, Color::Yellow), @@ -1190,13 +1308,9 @@ mod tests { 1, ); - let rendered = render_dashboard_text(dashboard, 150, 24); + let rendered = render_dashboard_text(dashboard, 180, 24); assert!(rendered.contains("ID")); - assert!(rendered.contains("Agent")); - assert!(rendered.contains("State")); assert!(rendered.contains("Branch")); - assert!(rendered.contains("Tokens")); - assert!(rendered.contains("Duration")); assert!(rendered.contains("Total 2")); assert!(rendered.contains("Running 1")); assert!(rendered.contains("Completed 1")); @@ -1621,6 +1735,8 @@ mod tests { output_rx, sessions, session_output_cache: HashMap::new(), + unread_message_counts: HashMap::new(), + selected_messages: Vec::new(), logs: Vec::new(), selected_diff_summary: None, selected_pane: Pane::Sessions,