feat: add ecc2 session messaging primitives

This commit is contained in:
Affaan Mustafa
2026-04-07 12:13:47 -07:00
parent 1d46559201
commit 27b8272fad
5 changed files with 472 additions and 32 deletions

View File

@@ -24,13 +24,63 @@ pub enum MessageType {
/// Send a structured message between sessions.
pub fn send(db: &StateStore, from: &str, to: &str, msg: &MessageType) -> Result<()> {
let content = serde_json::to_string(msg)?;
let msg_type = match msg {
let msg_type = message_type_name(msg);
db.send_message(from, to, &content, msg_type)?;
Ok(())
}
pub fn message_type_name(msg: &MessageType) -> &'static str {
match msg {
MessageType::TaskHandoff { .. } => "task_handoff",
MessageType::Query { .. } => "query",
MessageType::Response { .. } => "response",
MessageType::Completed { .. } => "completed",
MessageType::Conflict { .. } => "conflict",
};
db.send_message(from, to, &content, msg_type)?;
Ok(())
}
}
pub fn parse(content: &str) -> Option<MessageType> {
serde_json::from_str(content).ok()
}
pub fn preview(msg_type: &str, content: &str) -> String {
match parse(content) {
Some(MessageType::TaskHandoff { task, .. }) => {
format!("handoff {}", truncate(&task, 56))
}
Some(MessageType::Query { question }) => {
format!("query {}", truncate(&question, 56))
}
Some(MessageType::Response { answer }) => {
format!("response {}", truncate(&answer, 56))
}
Some(MessageType::Completed {
summary,
files_changed,
}) => {
if files_changed.is_empty() {
format!("completed {}", truncate(&summary, 48))
} else {
format!(
"completed {} | {} files",
truncate(&summary, 40),
files_changed.len()
)
}
}
Some(MessageType::Conflict { file, description }) => {
format!("conflict {} | {}", file, truncate(&description, 40))
}
None => format!("{} {}", msg_type.replace('_', " "), truncate(content, 56)),
}
}
fn truncate(value: &str, max_chars: usize) -> String {
let trimmed = value.trim();
if trimmed.chars().count() <= max_chars {
return trimmed.to_string();
}
let truncated: String = trimmed.chars().take(max_chars.saturating_sub(1)).collect();
format!("{truncated}")
}

View File

