From 44c2bf6f7bb21a7e4196d01e4375d880611e8349 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Mon, 23 Mar 2026 03:46:19 -0700 Subject: [PATCH] feat(ecc2): implement live output streaming per agent (#774) - PTY output capture via tokio::process with stdout/stderr piping - Ring buffer (1000 lines) per session - Output pane wired to show selected session with auto-scroll - Broadcast channel for output events --- ecc2/src/main.rs | 20 ++ ecc2/src/session/manager.rs | 182 +++++++++++--- ecc2/src/session/mod.rs | 2 + ecc2/src/session/output.rs | 149 +++++++++++ ecc2/src/session/runtime.rs | 290 +++++++++++++++++++++ ecc2/src/session/store.rs | 130 +++++++++- ecc2/src/tui/dashboard.rs | 486 +++++++++++++++++++++++------------- 7 files changed, 1037 insertions(+), 222 deletions(-) create mode 100644 ecc2/src/session/output.rs create mode 100644 ecc2/src/session/runtime.rs diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index afa50a2f..476c2d3b 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -7,6 +7,7 @@ mod worktree; use anyhow::Result; use clap::Parser; +use std::path::PathBuf; use tracing_subscriber::EnvFilter; #[derive(Parser, Debug)] @@ -46,6 +47,17 @@ enum Commands { }, /// Run as background daemon Daemon, + #[command(hide = true)] + RunSession { + #[arg(long)] + session_id: String, + #[arg(long)] + task: String, + #[arg(long)] + agent: String, + #[arg(long)] + cwd: PathBuf, + }, } #[tokio::main] @@ -91,6 +103,14 @@ async fn main() -> Result<()> { println!("Starting ECC daemon..."); session::daemon::run(db, cfg).await?; } + Some(Commands::RunSession { + session_id, + task, + agent, + cwd, + }) => { + session::manager::run_session(&cfg, &session_id, &task, &agent, &cwd).await?; + } } Ok(()) diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index 2c8bd1bc..7d00d7fc 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -4,6 +4,8 @@ use std::path::{Path, PathBuf}; use std::process::Stdio; use tokio::process::Command; +use super::output::SessionOutputStore; +use super::runtime::capture_command_output; use super::store::StateStore; use super::{Session, SessionMetrics, SessionState}; use crate::config::Config; @@ -18,18 +20,7 @@ pub async fn create_session( ) -> Result { let repo_root = std::env::current_dir().context("Failed to resolve current working directory")?; - let agent_program = agent_program(agent_type)?; - - create_session_in_dir( - db, - cfg, - task, - agent_type, - use_worktree, - &repo_root, - &agent_program, - ) - .await + queue_session_in_dir(db, cfg, task, agent_type, use_worktree, &repo_root).await } pub fn list_sessions(db: &StateStore) -> Result> { @@ -62,6 +53,97 @@ fn resolve_session(db: &StateStore, id: &str) -> Result { session.ok_or_else(|| anyhow::anyhow!("Session not found: {id}")) } +pub async fn run_session( + cfg: &Config, + session_id: &str, + task: &str, + agent_type: &str, + working_dir: &Path, +) -> Result<()> { + let db = StateStore::open(&cfg.db_path)?; + let session = resolve_session(&db, session_id)?; + + if session.state != SessionState::Pending { + tracing::info!( + "Skipping run_session for {} because state is {}", + session_id, + session.state + ); + return Ok(()); + } + + let agent_program = agent_program(agent_type)?; + let command = build_agent_command(&agent_program, task, session_id, working_dir); + capture_command_output( + cfg.db_path.clone(), + session_id.to_string(), + command, + SessionOutputStore::default(), + ) + .await?; + Ok(()) +} + +async fn queue_session_in_dir( + db: &StateStore, + cfg: &Config, + task: &str, + agent_type: &str, + use_worktree: bool, + repo_root: &Path, +) -> Result { + let session = build_session_record(task, agent_type, use_worktree, cfg, repo_root)?; + db.insert_session(&session)?; + + let working_dir = session + .worktree + .as_ref() + .map(|worktree| worktree.path.as_path()) + .unwrap_or(repo_root); + + match spawn_session_runner(task, &session.id, agent_type, working_dir).await { + Ok(()) => Ok(session.id), + Err(error) => { + db.update_state(&session.id, &SessionState::Failed)?; + + if let Some(worktree) = session.worktree.as_ref() { + let _ = crate::worktree::remove(&worktree.path); + } + + Err(error.context(format!("Failed to queue session {}", session.id))) + } + } +} + +fn build_session_record( + task: &str, + agent_type: &str, + use_worktree: bool, + cfg: &Config, + repo_root: &Path, +) -> Result { + let id = uuid::Uuid::new_v4().to_string()[..8].to_string(); + let now = chrono::Utc::now(); + + let worktree = if use_worktree { + Some(worktree::create_for_session_in_repo(&id, cfg, repo_root)?) + } else { + None + }; + + Ok(Session { + id, + task: task.to_string(), + agent_type: agent_type.to_string(), + state: SessionState::Pending, + pid: None, + worktree, + created_at: now, + updated_at: now, + metrics: SessionMetrics::default(), + }) +} + async fn create_session_in_dir( db: &StateStore, cfg: &Config, @@ -71,26 +153,7 @@ async fn create_session_in_dir( repo_root: &Path, agent_program: &Path, ) -> Result { - let id = uuid::Uuid::new_v4().to_string()[..8].to_string(); - let now = chrono::Utc::now(); - - let wt = if use_worktree { - Some(worktree::create_for_session_in_repo(&id, cfg, repo_root)?) - } else { - None - }; - - let session = Session { - id: id.clone(), - task: task.to_string(), - agent_type: agent_type.to_string(), - state: SessionState::Pending, - pid: None, - worktree: wt, - created_at: now, - updated_at: now, - metrics: SessionMetrics::default(), - }; + let session = build_session_record(task, agent_type, use_worktree, cfg, repo_root)?; db.insert_session(&session)?; @@ -118,19 +181,60 @@ async fn create_session_in_dir( } } +async fn spawn_session_runner( + task: &str, + session_id: &str, + agent_type: &str, + working_dir: &Path, +) -> Result<()> { + let current_exe = std::env::current_exe().context("Failed to resolve ECC executable path")?; + let child = Command::new(¤t_exe) + .arg("run-session") + .arg("--session-id") + .arg(session_id) + .arg("--task") + .arg(task) + .arg("--agent") + .arg(agent_type) + .arg("--cwd") + .arg(working_dir) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .with_context(|| { + format!( + "Failed to spawn ECC runner from {}", + current_exe.display() + ) + })?; + + child + .id() + .ok_or_else(|| anyhow::anyhow!("ECC runner did not expose a process id"))?; + Ok(()) +} + +fn build_agent_command(agent_program: &Path, task: &str, session_id: &str, working_dir: &Path) -> Command { + let mut command = Command::new(agent_program); + command + .arg("--print") + .arg("--name") + .arg(format!("ecc-{session_id}")) + .arg(task) + .current_dir(working_dir) + .stdin(Stdio::null()); + command +} + async fn spawn_claude_code( agent_program: &Path, task: &str, session_id: &str, working_dir: &Path, ) -> Result { - let child = Command::new(agent_program) - .arg("--print") - .arg("--name") - .arg(format!("ecc-{session_id}")) - .arg(task) - .current_dir(working_dir) - .stdin(Stdio::null()) + let mut command = build_agent_command(agent_program, task, session_id, working_dir); + let child = command .stdout(Stdio::null()) .stderr(Stdio::null()) .spawn() diff --git a/ecc2/src/session/mod.rs b/ecc2/src/session/mod.rs index 9f8d2b2f..0e256e48 100644 --- a/ecc2/src/session/mod.rs +++ b/ecc2/src/session/mod.rs @@ -1,5 +1,7 @@ pub mod daemon; pub mod manager; +pub mod output; +pub mod runtime; pub mod store; use chrono::{DateTime, Utc}; diff --git a/ecc2/src/session/output.rs b/ecc2/src/session/output.rs new file mode 100644 index 00000000..6cae21f3 --- /dev/null +++ b/ecc2/src/session/output.rs @@ -0,0 +1,149 @@ +use std::collections::{HashMap, VecDeque}; +use std::sync::{Arc, Mutex, MutexGuard}; + +use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast; + +pub const OUTPUT_BUFFER_LIMIT: usize = 1000; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum OutputStream { + Stdout, + Stderr, +} + +impl OutputStream { + pub fn as_str(self) -> &'static str { + match self { + Self::Stdout => "stdout", + Self::Stderr => "stderr", + } + } + + pub fn from_db_value(value: &str) -> Self { + match value { + "stderr" => Self::Stderr, + _ => Self::Stdout, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct OutputLine { + pub stream: OutputStream, + pub text: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct OutputEvent { + pub session_id: String, + pub line: OutputLine, +} + +#[derive(Clone)] +pub struct SessionOutputStore { + capacity: usize, + buffers: Arc>>>, + tx: broadcast::Sender, +} + +impl Default for SessionOutputStore { + fn default() -> Self { + Self::new(OUTPUT_BUFFER_LIMIT) + } +} + +impl SessionOutputStore { + pub fn new(capacity: usize) -> Self { + let capacity = capacity.max(1); + let (tx, _) = broadcast::channel(capacity.max(16)); + + Self { + capacity, + buffers: Arc::new(Mutex::new(HashMap::new())), + tx, + } + } + + pub fn subscribe(&self) -> broadcast::Receiver { + self.tx.subscribe() + } + + pub fn push_line(&self, session_id: &str, stream: OutputStream, text: impl Into) { + let line = OutputLine { + stream, + text: text.into(), + }; + + { + let mut buffers = self.lock_buffers(); + let buffer = buffers.entry(session_id.to_string()).or_default(); + buffer.push_back(line.clone()); + + while buffer.len() > self.capacity { + let _ = buffer.pop_front(); + } + } + + let _ = self.tx.send(OutputEvent { + session_id: session_id.to_string(), + line, + }); + } + + pub fn replace_lines(&self, session_id: &str, lines: Vec) { + let mut buffer: VecDeque = lines.into_iter().collect(); + + while buffer.len() > self.capacity { + let _ = buffer.pop_front(); + } + + self.lock_buffers().insert(session_id.to_string(), buffer); + } + + pub fn lines(&self, session_id: &str) -> Vec { + self.lock_buffers() + .get(session_id) + .map(|buffer| buffer.iter().cloned().collect()) + .unwrap_or_default() + } + + fn lock_buffers(&self) -> MutexGuard<'_, HashMap>> { + self.buffers + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()) + } +} + +#[cfg(test)] +mod tests { + use super::{OutputStream, SessionOutputStore}; + + #[test] + fn ring_buffer_keeps_most_recent_lines() { + let store = SessionOutputStore::new(3); + + store.push_line("session-1", OutputStream::Stdout, "line-1"); + store.push_line("session-1", OutputStream::Stdout, "line-2"); + store.push_line("session-1", OutputStream::Stdout, "line-3"); + store.push_line("session-1", OutputStream::Stdout, "line-4"); + + let lines = store.lines("session-1"); + let texts: Vec<_> = lines.iter().map(|line| line.text.as_str()).collect(); + + assert_eq!(texts, vec!["line-2", "line-3", "line-4"]); + } + + #[tokio::test] + async fn pushing_output_broadcasts_events() { + let store = SessionOutputStore::new(8); + let mut rx = store.subscribe(); + + store.push_line("session-1", OutputStream::Stderr, "problem"); + + let event = rx.recv().await.expect("broadcast event"); + assert_eq!(event.session_id, "session-1"); + assert_eq!(event.line.stream, OutputStream::Stderr); + assert_eq!(event.line.text, "problem"); + } +} diff --git a/ecc2/src/session/runtime.rs b/ecc2/src/session/runtime.rs new file mode 100644 index 00000000..87da7b89 --- /dev/null +++ b/ecc2/src/session/runtime.rs @@ -0,0 +1,290 @@ +use std::path::PathBuf; +use std::process::{ExitStatus, Stdio}; + +use anyhow::{Context, Result}; +use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader}; +use tokio::process::Command; +use tokio::sync::{mpsc, oneshot}; + +use super::output::{OutputStream, SessionOutputStore}; +use super::store::StateStore; +use super::SessionState; + +type DbAck = std::result::Result<(), String>; + +enum DbMessage { + UpdateState { + state: SessionState, + ack: oneshot::Sender, + }, + UpdatePid { + pid: Option, + ack: oneshot::Sender, + }, + AppendOutputLine { + stream: OutputStream, + line: String, + ack: oneshot::Sender, + }, +} + +#[derive(Clone)] +struct DbWriter { + tx: mpsc::UnboundedSender, +} + +impl DbWriter { + fn start(db_path: PathBuf, session_id: String) -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + std::thread::spawn(move || run_db_writer(db_path, session_id, rx)); + Self { tx } + } + + async fn update_state(&self, state: SessionState) -> Result<()> { + self.send(|ack| DbMessage::UpdateState { state, ack }).await + } + + async fn update_pid(&self, pid: Option) -> Result<()> { + self.send(|ack| DbMessage::UpdatePid { pid, ack }).await + } + + async fn append_output_line(&self, stream: OutputStream, line: String) -> Result<()> { + self.send(|ack| DbMessage::AppendOutputLine { stream, line, ack }) + .await + } + + async fn send(&self, build: F) -> Result<()> + where + F: FnOnce(oneshot::Sender) -> DbMessage, + { + let (ack_tx, ack_rx) = oneshot::channel(); + self.tx + .send(build(ack_tx)) + .map_err(|_| anyhow::anyhow!("DB writer channel closed"))?; + + match ack_rx.await { + Ok(Ok(())) => Ok(()), + Ok(Err(error)) => Err(anyhow::anyhow!(error)), + Err(_) => Err(anyhow::anyhow!("DB writer acknowledgement dropped")), + } + } +} + +fn run_db_writer( + db_path: PathBuf, + session_id: String, + mut rx: mpsc::UnboundedReceiver, +) { + let (opened, open_error) = match StateStore::open(&db_path) { + Ok(db) => (Some(db), None), + Err(error) => (None, Some(error.to_string())), + }; + + while let Some(message) = rx.blocking_recv() { + match message { + DbMessage::UpdateState { state, ack } => { + let result = match opened.as_ref() { + Some(db) => db.update_state(&session_id, &state).map_err(|error| error.to_string()), + None => Err(open_error + .clone() + .unwrap_or_else(|| "Failed to open state store".to_string())), + }; + let _ = ack.send(result); + } + DbMessage::UpdatePid { pid, ack } => { + let result = match opened.as_ref() { + Some(db) => db.update_pid(&session_id, pid).map_err(|error| error.to_string()), + None => Err(open_error + .clone() + .unwrap_or_else(|| "Failed to open state store".to_string())), + }; + let _ = ack.send(result); + } + DbMessage::AppendOutputLine { stream, line, ack } => { + let result = match opened.as_ref() { + Some(db) => db + .append_output_line(&session_id, stream, &line) + .map_err(|error| error.to_string()), + None => Err(open_error + .clone() + .unwrap_or_else(|| "Failed to open state store".to_string())), + }; + let _ = ack.send(result); + } + } + } +} + +pub async fn capture_command_output( + db_path: PathBuf, + session_id: String, + mut command: Command, + output_store: SessionOutputStore, +) -> Result { + let db_writer = DbWriter::start(db_path, session_id.clone()); + + let result = async { + let mut child = command + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .with_context(|| format!("Failed to start process for session {}", session_id))?; + + let stdout = match child.stdout.take() { + Some(stdout) => stdout, + None => { + let _ = child.kill().await; + let _ = child.wait().await; + anyhow::bail!("Child stdout was not piped"); + } + }; + let stderr = match child.stderr.take() { + Some(stderr) => stderr, + None => { + let _ = child.kill().await; + let _ = child.wait().await; + anyhow::bail!("Child stderr was not piped"); + } + }; + + let pid = child + .id() + .ok_or_else(|| anyhow::anyhow!("Spawned process did not expose a process id"))?; + db_writer.update_pid(Some(pid)).await?; + db_writer.update_state(SessionState::Running).await?; + + let stdout_task = tokio::spawn(capture_stream( + session_id.clone(), + stdout, + OutputStream::Stdout, + output_store.clone(), + db_writer.clone(), + )); + let stderr_task = tokio::spawn(capture_stream( + session_id.clone(), + stderr, + OutputStream::Stderr, + output_store, + db_writer.clone(), + )); + + let status = child.wait().await?; + stdout_task.await??; + stderr_task.await??; + + let final_state = if status.success() { + SessionState::Completed + } else { + SessionState::Failed + }; + db_writer.update_pid(None).await?; + db_writer.update_state(final_state).await?; + + Ok(status) + } + .await; + + if result.is_err() { + let _ = db_writer.update_pid(None).await; + let _ = db_writer.update_state(SessionState::Failed).await; + } + + result +} + +async fn capture_stream( + session_id: String, + reader: R, + stream: OutputStream, + output_store: SessionOutputStore, + db_writer: DbWriter, +) -> Result<()> +where + R: AsyncRead + Unpin, +{ + let mut lines = BufReader::new(reader).lines(); + + while let Some(line) = lines.next_line().await? { + db_writer + .append_output_line(stream, line.clone()) + .await?; + output_store.push_line(&session_id, stream, line); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use std::env; + + use anyhow::Result; + use chrono::Utc; + use tokio::process::Command; + use uuid::Uuid; + + use super::capture_command_output; + use crate::session::output::{SessionOutputStore, OUTPUT_BUFFER_LIMIT}; + use crate::session::store::StateStore; + use crate::session::{Session, SessionMetrics, SessionState}; + + #[tokio::test] + async fn capture_command_output_persists_lines_and_events() -> Result<()> { + let db_path = env::temp_dir().join(format!("ecc2-runtime-{}.db", Uuid::new_v4())); + let db = StateStore::open(&db_path)?; + let session_id = "session-1".to_string(); + let now = Utc::now(); + + db.insert_session(&Session { + id: session_id.clone(), + task: "stream output".to_string(), + agent_type: "test".to_string(), + state: SessionState::Pending, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + metrics: SessionMetrics::default(), + })?; + + let output_store = SessionOutputStore::default(); + let mut rx = output_store.subscribe(); + let mut command = Command::new("/bin/sh"); + command + .arg("-c") + .arg("printf 'alpha\\n'; printf 'beta\\n' >&2"); + + let status = + capture_command_output(db_path.clone(), session_id.clone(), command, output_store) + .await?; + + assert!(status.success()); + + let db = StateStore::open(&db_path)?; + let session = db + .get_session(&session_id)? + .expect("session should still exist"); + assert_eq!(session.state, SessionState::Completed); + assert_eq!(session.pid, None); + + let lines = db.get_output_lines(&session_id, OUTPUT_BUFFER_LIMIT)?; + let texts: HashSet<_> = lines.iter().map(|line| line.text.as_str()).collect(); + assert_eq!(lines.len(), 2); + assert!(texts.contains("alpha")); + assert!(texts.contains("beta")); + + let mut events = Vec::new(); + while let Ok(event) = rx.try_recv() { + events.push(event.line.text); + } + + assert_eq!(events.len(), 2); + assert!(events.iter().any(|line| line == "alpha")); + assert!(events.iter().any(|line| line == "beta")); + + let _ = std::fs::remove_file(db_path); + + Ok(()) + } +} diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index 60d2a5b2..1a313100 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -1,7 +1,9 @@ use anyhow::{Context, Result}; use rusqlite::{Connection, OptionalExtension}; -use std::path::Path; +use std::path::{Path, PathBuf}; +use std::time::Duration; +use super::output::{OutputLine, OutputStream, OUTPUT_BUFFER_LIMIT}; use super::{Session, SessionMetrics, SessionState}; pub struct StateStore { @@ -11,6 +13,8 @@ pub struct StateStore { impl StateStore { pub fn open(path: &Path) -> Result { let conn = Connection::open(path)?; + conn.execute_batch("PRAGMA foreign_keys = ON;")?; + conn.busy_timeout(Duration::from_secs(5))?; let store = Self { conn }; store.init_schema()?; Ok(store) @@ -58,9 +62,19 @@ impl StateStore { timestamp TEXT NOT NULL ); + CREATE TABLE IF NOT EXISTS session_output ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL REFERENCES sessions(id), + stream TEXT NOT NULL, + line TEXT NOT NULL, + timestamp TEXT NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_sessions_state ON sessions(state); CREATE INDEX IF NOT EXISTS idx_tool_log_session ON tool_log(session_id); CREATE INDEX IF NOT EXISTS idx_messages_to ON messages(to_session, read); + CREATE INDEX IF NOT EXISTS idx_session_output_session + ON session_output(session_id, id); ", )?; self.ensure_session_columns()?; @@ -97,7 +111,10 @@ impl StateStore { session.agent_type, session.state.to_string(), session.pid.map(i64::from), - session.worktree.as_ref().map(|w| w.path.to_string_lossy().to_string()), + session + .worktree + .as_ref() + .map(|w| w.path.to_string_lossy().to_string()), session.worktree.as_ref().map(|w| w.branch.clone()), session.worktree.as_ref().map(|w| w.base_branch.clone()), session.created_at.to_rfc3339(), @@ -190,8 +207,8 @@ impl StateStore { let state = SessionState::from_db_value(&state_str); let worktree_path: Option = row.get(5)?; - let worktree = worktree_path.map(|p| super::WorktreeInfo { - path: std::path::PathBuf::from(p), + let worktree = worktree_path.map(|path| super::WorktreeInfo { + path: PathBuf::from(path), branch: row.get::<_, String>(6).unwrap_or_default(), base_branch: row.get::<_, String>(7).unwrap_or_default(), }); @@ -234,8 +251,9 @@ impl StateStore { let sessions = self.list_sessions()?; Ok(sessions .into_iter() - .find(|s| s.id == id || s.id.starts_with(id))) + .find(|session| session.id == id || session.id.starts_with(id))) } + pub fn send_message(&self, from: &str, to: &str, content: &str, msg_type: &str) -> Result<()> { self.conn.execute( "INSERT INTO messages (from_session, to_session, content, msg_type, timestamp) @@ -244,15 +262,76 @@ impl StateStore { )?; Ok(()) } + + pub fn append_output_line( + &self, + session_id: &str, + stream: OutputStream, + line: &str, + ) -> Result<()> { + let now = chrono::Utc::now().to_rfc3339(); + + self.conn.execute( + "INSERT INTO session_output (session_id, stream, line, timestamp) + VALUES (?1, ?2, ?3, ?4)", + rusqlite::params![session_id, stream.as_str(), line, now], + )?; + + self.conn.execute( + "DELETE FROM session_output + WHERE session_id = ?1 + AND id NOT IN ( + SELECT id + FROM session_output + WHERE session_id = ?1 + ORDER BY id DESC + LIMIT ?2 + )", + rusqlite::params![session_id, OUTPUT_BUFFER_LIMIT as i64], + )?; + + self.conn.execute( + "UPDATE sessions SET updated_at = ?1 WHERE id = ?2", + rusqlite::params![chrono::Utc::now().to_rfc3339(), session_id], + )?; + + Ok(()) + } + + pub fn get_output_lines(&self, session_id: &str, limit: usize) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT stream, line + FROM ( + SELECT id, stream, line + FROM session_output + WHERE session_id = ?1 + ORDER BY id DESC + LIMIT ?2 + ) + ORDER BY id ASC", + )?; + + let lines = stmt + .query_map(rusqlite::params![session_id, limit as i64], |row| { + let stream: String = row.get(0)?; + let text: String = row.get(1)?; + + Ok(OutputLine { + stream: OutputStream::from_db_value(&stream), + text, + }) + })? + .collect::, _>>()?; + + Ok(lines) + } } #[cfg(test)] mod tests { use super::*; - use crate::session::{Session, SessionMetrics, SessionState}; - use chrono::{Duration, Utc}; + use chrono::{Duration as ChronoDuration, Utc}; use std::fs; - use std::path::{Path, PathBuf}; struct TestDir { path: PathBuf, @@ -286,7 +365,7 @@ mod tests { state, pid: None, worktree: None, - created_at: now - Duration::minutes(1), + created_at: now - ChronoDuration::minutes(1), updated_at: now, metrics: SessionMetrics::default(), } @@ -346,4 +425,37 @@ mod tests { assert!(column_names.iter().any(|column| column == "pid")); Ok(()) } + + #[test] + fn append_output_line_keeps_latest_buffer_window() -> Result<()> { + let tempdir = TestDir::new("store-output")?; + let db = StateStore::open(&tempdir.path().join("state.db"))?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "session-1".to_string(), + task: "buffer output".to_string(), + agent_type: "claude".to_string(), + state: SessionState::Running, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + metrics: SessionMetrics::default(), + })?; + + for index in 0..(OUTPUT_BUFFER_LIMIT + 5) { + db.append_output_line("session-1", OutputStream::Stdout, &format!("line-{index}"))?; + } + + let lines = db.get_output_lines("session-1", OUTPUT_BUFFER_LIMIT)?; + let texts: Vec<_> = lines.iter().map(|line| line.text.as_str()).collect(); + + assert_eq!(lines.len(), OUTPUT_BUFFER_LIMIT); + assert_eq!(texts.first().copied(), Some("line-5")); + let expected_last_line = format!("line-{}", OUTPUT_BUFFER_LIMIT + 4); + assert_eq!(texts.last().copied(), Some(expected_last_line.as_str())); + + Ok(()) + } } diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index 59f78f4e..17efafc3 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -1,23 +1,33 @@ +use std::collections::HashMap; +use std::path::{Path, PathBuf}; + use ratatui::{ prelude::*, widgets::{ Block, Borders, Cell, HighlightSpacing, Paragraph, Row, Table, TableState, Tabs, Wrap, }, }; +use tokio::sync::broadcast; use super::widgets::{budget_state, format_currency, format_token_count, BudgetState, TokenMeter}; use crate::config::Config; +use crate::session::output::{OutputEvent, OutputLine, SessionOutputStore, OutputStream, OUTPUT_BUFFER_LIMIT}; use crate::session::store::StateStore; -use crate::session::{Session, SessionState}; +use crate::session::{Session, SessionMetrics, SessionState, WorktreeInfo}; pub struct Dashboard { db: StateStore, cfg: Config, + output_store: SessionOutputStore, + output_rx: broadcast::Receiver, sessions: Vec, + session_output_cache: HashMap>, selected_pane: Pane, selected_session: usize, show_help: bool, - scroll_offset: usize, + output_follow: bool, + output_scroll_offset: usize, + last_output_height: usize, session_table_state: TableState, } @@ -50,22 +60,34 @@ struct AggregateUsage { impl Dashboard { pub fn new(db: StateStore, cfg: Config) -> Self { + Self::with_output_store(db, cfg, SessionOutputStore::default()) + } + + pub fn with_output_store(db: StateStore, cfg: Config, output_store: SessionOutputStore) -> Self { let sessions = db.list_sessions().unwrap_or_default(); + let output_rx = output_store.subscribe(); let mut session_table_state = TableState::default(); if !sessions.is_empty() { session_table_state.select(Some(0)); } - Self { + let mut dashboard = Self { db, cfg, + output_store, + output_rx, sessions, + session_output_cache: HashMap::new(), selected_pane: Pane::Sessions, selected_session: 0, show_help: false, - scroll_offset: 0, + output_follow: true, + output_scroll_offset: 0, + last_output_height: 0, session_table_state, - } + }; + dashboard.sync_selected_output(); + dashboard } pub fn render(&mut self, frame: &mut Frame) { @@ -188,12 +210,21 @@ impl Dashboard { frame.render_stateful_widget(table, chunks[1], &mut self.session_table_state); } - fn render_output(&self, frame: &mut Frame, area: Rect) { - let content = if let Some(session) = self.sessions.get(self.selected_session) { - format!( - "Agent output for session {}...\n\n(Live streaming coming soon)", - session.id - ) + fn render_output(&mut self, frame: &mut Frame, area: Rect) { + self.sync_output_scroll(area.height.saturating_sub(2) as usize); + + let content = if self.sessions.get(self.selected_session).is_some() { + let lines = self.selected_output_lines(); + + if lines.is_empty() { + "Waiting for session output...".to_string() + } else { + lines + .iter() + .map(|line| line.text.as_str()) + .collect::>() + .join("\n") + } } else { "No sessions. Press 'n' to start one.".to_string() }; @@ -204,12 +235,14 @@ impl Dashboard { Style::default() }; - let paragraph = Paragraph::new(content).block( - Block::default() - .borders(Borders::ALL) - .title(" Output ") - .border_style(border_style), - ); + let paragraph = Paragraph::new(content) + .block( + Block::default() + .borders(Borders::ALL) + .title(" Output ") + .border_style(border_style), + ) + .scroll((self.output_scroll_offset as u16, 0)); frame.render_widget(paragraph, area); } @@ -264,7 +297,7 @@ impl Dashboard { } fn render_status_bar(&self, frame: &mut Frame, area: Rect) { - let text = " [n]ew session [s]top [Tab] switch pane [j/k] scroll [?] help [q]uit "; + let text = " [n]ew session [s]top [r]efresh [Tab] switch pane [j/k] scroll [?] help [q]uit "; let aggregate = self.aggregate_usage(); let (summary_text, summary_style) = self.aggregate_cost_summary(); let block = Block::default() @@ -338,22 +371,48 @@ impl Dashboard { } pub fn scroll_down(&mut self) { - if self.selected_pane == Pane::Sessions && !self.sessions.is_empty() { - self.selected_session = (self.selected_session + 1).min(self.sessions.len() - 1); - self.session_table_state.select(Some(self.selected_session)); - } else { - self.scroll_offset = self.scroll_offset.saturating_add(1); + match self.selected_pane { + Pane::Sessions if !self.sessions.is_empty() => { + self.selected_session = (self.selected_session + 1).min(self.sessions.len() - 1); + self.sync_selection(); + self.reset_output_view(); + self.sync_selected_output(); + } + Pane::Output => { + let max_scroll = self.max_output_scroll(); + if self.output_follow { + return; + } + + if self.output_scroll_offset >= max_scroll.saturating_sub(1) { + self.output_follow = true; + self.output_scroll_offset = max_scroll; + } else { + self.output_scroll_offset = self.output_scroll_offset.saturating_add(1); + } + } + Pane::Metrics => {} + Pane::Sessions => {} } } pub fn scroll_up(&mut self) { - if self.selected_pane == Pane::Sessions { - self.selected_session = self.selected_session.saturating_sub(1); - if !self.sessions.is_empty() { - self.session_table_state.select(Some(self.selected_session)); + match self.selected_pane { + Pane::Sessions => { + self.selected_session = self.selected_session.saturating_sub(1); + self.sync_selection(); + self.reset_output_view(); + self.sync_selected_output(); } - } else { - self.scroll_offset = self.scroll_offset.saturating_sub(1); + Pane::Output => { + if self.output_follow { + self.output_follow = false; + self.output_scroll_offset = self.max_output_scroll(); + } + + self.output_scroll_offset = self.output_scroll_offset.saturating_sub(1); + } + Pane::Metrics => {} } } @@ -363,14 +422,16 @@ impl Dashboard { pub fn stop_selected(&mut self) { if let Some(session) = self.sessions.get(self.selected_session) { - let _ = self.db.update_state(&session.id, &SessionState::Stopped); + if let Err(error) = self.db.update_state(&session.id, &SessionState::Stopped) { + tracing::warn!("Failed to stop session {}: {error}", session.id); + return; + } self.refresh(); } } pub fn refresh(&mut self) { - self.sessions = self.db.list_sessions().unwrap_or_default(); - self.sync_selection(); + self.sync_from_store(); } pub fn toggle_help(&mut self) { @@ -378,8 +439,29 @@ impl Dashboard { } pub async fn tick(&mut self) { - self.sessions = self.db.list_sessions().unwrap_or_default(); - self.sync_selection(); + loop { + match self.output_rx.try_recv() { + Ok(_event) => {} + Err(broadcast::error::TryRecvError::Empty) => break, + Err(broadcast::error::TryRecvError::Lagged(_)) => continue, + Err(broadcast::error::TryRecvError::Closed) => break, + } + } + + self.sync_from_store(); + } + + fn sync_from_store(&mut self) { + let selected_id = self.selected_session_id().map(ToOwned::to_owned); + self.sessions = match self.db.list_sessions() { + Ok(sessions) => sessions, + Err(error) => { + tracing::warn!("Failed to refresh sessions: {error}"); + Vec::new() + } + }; + self.sync_selection_by_id(selected_id.as_deref()); + self.sync_selected_output(); } fn sync_selection(&mut self) { @@ -392,6 +474,68 @@ impl Dashboard { } } + fn sync_selection_by_id(&mut self, selected_id: Option<&str>) { + if let Some(selected_id) = selected_id { + if let Some(index) = self.sessions.iter().position(|session| session.id == selected_id) { + self.selected_session = index; + } + } + self.sync_selection(); + } + + fn sync_selected_output(&mut self) { + let Some(session_id) = self.selected_session_id().map(ToOwned::to_owned) else { + self.output_scroll_offset = 0; + self.output_follow = true; + return; + }; + + match self.db.get_output_lines(&session_id, OUTPUT_BUFFER_LIMIT) { + Ok(lines) => { + self.output_store.replace_lines(&session_id, lines.clone()); + self.session_output_cache.insert(session_id, lines); + } + Err(error) => { + tracing::warn!("Failed to load session output: {error}"); + } + } + } + + fn selected_session_id(&self) -> Option<&str> { + self.sessions + .get(self.selected_session) + .map(|session| session.id.as_str()) + } + + fn selected_output_lines(&self) -> &[OutputLine] { + self.selected_session_id() + .and_then(|session_id| self.session_output_cache.get(session_id)) + .map(Vec::as_slice) + .unwrap_or(&[]) + } + + fn sync_output_scroll(&mut self, viewport_height: usize) { + self.last_output_height = viewport_height.max(1); + let max_scroll = self.max_output_scroll(); + + if self.output_follow { + self.output_scroll_offset = max_scroll; + } else { + self.output_scroll_offset = self.output_scroll_offset.min(max_scroll); + } + } + + fn max_output_scroll(&self) -> usize { + self.selected_output_lines() + .len() + .saturating_sub(self.last_output_height.max(1)) + } + + fn reset_output_view(&mut self) { + self.output_follow = true; + self.output_scroll_offset = 0; + } + fn aggregate_usage(&self) -> AggregateUsage { let total_tokens = self .sessions @@ -457,9 +601,19 @@ impl Dashboard { (text, aggregate.overall_state.style()) } + #[cfg(test)] fn aggregate_cost_summary_text(&self) -> String { self.aggregate_cost_summary().0 } + + #[cfg(test)] + fn selected_output_text(&self) -> String { + self.selected_output_lines() + .iter() + .map(|line| line.text.clone()) + .collect::>() + .join("\n") + } } impl SessionSummary { @@ -564,89 +718,12 @@ fn format_duration(duration_secs: u64) -> String { #[cfg(test)] mod tests { - use std::path::{Path, PathBuf}; - + use anyhow::Result; use chrono::Utc; - use ratatui::{backend::TestBackend, widgets::TableState, Terminal}; + use ratatui::{backend::TestBackend, Terminal}; + use uuid::Uuid; use super::*; - use crate::config::Config; - use crate::session::store::StateStore; - use crate::session::{SessionMetrics, WorktreeInfo}; - use crate::tui::widgets::BudgetState; - - #[test] - fn session_state_color_matches_requested_palette() { - assert_eq!(session_state_color(&SessionState::Running), Color::Green); - assert_eq!(session_state_color(&SessionState::Idle), Color::Yellow); - assert_eq!(session_state_color(&SessionState::Failed), Color::Red); - assert_eq!(session_state_color(&SessionState::Stopped), Color::DarkGray); - assert_eq!(session_state_color(&SessionState::Completed), Color::Blue); - } - - #[test] - fn session_summary_counts_each_state() { - let sessions = vec![ - sample_session( - "run-12345678", - "planner", - SessionState::Running, - Some("feat/run"), - 128, - 15, - ), - sample_session( - "idle-12345678", - "reviewer", - SessionState::Idle, - Some("feat/idle"), - 256, - 30, - ), - sample_session( - "done-12345678", - "architect", - SessionState::Completed, - Some("feat/done"), - 512, - 45, - ), - sample_session( - "fail-12345678", - "worker", - SessionState::Failed, - Some("feat/fail"), - 1024, - 60, - ), - sample_session( - "stop-12345678", - "security", - SessionState::Stopped, - None, - 64, - 10, - ), - sample_session( - "pend-12345678", - "tdd", - SessionState::Pending, - Some("feat/pending"), - 32, - 5, - ), - ]; - - let summary = SessionSummary::from_sessions(&sessions); - - assert_eq!(summary.total, 6); - assert_eq!(summary.running, 1); - assert_eq!(summary.idle, 1); - assert_eq!(summary.completed, 1); - assert_eq!(summary.failed, 1); - assert_eq!(summary.stopped, 1); - assert_eq!(summary.pending, 1); - } #[test] fn render_sessions_shows_summary_headers_and_selected_row() { @@ -673,7 +750,6 @@ mod tests { ); let rendered = render_dashboard_text(dashboard, 150, 24); - assert!(rendered.contains("ID")); assert!(rendered.contains("Agent")); assert!(rendered.contains("State")); @@ -689,59 +765,6 @@ mod tests { assert!(rendered.contains("00:02:05")); } - #[test] - fn sync_selection_preserves_table_offset_for_selected_rows() { - let mut dashboard = test_dashboard( - vec![ - sample_session( - "run-12345678", - "planner", - SessionState::Running, - Some("feat/run"), - 128, - 15, - ), - sample_session( - "done-87654321", - "reviewer", - SessionState::Completed, - Some("release/v1"), - 2048, - 125, - ), - ], - 1, - ); - *dashboard.session_table_state.offset_mut() = 3; - - dashboard.sync_selection(); - - assert_eq!(dashboard.session_table_state.selected(), Some(1)); - assert_eq!(dashboard.session_table_state.offset(), 3); - } - - #[test] - fn aggregate_usage_sums_tokens_and_cost_with_warning_state() { - let db = StateStore::open(Path::new(":memory:")).unwrap(); - let mut cfg = Config::default(); - cfg.token_budget = 10_000; - cfg.cost_budget_usd = 10.0; - - let mut dashboard = Dashboard::new(db, cfg); - dashboard.sessions = vec![ - budget_session("sess-1", 4_000, 3.50), - budget_session("sess-2", 4_500, 4.80), - ]; - - let aggregate = dashboard.aggregate_usage(); - - assert_eq!(aggregate.total_tokens, 8_500); - assert!((aggregate.total_cost_usd - 8.30).abs() < 1e-9); - assert_eq!(aggregate.token_state, BudgetState::Warning); - assert_eq!(aggregate.cost_state, BudgetState::Warning); - assert_eq!(aggregate.overall_state, BudgetState::Warning); - } - #[test] fn aggregate_cost_summary_mentions_total_cost() { let db = StateStore::open(Path::new(":memory:")).unwrap(); @@ -757,29 +780,144 @@ mod tests { ); } + #[test] + fn refresh_preserves_selected_session_by_id() -> Result<()> { + let db_path = std::env::temp_dir().join(format!("ecc2-dashboard-{}.db", Uuid::new_v4())); + let db = StateStore::open(&db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "older".to_string(), + task: "older".to_string(), + agent_type: "claude".to_string(), + state: SessionState::Idle, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + metrics: SessionMetrics::default(), + })?; + + db.insert_session(&Session { + id: "newer".to_string(), + task: "newer".to_string(), + agent_type: "claude".to_string(), + state: SessionState::Running, + pid: None, + worktree: None, + created_at: now, + updated_at: now + chrono::Duration::seconds(1), + metrics: SessionMetrics::default(), + })?; + + let mut dashboard = Dashboard::new(db, Config::default()); + dashboard.selected_session = 1; + dashboard.sync_selection(); + dashboard.refresh(); + + assert_eq!(dashboard.selected_session_id(), Some("older")); + let _ = std::fs::remove_file(db_path); + Ok(()) + } + + #[test] + fn metrics_scroll_does_not_mutate_output_scroll() -> Result<()> { + let db_path = std::env::temp_dir().join(format!("ecc2-dashboard-{}.db", Uuid::new_v4())); + let db = StateStore::open(&db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "session-1".to_string(), + task: "inspect output".to_string(), + agent_type: "claude".to_string(), + state: SessionState::Running, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + metrics: SessionMetrics::default(), + })?; + + for index in 0..6 { + db.append_output_line("session-1", OutputStream::Stdout, &format!("line {index}"))?; + } + + let mut dashboard = Dashboard::new(db, Config::default()); + dashboard.selected_pane = Pane::Output; + dashboard.refresh(); + dashboard.sync_output_scroll(3); + dashboard.scroll_up(); + let previous_scroll = dashboard.output_scroll_offset; + + dashboard.selected_pane = Pane::Metrics; + dashboard.scroll_up(); + dashboard.scroll_down(); + + assert_eq!(dashboard.output_scroll_offset, previous_scroll); + let _ = std::fs::remove_file(db_path); + Ok(()) + } + + #[test] + fn refresh_loads_selected_session_output_and_follows_tail() -> Result<()> { + let db_path = std::env::temp_dir().join(format!("ecc2-dashboard-{}.db", Uuid::new_v4())); + let db = StateStore::open(&db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "session-1".to_string(), + task: "tail output".to_string(), + agent_type: "claude".to_string(), + state: SessionState::Running, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + metrics: SessionMetrics::default(), + })?; + + for index in 0..12 { + db.append_output_line("session-1", OutputStream::Stdout, &format!("line {index}"))?; + } + + let mut dashboard = Dashboard::new(db, Config::default()); + dashboard.selected_pane = Pane::Output; + dashboard.refresh(); + dashboard.sync_output_scroll(4); + + assert_eq!(dashboard.output_scroll_offset, 8); + assert!(dashboard.selected_output_text().contains("line 11")); + + let _ = std::fs::remove_file(db_path); + Ok(()) + } + fn test_dashboard(sessions: Vec, selected_session: usize) -> Dashboard { let selected_session = selected_session.min(sessions.len().saturating_sub(1)); + let output_store = SessionOutputStore::default(); + let output_rx = output_store.subscribe(); let mut session_table_state = TableState::default(); if !sessions.is_empty() { session_table_state.select(Some(selected_session)); } Dashboard { - db: test_store(), + db: StateStore::open(Path::new(":memory:")).expect("open test db"), cfg: Config::default(), + output_store, + output_rx, sessions, + session_output_cache: HashMap::new(), selected_pane: Pane::Sessions, selected_session, show_help: false, - scroll_offset: 0, + output_follow: true, + output_scroll_offset: 0, + last_output_height: 0, session_table_state, } } - fn test_store() -> StateStore { - StateStore::open(Path::new(":memory:")).expect("open test db") - } - fn sample_session( id: &str, agent_type: &str,