feat(ecc2): implement live output streaming per agent (#774)

- PTY output capture via tokio::process with stdout/stderr piping
- Ring buffer (1000 lines) per session
- Output pane wired to show selected session with auto-scroll
- Broadcast channel for output events
This commit is contained in:
Affaan Mustafa
2026-03-23 03:46:19 -07:00
parent b032846806
commit ec4dcd3c45
5 changed files with 719 additions and 34 deletions

View File

@@ -1,20 +1,28 @@
use std::collections::HashMap;
use ratatui::{
prelude::*,
widgets::{Block, Borders, List, ListItem, Paragraph, Tabs},
};
use tokio::sync::broadcast;
use crate::config::Config;
use crate::session::{Session, SessionState};
use crate::session::output::{OutputEvent, OutputLine, SessionOutputStore, OUTPUT_BUFFER_LIMIT};
use crate::session::store::StateStore;
use crate::session::{Session, SessionState};
pub struct Dashboard {
db: StateStore,
cfg: Config,
output_store: SessionOutputStore,
output_rx: broadcast::Receiver<OutputEvent>,
sessions: Vec<Session>,
session_output_cache: HashMap<String, Vec<OutputLine>>,
selected_pane: Pane,
selected_session: usize,
show_help: bool,
scroll_offset: usize,
output_follow: bool,
output_scroll_offset: usize,
last_output_height: usize,
}
#[derive(Debug, Clone, Copy, PartialEq)]
@@ -26,23 +34,39 @@ enum Pane {
impl Dashboard {
pub fn new(db: StateStore, cfg: Config) -> Self {
Self::with_output_store(db, cfg, SessionOutputStore::default())
}
pub fn with_output_store(
db: StateStore,
_cfg: Config,
output_store: SessionOutputStore,
) -> Self {
let sessions = db.list_sessions().unwrap_or_default();
Self {
let output_rx = output_store.subscribe();
let mut dashboard = Self {
db,
cfg,
output_store,
output_rx,
sessions,
session_output_cache: HashMap::new(),
selected_pane: Pane::Sessions,
selected_session: 0,
show_help: false,
scroll_offset: 0,
}
output_follow: true,
output_scroll_offset: 0,
last_output_height: 0,
};
dashboard.sync_selected_output();
dashboard
}
pub fn render(&self, frame: &mut Frame) {
pub fn render(&mut self, frame: &mut Frame) {
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints([
Constraint::Length(3), // Header
Constraint::Length(3), // Header
Constraint::Min(10), // Main content
Constraint::Length(3), // Status bar
])
@@ -79,7 +103,11 @@ impl Dashboard {
}
fn render_header(&self, frame: &mut Frame, area: Rect) {
let running = self.sessions.iter().filter(|s| s.state == SessionState::Running).count();
let running = self
.sessions
.iter()
.filter(|s| s.state == SessionState::Running)
.count();
let total = self.sessions.len();
let title = format!(" ECC 2.0 | {running} running / {total} total ");
@@ -90,7 +118,11 @@ impl Dashboard {
Pane::Output => 1,
Pane::Metrics => 2,
})
.highlight_style(Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD));
.highlight_style(
Style::default()
.fg(Color::Cyan)
.add_modifier(Modifier::BOLD),
);
frame.render_widget(tabs, area);
}
@@ -110,11 +142,18 @@ impl Dashboard {
SessionState::Pending => "",
};
let style = if i == self.selected_session {
Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD)
Style::default()
.fg(Color::Cyan)
.add_modifier(Modifier::BOLD)
} else {
Style::default()
};
let text = format!("{state_icon} {} [{}] {}", &s.id[..8.min(s.id.len())], s.agent_type, s.task);
let text = format!(
"{state_icon} {} [{}] {}",
&s.id[..8.min(s.id.len())],
s.agent_type,
s.task
);
ListItem::new(text).style(style)
})
.collect();
@@ -134,9 +173,21 @@ impl Dashboard {
frame.render_widget(list, area);
}
fn render_output(&self, frame: &mut Frame, area: Rect) {
let content = if let Some(session) = self.sessions.get(self.selected_session) {
format!("Agent output for session {}...\n\n(Live streaming coming soon)", session.id)
fn render_output(&mut self, frame: &mut Frame, area: Rect) {
self.sync_output_scroll(area.height.saturating_sub(2) as usize);
let content = if self.sessions.get(self.selected_session).is_some() {
let lines = self.selected_output_lines();
if lines.is_empty() {
"Waiting for session output...".to_string()
} else {
lines
.iter()
.map(|line| line.text.as_str())
.collect::<Vec<_>>()
.join("\n")
}
} else {
"No sessions. Press 'n' to start one.".to_string()
};
@@ -147,12 +198,14 @@ impl Dashboard {
Style::default()
};
let paragraph = Paragraph::new(content).block(
Block::default()
.borders(Borders::ALL)
.title(" Output ")
.border_style(border_style),
);
let paragraph = Paragraph::new(content)
.block(
Block::default()
.borders(Borders::ALL)
.title(" Output ")
.border_style(border_style),
)
.scroll((self.output_scroll_offset as u16, 0));
frame.render_widget(paragraph, area);
}
@@ -233,16 +286,38 @@ impl Dashboard {
pub fn scroll_down(&mut self) {
if self.selected_pane == Pane::Sessions && !self.sessions.is_empty() {
self.selected_session = (self.selected_session + 1).min(self.sessions.len() - 1);
} else {
self.scroll_offset = self.scroll_offset.saturating_add(1);
self.reset_output_view();
self.sync_selected_output();
} else if self.selected_pane == Pane::Output {
let max_scroll = self.max_output_scroll();
if self.output_follow {
return;
}
if self.output_scroll_offset >= max_scroll.saturating_sub(1) {
self.output_follow = true;
self.output_scroll_offset = max_scroll;
} else {
self.output_scroll_offset = self.output_scroll_offset.saturating_add(1);
}
}
}
pub fn scroll_up(&mut self) {
if self.selected_pane == Pane::Sessions {
self.selected_session = self.selected_session.saturating_sub(1);
self.reset_output_view();
self.sync_selected_output();
} else if self.selected_pane == Pane::Output {
if self.output_follow {
self.output_follow = false;
self.output_scroll_offset = self.max_output_scroll();
}
self.output_scroll_offset = self.output_scroll_offset.saturating_sub(1);
} else {
self.scroll_offset = self.scroll_offset.saturating_sub(1);
self.output_scroll_offset = self.output_scroll_offset.saturating_sub(1);
}
}
@@ -260,6 +335,8 @@ impl Dashboard {
pub fn refresh(&mut self) {
self.sessions = self.db.list_sessions().unwrap_or_default();
self.clamp_selected_session();
self.sync_selected_output();
}
pub fn toggle_help(&mut self) {
@@ -267,7 +344,169 @@ impl Dashboard {
}
pub async fn tick(&mut self) {
// Periodic refresh every few ticks
loop {
match self.output_rx.try_recv() {
Ok(_event) => {}
Err(broadcast::error::TryRecvError::Empty) => break,
Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
Err(broadcast::error::TryRecvError::Closed) => break,
}
}
self.sessions = self.db.list_sessions().unwrap_or_default();
self.clamp_selected_session();
self.sync_selected_output();
}
fn clamp_selected_session(&mut self) {
if self.sessions.is_empty() {
self.selected_session = 0;
} else {
self.selected_session = self.selected_session.min(self.sessions.len() - 1);
}
}
fn sync_selected_output(&mut self) {
let Some(session_id) = self.selected_session_id().map(ToOwned::to_owned) else {
return;
};
match self.db.get_output_lines(&session_id, OUTPUT_BUFFER_LIMIT) {
Ok(lines) => {
self.output_store.replace_lines(&session_id, lines.clone());
self.session_output_cache.insert(session_id, lines);
}
Err(error) => {
tracing::warn!("Failed to load session output: {error}");
}
}
}
fn selected_session_id(&self) -> Option<&str> {
self.sessions
.get(self.selected_session)
.map(|session| session.id.as_str())
}
fn selected_output_lines(&self) -> &[OutputLine] {
self.selected_session_id()
.and_then(|session_id| self.session_output_cache.get(session_id))
.map(Vec::as_slice)
.unwrap_or(&[])
}
fn sync_output_scroll(&mut self, viewport_height: usize) {
self.last_output_height = viewport_height.max(1);
let max_scroll = self.max_output_scroll();
if self.output_follow {
self.output_scroll_offset = max_scroll;
} else {
self.output_scroll_offset = self.output_scroll_offset.min(max_scroll);
}
}
fn max_output_scroll(&self) -> usize {
self.selected_output_lines()
.len()
.saturating_sub(self.last_output_height.max(1))
}
fn reset_output_view(&mut self) {
self.output_follow = true;
self.output_scroll_offset = 0;
}
#[cfg(test)]
fn selected_output_text(&self) -> String {
self.selected_output_lines()
.iter()
.map(|line| line.text.clone())
.collect::<Vec<_>>()
.join("\n")
}
}
#[cfg(test)]
mod tests {
use std::env;
use anyhow::Result;
use chrono::Utc;
use uuid::Uuid;
use super::{Dashboard, Pane};
use crate::config::Config;
use crate::session::output::OutputStream;
use crate::session::store::StateStore;
use crate::session::{Session, SessionMetrics, SessionState};
#[test]
fn refresh_loads_selected_session_output_and_follows_tail() -> Result<()> {
let db_path = env::temp_dir().join(format!("ecc2-dashboard-{}.db", Uuid::new_v4()));
let db = StateStore::open(&db_path)?;
let now = Utc::now();
db.insert_session(&Session {
id: "session-1".to_string(),
task: "tail output".to_string(),
agent_type: "claude".to_string(),
state: SessionState::Running,
worktree: None,
created_at: now,
updated_at: now,
metrics: SessionMetrics::default(),
})?;
for index in 0..12 {
db.append_output_line("session-1", OutputStream::Stdout, &format!("line {index}"))?;
}
let mut dashboard = Dashboard::new(db, Config::default());
dashboard.selected_pane = Pane::Output;
dashboard.refresh();
dashboard.sync_output_scroll(4);
assert_eq!(dashboard.output_scroll_offset, 8);
assert!(dashboard.selected_output_text().contains("line 11"));
let _ = std::fs::remove_file(db_path);
Ok(())
}
#[test]
fn scrolling_up_disables_follow_mode() -> Result<()> {
let db_path = env::temp_dir().join(format!("ecc2-dashboard-{}.db", Uuid::new_v4()));
let db = StateStore::open(&db_path)?;
let now = Utc::now();
db.insert_session(&Session {
id: "session-1".to_string(),
task: "inspect output".to_string(),
agent_type: "claude".to_string(),
state: SessionState::Running,
worktree: None,
created_at: now,
updated_at: now,
metrics: SessionMetrics::default(),
})?;
for index in 0..6 {
db.append_output_line("session-1", OutputStream::Stdout, &format!("line {index}"))?;
}
let mut dashboard = Dashboard::new(db, Config::default());
dashboard.selected_pane = Pane::Output;
dashboard.refresh();
dashboard.sync_output_scroll(3);
dashboard.scroll_up();
assert!(!dashboard.output_follow);
assert_eq!(dashboard.output_scroll_offset, 2);
let _ = std::fs::remove_file(db_path);
Ok(())
}
}