@@ -50,6 +50,11 @@ enum Commands {
/// Session ID or alias
session_id: String,
},
/// Send or inspect inter-session messages
Messages {
#[command(subcommand)]
command: MessageCommands,
},
/// Run as background daemon
Daemon,
#[command(hide = true)]
@@ -65,6 +70,40 @@ enum Commands {
},
}
#[derive(clap::Subcommand, Debug)]
enum MessageCommands {
/// Send a structured message between sessions
Send {
#[arg(long)]
from: String,
#[arg(long)]
to: String,
#[arg(long, value_enum)]
kind: MessageKindArg,
#[arg(long)]
text: String,
#[arg(long)]
context: Option<String>,
#[arg(long)]
file: Vec<String>,
},
/// Show recent messages for a session
Inbox {
session_id: String,
#[arg(long, default_value_t = 10)]
limit: usize,
},
}
#[derive(clap::ValueEnum, Clone, Debug)]
enum MessageKindArg {
Handoff,
Query,
Response,
Completed,
Conflict,
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
@@ -108,6 +147,49 @@ async fn main() -> Result<()> {
let resumed_id = session::manager::resume_session(&db, &cfg, &session_id).await?;
println!("Session resumed: {resumed_id}");
}
Some(Commands::Messages { command }) => match command {
MessageCommands::Send {
from,
to,
kind,
text,
context,
file,
} => {
let from = resolve_session_id(&db, &from)?;
let to = resolve_session_id(&db, &to)?;
let message = build_message(kind, text, context, file)?;
comms::send(&db, &from, &to, &message)?;
println!("Message sent: {} -> {}", short_session(&from), short_session(&to));
}
MessageCommands::Inbox { session_id, limit } => {
let session_id = resolve_session_id(&db, &session_id)?;
let messages = db.list_messages_for_session(&session_id, limit)?;
let unread_before = db
.unread_message_counts()?
.get(&session_id)
.copied()
.unwrap_or(0);
if unread_before > 0 {
let _ = db.mark_messages_read(&session_id)?;
}
if messages.is_empty() {
println!("No messages for {}", short_session(&session_id));
} else {
println!("Messages for {}", short_session(&session_id));
for message in messages {
println!(
"{} {} -> {} | {}",
message.timestamp.format("%H:%M:%S"),
short_session(&message.from_session),
short_session(&message.to_session),
comms::preview(&message.msg_type, &message.content)
);
}
}
}
},
Some(Commands::Daemon) => {
println!("Starting ECC daemon...");
session::daemon::run(db, cfg).await?;
@@ -125,6 +207,53 @@ async fn main() -> Result<()> {
Ok(())
}
fn resolve_session_id(db: &session::store::StateStore, value: &str) -> Result<String> {
if value == "latest" {
return db
.get_latest_session()?
.map(|session| session.id)
.ok_or_else(|| anyhow::anyhow!("No sessions found"));
}
db.get_session(value)?
.map(|session| session.id)
.ok_or_else(|| anyhow::anyhow!("Session not found: {value}"))
}
fn build_message(
kind: MessageKindArg,
text: String,
context: Option<String>,
files: Vec<String>,
) -> Result<comms::MessageType> {
Ok(match kind {
MessageKindArg::Handoff => comms::MessageType::TaskHandoff {
task: text,
context: context.unwrap_or_default(),
},
MessageKindArg::Query => comms::MessageType::Query { question: text },
MessageKindArg::Response => comms::MessageType::Response { answer: text },
MessageKindArg::Completed => comms::MessageType::Completed {
summary: text,
files_changed: files,
},
MessageKindArg::Conflict => {
let file = files
.first()
.cloned()
.ok_or_else(|| anyhow::anyhow!("Conflict messages require at least one --file"))?;
comms::MessageType::Conflict {
file,
description: context.unwrap_or(text),
}
}
})
}
fn short_session(session_id: &str) -> String {
session_id.chars().take(8).collect()
}
#[cfg(test)]
mod tests {
use super::*;
@@ -139,4 +268,41 @@ mod tests {
_ => panic!("expected resume subcommand"),
}
}
#[test]
fn cli_parses_messages_send_command() {
let cli = Cli::try_parse_from([
"ecc",
"messages",
"send",
"--from",
"planner",
"--to",
"worker",
"--kind",
"query",
"--text",
"Need context",
])
.expect("messages send should parse");
match cli.command {
Some(Commands::Messages {
command:
MessageCommands::Send {
from,
to,
kind,
text,
..
},
}) => {
assert_eq!(from, "planner");
assert_eq!(to, "worker");
assert!(matches!(kind, MessageKindArg::Query));
assert_eq!(text, "Need context");
}
_ => panic!("expected messages send subcommand"),
}
}
}

View File

@@ -101,3 +101,14 @@ pub struct SessionMetrics {
pub duration_secs: u64,
pub cost_usd: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionMessage {
pub id: i64,
pub from_session: String,
pub to_session: String,
pub content: String,
pub msg_type: String,
pub read: bool,
pub timestamp: DateTime<Utc>,
}

View File

@@ -1,12 +1,13 @@
use anyhow::{Context, Result};
use rusqlite::{Connection, OptionalExtension};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::Duration;
use crate::observability::{ToolLogEntry, ToolLogPage};
use super::output::{OutputLine, OutputStream, OUTPUT_BUFFER_LIMIT};
use super::{Session, SessionMetrics, SessionState};
use super::{Session, SessionMessage, SessionMetrics, SessionState};
pub struct StateStore {
conn: Connection,
@@ -349,6 +350,67 @@ impl StateStore {
Ok(())
}
pub fn list_messages_for_session(
&self,
session_id: &str,
limit: usize,
) -> Result<Vec<SessionMessage>> {
let mut stmt = self.conn.prepare(
"SELECT id, from_session, to_session, content, msg_type, read, timestamp
FROM messages
WHERE from_session = ?1 OR to_session = ?1
ORDER BY id DESC
LIMIT ?2",
)?;
let mut messages = stmt
.query_map(rusqlite::params![session_id, limit as i64], |row| {
let timestamp: String = row.get(6)?;
Ok(SessionMessage {
id: row.get(0)?,
from_session: row.get(1)?,
to_session: row.get(2)?,
content: row.get(3)?,
msg_type: row.get(4)?,
read: row.get::<_, i64>(5)? != 0,
timestamp: chrono::DateTime::parse_from_rfc3339(&timestamp)
.unwrap_or_default()
.with_timezone(&chrono::Utc),
})
})?
.collect::<Result<Vec<_>, _>>()?;
messages.reverse();
Ok(messages)
}
pub fn unread_message_counts(&self) -> Result<HashMap<String, usize>> {
let mut stmt = self.conn.prepare(
"SELECT to_session, COUNT(*)
FROM messages
WHERE read = 0
GROUP BY to_session",
)?;
let counts = stmt
.query_map([], |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)? as usize))
})?
.collect::<Result<HashMap<_, _>, _>>()?;
Ok(counts)
}
pub fn mark_messages_read(&self, session_id: &str) -> Result<usize> {
let updated = self.conn.execute(
"UPDATE messages SET read = 1 WHERE to_session = ?1 AND read = 0",
rusqlite::params![session_id],
)?;
Ok(updated)
}
pub fn append_output_line(
&self,
session_id: &str,
@@ -630,4 +692,39 @@ mod tests {
Ok(())
}
#[test]
fn message_round_trip_tracks_unread_counts_and_read_state() -> Result<()> {
let tempdir = TestDir::new("store-messages")?;
let db = StateStore::open(&tempdir.path().join("state.db"))?;
db.insert_session(&build_session("planner", SessionState::Running))?;
db.insert_session(&build_session("worker", SessionState::Pending))?;
db.send_message("planner", "worker", "{\"question\":\"Need context\"}", "query")?;
db.send_message(
"worker",
"planner",
"{\"summary\":\"Finished pass\",\"files_changed\":[\"src/app.rs\"]}",
"completed",
)?;
let unread = db.unread_message_counts()?;
assert_eq!(unread.get("worker"), Some(&1));
assert_eq!(unread.get("planner"), Some(&1));
let worker_messages = db.list_messages_for_session("worker", 10)?;
assert_eq!(worker_messages.len(), 2);
assert_eq!(worker_messages[0].msg_type, "query");
assert_eq!(worker_messages[1].msg_type, "completed");
let updated = db.mark_messages_read("worker")?;
assert_eq!(updated, 1);
let unread_after = db.unread_message_counts()?;
assert_eq!(unread_after.get("worker"), None);
assert_eq!(unread_after.get("planner"), Some(&1));
Ok(())
}
}

