From 148fc726cb20e1a0b592595c56a3adb9771c88db Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Mon, 23 Mar 2026 03:46:19 -0700 Subject: [PATCH] feat(ecc2): implement live output streaming per agent (#774) - PTY output capture via tokio::process with stdout/stderr piping - Ring buffer (1000 lines) per session - Output pane wired to show selected session with auto-scroll - Broadcast channel for output events --- ecc2/src/session/mod.rs | 2 + ecc2/src/session/output.rs | 149 ++++++++++++++++++ ecc2/src/session/runtime.rs | 176 ++++++++++++++++++++++ ecc2/src/session/store.rs | 135 ++++++++++++++++- ecc2/src/tui/dashboard.rs | 291 ++++++++++++++++++++++++++++++++---- 5 files changed, 719 insertions(+), 34 deletions(-) create mode 100644 ecc2/src/session/output.rs create mode 100644 ecc2/src/session/runtime.rs diff --git a/ecc2/src/session/mod.rs b/ecc2/src/session/mod.rs index 67aeb05a..ce03311a 100644 --- a/ecc2/src/session/mod.rs +++ b/ecc2/src/session/mod.rs @@ -1,5 +1,7 @@ pub mod daemon; pub mod manager; +pub mod output; +pub mod runtime; pub mod store; use chrono::{DateTime, Utc}; diff --git a/ecc2/src/session/output.rs b/ecc2/src/session/output.rs new file mode 100644 index 00000000..6cae21f3 --- /dev/null +++ b/ecc2/src/session/output.rs @@ -0,0 +1,149 @@ +use std::collections::{HashMap, VecDeque}; +use std::sync::{Arc, Mutex, MutexGuard}; + +use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast; + +pub const OUTPUT_BUFFER_LIMIT: usize = 1000; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum OutputStream { + Stdout, + Stderr, +} + +impl OutputStream { + pub fn as_str(self) -> &'static str { + match self { + Self::Stdout => "stdout", + Self::Stderr => "stderr", + } + } + + pub fn from_db_value(value: &str) -> Self { + match value { + "stderr" => Self::Stderr, + _ => Self::Stdout, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct OutputLine { + pub stream: OutputStream, + pub text: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct OutputEvent { + pub session_id: String, + pub line: OutputLine, +} + +#[derive(Clone)] +pub struct SessionOutputStore { + capacity: usize, + buffers: Arc>>>, + tx: broadcast::Sender, +} + +impl Default for SessionOutputStore { + fn default() -> Self { + Self::new(OUTPUT_BUFFER_LIMIT) + } +} + +impl SessionOutputStore { + pub fn new(capacity: usize) -> Self { + let capacity = capacity.max(1); + let (tx, _) = broadcast::channel(capacity.max(16)); + + Self { + capacity, + buffers: Arc::new(Mutex::new(HashMap::new())), + tx, + } + } + + pub fn subscribe(&self) -> broadcast::Receiver { + self.tx.subscribe() + } + + pub fn push_line(&self, session_id: &str, stream: OutputStream, text: impl Into) { + let line = OutputLine { + stream, + text: text.into(), + }; + + { + let mut buffers = self.lock_buffers(); + let buffer = buffers.entry(session_id.to_string()).or_default(); + buffer.push_back(line.clone()); + + while buffer.len() > self.capacity { + let _ = buffer.pop_front(); + } + } + + let _ = self.tx.send(OutputEvent { + session_id: session_id.to_string(), + line, + }); + } + + pub fn replace_lines(&self, session_id: &str, lines: Vec) { + let mut buffer: VecDeque = lines.into_iter().collect(); + + while buffer.len() > self.capacity { + let _ = buffer.pop_front(); + } + + self.lock_buffers().insert(session_id.to_string(), buffer); + } + + pub fn lines(&self, session_id: &str) -> Vec { + self.lock_buffers() + .get(session_id) + .map(|buffer| buffer.iter().cloned().collect()) + .unwrap_or_default() + } + + fn lock_buffers(&self) -> MutexGuard<'_, HashMap>> { + self.buffers + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()) + } +} + +#[cfg(test)] +mod tests { + use super::{OutputStream, SessionOutputStore}; + + #[test] + fn ring_buffer_keeps_most_recent_lines() { + let store = SessionOutputStore::new(3); + + store.push_line("session-1", OutputStream::Stdout, "line-1"); + store.push_line("session-1", OutputStream::Stdout, "line-2"); + store.push_line("session-1", OutputStream::Stdout, "line-3"); + store.push_line("session-1", OutputStream::Stdout, "line-4"); + + let lines = store.lines("session-1"); + let texts: Vec<_> = lines.iter().map(|line| line.text.as_str()).collect(); + + assert_eq!(texts, vec!["line-2", "line-3", "line-4"]); + } + + #[tokio::test] + async fn pushing_output_broadcasts_events() { + let store = SessionOutputStore::new(8); + let mut rx = store.subscribe(); + + store.push_line("session-1", OutputStream::Stderr, "problem"); + + let event = rx.recv().await.expect("broadcast event"); + assert_eq!(event.session_id, "session-1"); + assert_eq!(event.line.stream, OutputStream::Stderr); + assert_eq!(event.line.text, "problem"); + } +} diff --git a/ecc2/src/session/runtime.rs b/ecc2/src/session/runtime.rs new file mode 100644 index 00000000..f82197ce --- /dev/null +++ b/ecc2/src/session/runtime.rs @@ -0,0 +1,176 @@ +use std::path::{Path, PathBuf}; +use std::process::ExitStatus; +use std::process::Stdio; + +use anyhow::{Context, Result}; +use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader}; +use tokio::process::Command; + +use super::output::{OutputStream, SessionOutputStore}; +use super::store::StateStore; +use super::SessionState; + +pub async fn capture_command_output( + db_path: PathBuf, + session_id: String, + mut command: Command, + output_store: SessionOutputStore, +) -> Result { + let result = async { + let mut child = command + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .with_context(|| format!("Failed to start process for session {}", session_id))?; + + update_session_state(&db_path, &session_id, SessionState::Running)?; + + let stdout = child.stdout.take().context("Child stdout was not piped")?; + let stderr = child.stderr.take().context("Child stderr was not piped")?; + + let stdout_task = tokio::spawn(capture_stream( + db_path.clone(), + session_id.clone(), + stdout, + OutputStream::Stdout, + output_store.clone(), + )); + let stderr_task = tokio::spawn(capture_stream( + db_path.clone(), + session_id.clone(), + stderr, + OutputStream::Stderr, + output_store, + )); + + let status = child.wait().await?; + stdout_task.await??; + stderr_task.await??; + + let final_state = if status.success() { + SessionState::Completed + } else { + SessionState::Failed + }; + update_session_state(&db_path, &session_id, final_state)?; + + Ok(status) + } + .await; + + if result.is_err() { + let _ = update_session_state(&db_path, &session_id, SessionState::Failed); + } + + result +} + +async fn capture_stream( + db_path: PathBuf, + session_id: String, + reader: R, + stream: OutputStream, + output_store: SessionOutputStore, +) -> Result<()> +where + R: AsyncRead + Unpin, +{ + let mut lines = BufReader::new(reader).lines(); + + while let Some(line) = lines.next_line().await? { + output_store.push_line(&session_id, stream, line.clone()); + append_output_line(&db_path, &session_id, stream, &line)?; + } + + Ok(()) +} + +fn append_output_line( + db_path: &Path, + session_id: &str, + stream: OutputStream, + line: &str, +) -> Result<()> { + let db = StateStore::open(db_path)?; + db.append_output_line(session_id, stream, line)?; + Ok(()) +} + +fn update_session_state(db_path: &Path, session_id: &str, state: SessionState) -> Result<()> { + let db = StateStore::open(db_path)?; + db.update_state(session_id, &state)?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use std::env; + + use anyhow::Result; + use chrono::Utc; + use tokio::process::Command; + use uuid::Uuid; + + use super::capture_command_output; + use crate::session::output::{SessionOutputStore, OUTPUT_BUFFER_LIMIT}; + use crate::session::store::StateStore; + use crate::session::{Session, SessionMetrics, SessionState}; + + #[tokio::test] + async fn capture_command_output_persists_lines_and_events() -> Result<()> { + let db_path = env::temp_dir().join(format!("ecc2-runtime-{}.db", Uuid::new_v4())); + let db = StateStore::open(&db_path)?; + let session_id = "session-1".to_string(); + let now = Utc::now(); + + db.insert_session(&Session { + id: session_id.clone(), + task: "stream output".to_string(), + agent_type: "test".to_string(), + state: SessionState::Pending, + worktree: None, + created_at: now, + updated_at: now, + metrics: SessionMetrics::default(), + })?; + + let output_store = SessionOutputStore::default(); + let mut rx = output_store.subscribe(); + let mut command = Command::new("/bin/sh"); + command + .arg("-c") + .arg("printf 'alpha\\n'; printf 'beta\\n' >&2"); + + let status = + capture_command_output(db_path.clone(), session_id.clone(), command, output_store) + .await?; + + assert!(status.success()); + + let db = StateStore::open(&db_path)?; + let session = db + .get_session(&session_id)? + .expect("session should still exist"); + assert_eq!(session.state, SessionState::Completed); + + let lines = db.get_output_lines(&session_id, OUTPUT_BUFFER_LIMIT)?; + let texts: HashSet<_> = lines.iter().map(|line| line.text.as_str()).collect(); + assert_eq!(lines.len(), 2); + assert!(texts.contains("alpha")); + assert!(texts.contains("beta")); + + let mut events = Vec::new(); + while let Ok(event) = rx.try_recv() { + events.push(event.line.text); + } + + assert_eq!(events.len(), 2); + assert!(events.iter().any(|line| line == "alpha")); + assert!(events.iter().any(|line| line == "beta")); + + let _ = std::fs::remove_file(db_path); + + Ok(()) + } +} diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index b412f188..33b09f36 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -1,7 +1,9 @@ use anyhow::Result; use rusqlite::Connection; use std::path::Path; +use std::time::Duration; +use super::output::{OutputLine, OutputStream, OUTPUT_BUFFER_LIMIT}; use super::{Session, SessionMetrics, SessionState}; pub struct StateStore { @@ -11,6 +13,7 @@ pub struct StateStore { impl StateStore { pub fn open(path: &Path) -> Result { let conn = Connection::open(path)?; + conn.busy_timeout(Duration::from_secs(5))?; let store = Self { conn }; store.init_schema()?; Ok(store) @@ -57,9 +60,19 @@ impl StateStore { timestamp TEXT NOT NULL ); + CREATE TABLE IF NOT EXISTS session_output ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL REFERENCES sessions(id), + stream TEXT NOT NULL, + line TEXT NOT NULL, + timestamp TEXT NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_sessions_state ON sessions(state); CREATE INDEX IF NOT EXISTS idx_tool_log_session ON tool_log(session_id); CREATE INDEX IF NOT EXISTS idx_messages_to ON messages(to_session, read); + CREATE INDEX IF NOT EXISTS idx_session_output_session + ON session_output(session_id, id); ", )?; Ok(()) @@ -170,16 +183,12 @@ impl StateStore { pub fn get_session(&self, id: &str) -> Result> { let sessions = self.list_sessions()?; - Ok(sessions.into_iter().find(|s| s.id == id || s.id.starts_with(id))) + Ok(sessions + .into_iter() + .find(|s| s.id == id || s.id.starts_with(id))) } - pub fn send_message( - &self, - from: &str, - to: &str, - content: &str, - msg_type: &str, - ) -> Result<()> { + pub fn send_message(&self, from: &str, to: &str, content: &str, msg_type: &str) -> Result<()> { self.conn.execute( "INSERT INTO messages (from_session, to_session, content, msg_type, timestamp) VALUES (?1, ?2, ?3, ?4, ?5)", @@ -187,4 +196,114 @@ impl StateStore { )?; Ok(()) } + + pub fn append_output_line( + &self, + session_id: &str, + stream: OutputStream, + line: &str, + ) -> Result<()> { + let now = chrono::Utc::now().to_rfc3339(); + + self.conn.execute( + "INSERT INTO session_output (session_id, stream, line, timestamp) + VALUES (?1, ?2, ?3, ?4)", + rusqlite::params![session_id, stream.as_str(), line, now], + )?; + + self.conn.execute( + "DELETE FROM session_output + WHERE session_id = ?1 + AND id NOT IN ( + SELECT id + FROM session_output + WHERE session_id = ?1 + ORDER BY id DESC + LIMIT ?2 + )", + rusqlite::params![session_id, OUTPUT_BUFFER_LIMIT as i64], + )?; + + self.conn.execute( + "UPDATE sessions SET updated_at = ?1 WHERE id = ?2", + rusqlite::params![chrono::Utc::now().to_rfc3339(), session_id], + )?; + + Ok(()) + } + + pub fn get_output_lines(&self, session_id: &str, limit: usize) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT stream, line + FROM ( + SELECT id, stream, line + FROM session_output + WHERE session_id = ?1 + ORDER BY id DESC + LIMIT ?2 + ) + ORDER BY id ASC", + )?; + + let lines = stmt + .query_map(rusqlite::params![session_id, limit as i64], |row| { + let stream: String = row.get(0)?; + let text: String = row.get(1)?; + + Ok(OutputLine { + stream: OutputStream::from_db_value(&stream), + text, + }) + })? + .collect::, _>>()?; + + Ok(lines) + } +} + +#[cfg(test)] +mod tests { + use std::env; + + use anyhow::Result; + use chrono::Utc; + use uuid::Uuid; + + use super::StateStore; + use crate::session::output::{OutputStream, OUTPUT_BUFFER_LIMIT}; + use crate::session::{Session, SessionMetrics, SessionState}; + + #[test] + fn append_output_line_keeps_latest_buffer_window() -> Result<()> { + let db_path = env::temp_dir().join(format!("ecc2-store-{}.db", Uuid::new_v4())); + let db = StateStore::open(&db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "session-1".to_string(), + task: "buffer output".to_string(), + agent_type: "claude".to_string(), + state: SessionState::Running, + worktree: None, + created_at: now, + updated_at: now, + metrics: SessionMetrics::default(), + })?; + + for index in 0..(OUTPUT_BUFFER_LIMIT + 5) { + db.append_output_line("session-1", OutputStream::Stdout, &format!("line-{index}"))?; + } + + let lines = db.get_output_lines("session-1", OUTPUT_BUFFER_LIMIT)?; + let texts: Vec<_> = lines.iter().map(|line| line.text.as_str()).collect(); + + assert_eq!(lines.len(), OUTPUT_BUFFER_LIMIT); + assert_eq!(texts.first().copied(), Some("line-5")); + let expected_last_line = format!("line-{}", OUTPUT_BUFFER_LIMIT + 4); + assert_eq!(texts.last().copied(), Some(expected_last_line.as_str())); + + let _ = std::fs::remove_file(db_path); + + Ok(()) + } } diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index aca1e995..3942bb1e 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -1,20 +1,28 @@ +use std::collections::HashMap; + use ratatui::{ prelude::*, widgets::{Block, Borders, List, ListItem, Paragraph, Tabs}, }; +use tokio::sync::broadcast; use crate::config::Config; -use crate::session::{Session, SessionState}; +use crate::session::output::{OutputEvent, OutputLine, SessionOutputStore, OUTPUT_BUFFER_LIMIT}; use crate::session::store::StateStore; +use crate::session::{Session, SessionState}; pub struct Dashboard { db: StateStore, - cfg: Config, + output_store: SessionOutputStore, + output_rx: broadcast::Receiver, sessions: Vec, + session_output_cache: HashMap>, selected_pane: Pane, selected_session: usize, show_help: bool, - scroll_offset: usize, + output_follow: bool, + output_scroll_offset: usize, + last_output_height: usize, } #[derive(Debug, Clone, Copy, PartialEq)] @@ -26,23 +34,39 @@ enum Pane { impl Dashboard { pub fn new(db: StateStore, cfg: Config) -> Self { + Self::with_output_store(db, cfg, SessionOutputStore::default()) + } + + pub fn with_output_store( + db: StateStore, + _cfg: Config, + output_store: SessionOutputStore, + ) -> Self { let sessions = db.list_sessions().unwrap_or_default(); - Self { + let output_rx = output_store.subscribe(); + + let mut dashboard = Self { db, - cfg, + output_store, + output_rx, sessions, + session_output_cache: HashMap::new(), selected_pane: Pane::Sessions, selected_session: 0, show_help: false, - scroll_offset: 0, - } + output_follow: true, + output_scroll_offset: 0, + last_output_height: 0, + }; + dashboard.sync_selected_output(); + dashboard } - pub fn render(&self, frame: &mut Frame) { + pub fn render(&mut self, frame: &mut Frame) { let chunks = Layout::default() .direction(Direction::Vertical) .constraints([ - Constraint::Length(3), // Header + Constraint::Length(3), // Header Constraint::Min(10), // Main content Constraint::Length(3), // Status bar ]) @@ -79,7 +103,11 @@ impl Dashboard { } fn render_header(&self, frame: &mut Frame, area: Rect) { - let running = self.sessions.iter().filter(|s| s.state == SessionState::Running).count(); + let running = self + .sessions + .iter() + .filter(|s| s.state == SessionState::Running) + .count(); let total = self.sessions.len(); let title = format!(" ECC 2.0 | {running} running / {total} total "); @@ -90,7 +118,11 @@ impl Dashboard { Pane::Output => 1, Pane::Metrics => 2, }) - .highlight_style(Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD)); + .highlight_style( + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + ); frame.render_widget(tabs, area); } @@ -110,11 +142,18 @@ impl Dashboard { SessionState::Pending => "◌", }; let style = if i == self.selected_session { - Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD) + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD) } else { Style::default() }; - let text = format!("{state_icon} {} [{}] {}", &s.id[..8.min(s.id.len())], s.agent_type, s.task); + let text = format!( + "{state_icon} {} [{}] {}", + &s.id[..8.min(s.id.len())], + s.agent_type, + s.task + ); ListItem::new(text).style(style) }) .collect(); @@ -134,9 +173,21 @@ impl Dashboard { frame.render_widget(list, area); } - fn render_output(&self, frame: &mut Frame, area: Rect) { - let content = if let Some(session) = self.sessions.get(self.selected_session) { - format!("Agent output for session {}...\n\n(Live streaming coming soon)", session.id) + fn render_output(&mut self, frame: &mut Frame, area: Rect) { + self.sync_output_scroll(area.height.saturating_sub(2) as usize); + + let content = if self.sessions.get(self.selected_session).is_some() { + let lines = self.selected_output_lines(); + + if lines.is_empty() { + "Waiting for session output...".to_string() + } else { + lines + .iter() + .map(|line| line.text.as_str()) + .collect::>() + .join("\n") + } } else { "No sessions. Press 'n' to start one.".to_string() }; @@ -147,12 +198,14 @@ impl Dashboard { Style::default() }; - let paragraph = Paragraph::new(content).block( - Block::default() - .borders(Borders::ALL) - .title(" Output ") - .border_style(border_style), - ); + let paragraph = Paragraph::new(content) + .block( + Block::default() + .borders(Borders::ALL) + .title(" Output ") + .border_style(border_style), + ) + .scroll((self.output_scroll_offset as u16, 0)); frame.render_widget(paragraph, area); } @@ -233,16 +286,38 @@ impl Dashboard { pub fn scroll_down(&mut self) { if self.selected_pane == Pane::Sessions && !self.sessions.is_empty() { self.selected_session = (self.selected_session + 1).min(self.sessions.len() - 1); - } else { - self.scroll_offset = self.scroll_offset.saturating_add(1); + self.reset_output_view(); + self.sync_selected_output(); + } else if self.selected_pane == Pane::Output { + let max_scroll = self.max_output_scroll(); + + if self.output_follow { + return; + } + + if self.output_scroll_offset >= max_scroll.saturating_sub(1) { + self.output_follow = true; + self.output_scroll_offset = max_scroll; + } else { + self.output_scroll_offset = self.output_scroll_offset.saturating_add(1); + } } } pub fn scroll_up(&mut self) { if self.selected_pane == Pane::Sessions { self.selected_session = self.selected_session.saturating_sub(1); + self.reset_output_view(); + self.sync_selected_output(); + } else if self.selected_pane == Pane::Output { + if self.output_follow { + self.output_follow = false; + self.output_scroll_offset = self.max_output_scroll(); + } + + self.output_scroll_offset = self.output_scroll_offset.saturating_sub(1); } else { - self.scroll_offset = self.scroll_offset.saturating_sub(1); + self.output_scroll_offset = self.output_scroll_offset.saturating_sub(1); } } @@ -260,6 +335,8 @@ impl Dashboard { pub fn refresh(&mut self) { self.sessions = self.db.list_sessions().unwrap_or_default(); + self.clamp_selected_session(); + self.sync_selected_output(); } pub fn toggle_help(&mut self) { @@ -267,7 +344,169 @@ impl Dashboard { } pub async fn tick(&mut self) { - // Periodic refresh every few ticks + loop { + match self.output_rx.try_recv() { + Ok(_event) => {} + Err(broadcast::error::TryRecvError::Empty) => break, + Err(broadcast::error::TryRecvError::Lagged(_)) => continue, + Err(broadcast::error::TryRecvError::Closed) => break, + } + } + self.sessions = self.db.list_sessions().unwrap_or_default(); + self.clamp_selected_session(); + self.sync_selected_output(); + } + + fn clamp_selected_session(&mut self) { + if self.sessions.is_empty() { + self.selected_session = 0; + } else { + self.selected_session = self.selected_session.min(self.sessions.len() - 1); + } + } + + fn sync_selected_output(&mut self) { + let Some(session_id) = self.selected_session_id().map(ToOwned::to_owned) else { + return; + }; + + match self.db.get_output_lines(&session_id, OUTPUT_BUFFER_LIMIT) { + Ok(lines) => { + self.output_store.replace_lines(&session_id, lines.clone()); + self.session_output_cache.insert(session_id, lines); + } + Err(error) => { + tracing::warn!("Failed to load session output: {error}"); + } + } + } + + fn selected_session_id(&self) -> Option<&str> { + self.sessions + .get(self.selected_session) + .map(|session| session.id.as_str()) + } + + fn selected_output_lines(&self) -> &[OutputLine] { + self.selected_session_id() + .and_then(|session_id| self.session_output_cache.get(session_id)) + .map(Vec::as_slice) + .unwrap_or(&[]) + } + + fn sync_output_scroll(&mut self, viewport_height: usize) { + self.last_output_height = viewport_height.max(1); + let max_scroll = self.max_output_scroll(); + + if self.output_follow { + self.output_scroll_offset = max_scroll; + } else { + self.output_scroll_offset = self.output_scroll_offset.min(max_scroll); + } + } + + fn max_output_scroll(&self) -> usize { + self.selected_output_lines() + .len() + .saturating_sub(self.last_output_height.max(1)) + } + + fn reset_output_view(&mut self) { + self.output_follow = true; + self.output_scroll_offset = 0; + } + + #[cfg(test)] + fn selected_output_text(&self) -> String { + self.selected_output_lines() + .iter() + .map(|line| line.text.clone()) + .collect::>() + .join("\n") + } +} + +#[cfg(test)] +mod tests { + use std::env; + + use anyhow::Result; + use chrono::Utc; + use uuid::Uuid; + + use super::{Dashboard, Pane}; + use crate::config::Config; + use crate::session::output::OutputStream; + use crate::session::store::StateStore; + use crate::session::{Session, SessionMetrics, SessionState}; + + #[test] + fn refresh_loads_selected_session_output_and_follows_tail() -> Result<()> { + let db_path = env::temp_dir().join(format!("ecc2-dashboard-{}.db", Uuid::new_v4())); + let db = StateStore::open(&db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "session-1".to_string(), + task: "tail output".to_string(), + agent_type: "claude".to_string(), + state: SessionState::Running, + worktree: None, + created_at: now, + updated_at: now, + metrics: SessionMetrics::default(), + })?; + + for index in 0..12 { + db.append_output_line("session-1", OutputStream::Stdout, &format!("line {index}"))?; + } + + let mut dashboard = Dashboard::new(db, Config::default()); + dashboard.selected_pane = Pane::Output; + dashboard.refresh(); + dashboard.sync_output_scroll(4); + + assert_eq!(dashboard.output_scroll_offset, 8); + assert!(dashboard.selected_output_text().contains("line 11")); + + let _ = std::fs::remove_file(db_path); + + Ok(()) + } + + #[test] + fn scrolling_up_disables_follow_mode() -> Result<()> { + let db_path = env::temp_dir().join(format!("ecc2-dashboard-{}.db", Uuid::new_v4())); + let db = StateStore::open(&db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "session-1".to_string(), + task: "inspect output".to_string(), + agent_type: "claude".to_string(), + state: SessionState::Running, + worktree: None, + created_at: now, + updated_at: now, + metrics: SessionMetrics::default(), + })?; + + for index in 0..6 { + db.append_output_line("session-1", OutputStream::Stdout, &format!("line {index}"))?; + } + + let mut dashboard = Dashboard::new(db, Config::default()); + dashboard.selected_pane = Pane::Output; + dashboard.refresh(); + dashboard.sync_output_scroll(3); + dashboard.scroll_up(); + + assert!(!dashboard.output_follow); + assert_eq!(dashboard.output_scroll_offset, 2); + + let _ = std::fs::remove_file(db_path); + + Ok(()) } }