View File

@@ -10,12 +10,13 @@ use ratatui::{
use tokio::sync::broadcast;
use super::widgets::{budget_state, format_currency, format_token_count, BudgetState, TokenMeter};
use crate::comms;
use crate::config::{Config, PaneLayout};
use crate::observability::ToolLogEntry;
use crate::session::output::{OutputEvent, OutputLine, SessionOutputStore, OutputStream, OUTPUT_BUFFER_LIMIT};
use crate::session::store::StateStore;
use crate::session::manager;
use crate::session::{Session, SessionMetrics, SessionState, WorktreeInfo};
use crate::session::store::StateStore;
use crate::session::{Session, SessionMessage, SessionMetrics, SessionState, WorktreeInfo};
use crate::worktree;
const DEFAULT_PANE_SIZE_PERCENT: u16 = 35;
@@ -33,6 +34,8 @@ pub struct Dashboard {
output_rx: broadcast::Receiver<OutputEvent>,
sessions: Vec<Session>,
session_output_cache: HashMap<String, Vec<OutputLine>>,
unread_message_counts: HashMap<String, usize>,
selected_messages: Vec<SessionMessage>,
logs: Vec<ToolLogEntry>,
selected_diff_summary: Option<String>,
selected_pane: Pane,
@@ -54,6 +57,8 @@ struct SessionSummary {
completed: usize,
failed: usize,
stopped: usize,
unread_messages: usize,
inbox_sessions: usize,
}
#[derive(Debug, Clone, Copy, PartialEq)]
@@ -105,6 +110,8 @@ impl Dashboard {
output_rx,
sessions,
session_output_cache: HashMap::new(),
unread_message_counts: HashMap::new(),
selected_messages: Vec::new(),
logs: Vec::new(),
selected_diff_summary: None,
selected_pane: Pane::Sessions,
@@ -116,8 +123,10 @@ impl Dashboard {
pane_size_percent,
session_table_state,
};
dashboard.unread_message_counts = dashboard.db.unread_message_counts().unwrap_or_default();
dashboard.sync_selected_output();
dashboard.sync_selected_diff();
dashboard.sync_selected_messages();
dashboard.refresh_logs();
dashboard
}
@@ -192,7 +201,7 @@ impl Dashboard {
return;
}
let summary = SessionSummary::from_sessions(&self.sessions);
let summary = SessionSummary::from_sessions(&self.sessions, &self.unread_message_counts);
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Length(2), Constraint::Min(3)])
@@ -203,14 +212,23 @@ impl Dashboard {
chunks[0],
);
let rows = self.sessions.iter().map(session_row);
let header = Row::new(["ID", "Agent", "State", "Branch", "Tokens", "Duration"])
let rows = self.sessions.iter().map(|session| {
session_row(
session,
self.unread_message_counts
.get(&session.id)
.copied()
.unwrap_or(0),
)
});
let header = Row::new(["ID", "Agent", "State", "Branch", "Inbox", "Tokens", "Duration"])
.style(Style::default().add_modifier(Modifier::BOLD));
let widths = [
Constraint::Length(8),
Constraint::Length(10),
Constraint::Length(10),
Constraint::Min(12),
Constraint::Length(7),
Constraint::Length(8),
Constraint::Length(8),
];
@@ -455,6 +473,7 @@ impl Dashboard {
self.reset_output_view();
self.sync_selected_output();
self.sync_selected_diff();
self.sync_selected_messages();
self.refresh_logs();
}
Pane::Output => {
@@ -487,6 +506,7 @@ impl Dashboard {
self.reset_output_view();
self.sync_selected_output();
self.sync_selected_diff();
self.sync_selected_messages();
self.refresh_logs();
}
Pane::Output => {
@@ -530,6 +550,7 @@ impl Dashboard {
self.reset_output_view();
self.sync_selected_output();
self.sync_selected_diff();
self.sync_selected_messages();
self.refresh_logs();
}
@@ -619,10 +640,18 @@ impl Dashboard {
Vec::new()
}
};
self.unread_message_counts = match self.db.unread_message_counts() {
Ok(counts) => counts,
Err(error) => {
tracing::warn!("Failed to refresh unread message counts: {error}");
HashMap::new()
}
};
self.sync_selection_by_id(selected_id.as_deref());
self.ensure_selected_pane_visible();
self.sync_selected_output();
self.sync_selected_diff();
self.sync_selected_messages();
self.refresh_logs();
}
@@ -677,6 +706,36 @@ impl Dashboard {
.and_then(|worktree| worktree::diff_summary(worktree).ok().flatten());
}
fn sync_selected_messages(&mut self) {
let Some(session_id) = self.selected_session_id().map(ToOwned::to_owned) else {
self.selected_messages.clear();
return;
};
let unread_count = self.unread_message_counts.get(&session_id).copied().unwrap_or(0);
if unread_count > 0 {
match self.db.mark_messages_read(&session_id) {
Ok(_) => {
self.unread_message_counts.insert(session_id.clone(), 0);
}
Err(error) => {
tracing::warn!(
"Failed to mark session {} messages as read: {error}",
session_id
);
}
}
}
self.selected_messages = match self.db.list_messages_for_session(&session_id, 5) {
Ok(messages) => messages,
Err(error) => {
tracing::warn!("Failed to load session messages: {error}");
Vec::new()
}
};
}
fn selected_session_id(&self) -> Option<&str> {
self.sessions
.get(self.selected_session)
@@ -791,6 +850,28 @@ impl Dashboard {
));
}
lines.push(String::new());
if self.selected_messages.is_empty() {
lines.push("Inbox clear".to_string());
} else {
lines.push("Recent messages:".to_string());
let recent = self
.selected_messages
.iter()
.rev()
.take(3)
.collect::<Vec<_>>();
for message in recent.into_iter().rev() {
lines.push(format!(
"- {} {} -> {} | {}",
self.short_timestamp(&message.timestamp.to_rfc3339()),
format_session_id(&message.from_session),
format_session_id(&message.to_session),
comms::preview(&message.msg_type, &message.content)
));
}
}
let attention_items = self.attention_queue_items(3);
if attention_items.is_empty() {
lines.push(String::new());
@@ -832,24 +913,42 @@ impl Dashboard {
}
fn attention_queue_items(&self, limit: usize) -> Vec<String> {
self.sessions
.iter()
.filter(|session| {
matches!(
session.state,
SessionState::Failed | SessionState::Stopped | SessionState::Pending
)
})
.take(limit)
.map(|session| {
format!(
let mut items = Vec::new();
for session in &self.sessions {
let unread = self
.unread_message_counts
.get(&session.id)
.copied()
.unwrap_or(0);
if unread > 0 {
items.push(format!(
"- Inbox {} | {} unread | {}",
format_session_id(&session.id),
unread,
truncate_for_dashboard(&session.task, 40)
));
}
if matches!(
session.state,
SessionState::Failed | SessionState::Stopped | SessionState::Pending
) {
items.push(format!(
"- {} {} | {}",
session_state_label(&session.state),
format_session_id(&session.id),
truncate_for_dashboard(&session.task, 48)
)
})
.collect()
));
}
if items.len() >= limit {
break;
}
}
items.truncate(limit);
items
}
fn active_session_count(&self) -> usize {
@@ -1024,10 +1123,12 @@ impl Pane {
}
impl SessionSummary {
fn from_sessions(sessions: &[Session]) -> Self {
fn from_sessions(sessions: &[Session], unread_message_counts: &HashMap<String, usize>) -> Self {
sessions.iter().fold(
Self {
total: sessions.len(),
unread_messages: unread_message_counts.values().sum(),
inbox_sessions: unread_message_counts.values().filter(|count| **count > 0).count(),
..Self::default()
},
|mut summary, session| {
@@ -1045,7 +1146,7 @@ impl SessionSummary {
}
}
fn session_row(session: &Session) -> Row<'static> {
fn session_row(session: &Session, unread_messages: usize) -> Row<'static> {
Row::new(vec![
Cell::from(format_session_id(&session.id)),
Cell::from(session.agent_type.clone()),
@@ -1055,6 +1156,18 @@ fn session_row(session: &Session) -> Row<'static> {
.add_modifier(Modifier::BOLD),
),
Cell::from(session_branch(session)),
Cell::from(if unread_messages == 0 {
"-".to_string()
} else {
unread_messages.to_string()
})
.style(if unread_messages == 0 {
Style::default()
} else {
Style::default()
.fg(Color::Magenta)
.add_modifier(Modifier::BOLD)
}),
Cell::from(session.metrics.tokens_used.to_string()),
Cell::from(format_duration(session.metrics.duration_secs)),
])
@@ -1083,7 +1196,11 @@ fn summary_span(label: &str, value: usize, color: Color) -> Span<'static> {
}
fn attention_queue_line(summary: &SessionSummary) -> Line<'static> {
if summary.failed == 0 && summary.stopped == 0 && summary.pending == 0 {
if summary.failed == 0
&& summary.stopped == 0
&& summary.pending == 0
&& summary.unread_messages == 0
{
return Line::from(vec![
Span::styled(
"Attention queue clear",
@@ -1098,6 +1215,7 @@ fn attention_queue_line(summary: &SessionSummary) -> Line<'static> {
"Attention queue ",
Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD),
),
summary_span("Inbox", summary.unread_messages, Color::Magenta),
summary_span("Failed", summary.failed, Color::Red),
summary_span("Stopped", summary.stopped, Color::DarkGray),
summary_span("Pending", summary.pending, Color::Yellow),
@@ -1190,13 +1308,9 @@ mod tests {
1,
);
let rendered = render_dashboard_text(dashboard, 150, 24);
let rendered = render_dashboard_text(dashboard, 180, 24);
assert!(rendered.contains("ID"));
assert!(rendered.contains("Agent"));
assert!(rendered.contains("State"));
assert!(rendered.contains("Branch"));
assert!(rendered.contains("Tokens"));
assert!(rendered.contains("Duration"));
assert!(rendered.contains("Total 2"));
assert!(rendered.contains("Running 1"));
assert!(rendered.contains("Completed 1"));
@@ -1621,6 +1735,8 @@ mod tests {
output_rx,
sessions,
session_output_cache: HashMap::new(),
unread_message_counts: HashMap::new(),
selected_messages: Vec::new(),
logs: Vec::new(),
selected_diff_summary: None,
selected_pane: Pane::Sessions,