feat(ecc2): implement live output streaming per agent (#774)

- PTY output capture via tokio::process with stdout/stderr piping
- Ring buffer (1000 lines) per session
- Output pane wired to show selected session with auto-scroll
- Broadcast channel for output events
This commit is contained in:
Affaan Mustafa
2026-03-23 03:46:19 -07:00
parent e78c092499
commit 44c2bf6f7b
7 changed files with 1037 additions and 222 deletions

View File

@@ -1,23 +1,33 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use ratatui::{
prelude::*,
widgets::{
Block, Borders, Cell, HighlightSpacing, Paragraph, Row, Table, TableState, Tabs, Wrap,
},
};
use tokio::sync::broadcast;
use super::widgets::{budget_state, format_currency, format_token_count, BudgetState, TokenMeter};
use crate::config::Config;
use crate::session::output::{OutputEvent, OutputLine, SessionOutputStore, OutputStream, OUTPUT_BUFFER_LIMIT};
use crate::session::store::StateStore;
use crate::session::{Session, SessionState};
use crate::session::{Session, SessionMetrics, SessionState, WorktreeInfo};
pub struct Dashboard {
db: StateStore,
cfg: Config,
output_store: SessionOutputStore,
output_rx: broadcast::Receiver<OutputEvent>,
sessions: Vec<Session>,
session_output_cache: HashMap<String, Vec<OutputLine>>,
selected_pane: Pane,
selected_session: usize,
show_help: bool,
scroll_offset: usize,
output_follow: bool,
output_scroll_offset: usize,
last_output_height: usize,
session_table_state: TableState,
}
@@ -50,22 +60,34 @@ struct AggregateUsage {
impl Dashboard {
pub fn new(db: StateStore, cfg: Config) -> Self {
Self::with_output_store(db, cfg, SessionOutputStore::default())
}
pub fn with_output_store(db: StateStore, cfg: Config, output_store: SessionOutputStore) -> Self {
let sessions = db.list_sessions().unwrap_or_default();
let output_rx = output_store.subscribe();
let mut session_table_state = TableState::default();
if !sessions.is_empty() {
session_table_state.select(Some(0));
}
Self {
let mut dashboard = Self {
db,
cfg,
output_store,
output_rx,
sessions,
session_output_cache: HashMap::new(),
selected_pane: Pane::Sessions,
selected_session: 0,
show_help: false,
scroll_offset: 0,
output_follow: true,
output_scroll_offset: 0,
last_output_height: 0,
session_table_state,
}
};
dashboard.sync_selected_output();
dashboard
}
pub fn render(&mut self, frame: &mut Frame) {
@@ -188,12 +210,21 @@ impl Dashboard {
frame.render_stateful_widget(table, chunks[1], &mut self.session_table_state);
}
fn render_output(&self, frame: &mut Frame, area: Rect) {
let content = if let Some(session) = self.sessions.get(self.selected_session) {
format!(
"Agent output for session {}...\n\n(Live streaming coming soon)",
session.id
)
fn render_output(&mut self, frame: &mut Frame, area: Rect) {
self.sync_output_scroll(area.height.saturating_sub(2) as usize);
let content = if self.sessions.get(self.selected_session).is_some() {
let lines = self.selected_output_lines();
if lines.is_empty() {
"Waiting for session output...".to_string()
} else {
lines
.iter()
.map(|line| line.text.as_str())
.collect::<Vec<_>>()
.join("\n")
}
} else {
"No sessions. Press 'n' to start one.".to_string()
};
@@ -204,12 +235,14 @@ impl Dashboard {
Style::default()
};
let paragraph = Paragraph::new(content).block(
Block::default()
.borders(Borders::ALL)
.title(" Output ")
.border_style(border_style),
);
let paragraph = Paragraph::new(content)
.block(
Block::default()
.borders(Borders::ALL)
.title(" Output ")
.border_style(border_style),
)
.scroll((self.output_scroll_offset as u16, 0));
frame.render_widget(paragraph, area);
}
@@ -264,7 +297,7 @@ impl Dashboard {
}
fn render_status_bar(&self, frame: &mut Frame, area: Rect) {
let text = " [n]ew session [s]top [Tab] switch pane [j/k] scroll [?] help [q]uit ";
let text = " [n]ew session [s]top [r]efresh [Tab] switch pane [j/k] scroll [?] help [q]uit ";
let aggregate = self.aggregate_usage();
let (summary_text, summary_style) = self.aggregate_cost_summary();
let block = Block::default()
@@ -338,22 +371,48 @@ impl Dashboard {
}
pub fn scroll_down(&mut self) {
if self.selected_pane == Pane::Sessions && !self.sessions.is_empty() {
self.selected_session = (self.selected_session + 1).min(self.sessions.len() - 1);
self.session_table_state.select(Some(self.selected_session));
} else {
self.scroll_offset = self.scroll_offset.saturating_add(1);
match self.selected_pane {
Pane::Sessions if !self.sessions.is_empty() => {
self.selected_session = (self.selected_session + 1).min(self.sessions.len() - 1);
self.sync_selection();
self.reset_output_view();
self.sync_selected_output();
}
Pane::Output => {
let max_scroll = self.max_output_scroll();
if self.output_follow {
return;
}
if self.output_scroll_offset >= max_scroll.saturating_sub(1) {
self.output_follow = true;
self.output_scroll_offset = max_scroll;
} else {
self.output_scroll_offset = self.output_scroll_offset.saturating_add(1);
}
}
Pane::Metrics => {}
Pane::Sessions => {}
}
}
pub fn scroll_up(&mut self) {
if self.selected_pane == Pane::Sessions {
self.selected_session = self.selected_session.saturating_sub(1);
if !self.sessions.is_empty() {
self.session_table_state.select(Some(self.selected_session));
match self.selected_pane {
Pane::Sessions => {
self.selected_session = self.selected_session.saturating_sub(1);
self.sync_selection();
self.reset_output_view();
self.sync_selected_output();
}
} else {
self.scroll_offset = self.scroll_offset.saturating_sub(1);
Pane::Output => {
if self.output_follow {
self.output_follow = false;
self.output_scroll_offset = self.max_output_scroll();
}
self.output_scroll_offset = self.output_scroll_offset.saturating_sub(1);
}
Pane::Metrics => {}
}
}
@@ -363,14 +422,16 @@ impl Dashboard {
pub fn stop_selected(&mut self) {
if let Some(session) = self.sessions.get(self.selected_session) {
let _ = self.db.update_state(&session.id, &SessionState::Stopped);
if let Err(error) = self.db.update_state(&session.id, &SessionState::Stopped) {
tracing::warn!("Failed to stop session {}: {error}", session.id);
return;
}
self.refresh();
}
}
pub fn refresh(&mut self) {
self.sessions = self.db.list_sessions().unwrap_or_default();
self.sync_selection();
self.sync_from_store();
}
pub fn toggle_help(&mut self) {
@@ -378,8 +439,29 @@ impl Dashboard {
}
pub async fn tick(&mut self) {
self.sessions = self.db.list_sessions().unwrap_or_default();
self.sync_selection();
loop {
match self.output_rx.try_recv() {
Ok(_event) => {}
Err(broadcast::error::TryRecvError::Empty) => break,
Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
Err(broadcast::error::TryRecvError::Closed) => break,
}
}
self.sync_from_store();
}
fn sync_from_store(&mut self) {
let selected_id = self.selected_session_id().map(ToOwned::to_owned);
self.sessions = match self.db.list_sessions() {
Ok(sessions) => sessions,
Err(error) => {
tracing::warn!("Failed to refresh sessions: {error}");
Vec::new()
}
};
self.sync_selection_by_id(selected_id.as_deref());
self.sync_selected_output();
}
fn sync_selection(&mut self) {
@@ -392,6 +474,68 @@ impl Dashboard {
}
}
fn sync_selection_by_id(&mut self, selected_id: Option<&str>) {
if let Some(selected_id) = selected_id {
if let Some(index) = self.sessions.iter().position(|session| session.id == selected_id) {
self.selected_session = index;
}
}
self.sync_selection();
}
fn sync_selected_output(&mut self) {
let Some(session_id) = self.selected_session_id().map(ToOwned::to_owned) else {
self.output_scroll_offset = 0;
self.output_follow = true;
return;
};
match self.db.get_output_lines(&session_id, OUTPUT_BUFFER_LIMIT) {
Ok(lines) => {
self.output_store.replace_lines(&session_id, lines.clone());
self.session_output_cache.insert(session_id, lines);
}
Err(error) => {
tracing::warn!("Failed to load session output: {error}");
}
}
}
fn selected_session_id(&self) -> Option<&str> {
self.sessions
.get(self.selected_session)
.map(|session| session.id.as_str())
}
fn selected_output_lines(&self) -> &[OutputLine] {
self.selected_session_id()
.and_then(|session_id| self.session_output_cache.get(session_id))
.map(Vec::as_slice)
.unwrap_or(&[])
}
fn sync_output_scroll(&mut self, viewport_height: usize) {
self.last_output_height = viewport_height.max(1);
let max_scroll = self.max_output_scroll();
if self.output_follow {
self.output_scroll_offset = max_scroll;
} else {
self.output_scroll_offset = self.output_scroll_offset.min(max_scroll);
}
}
fn max_output_scroll(&self) -> usize {
self.selected_output_lines()
.len()
.saturating_sub(self.last_output_height.max(1))
}
fn reset_output_view(&mut self) {
self.output_follow = true;
self.output_scroll_offset = 0;
}
fn aggregate_usage(&self) -> AggregateUsage {
let total_tokens = self
.sessions
@@ -457,9 +601,19 @@ impl Dashboard {
(text, aggregate.overall_state.style())
}
#[cfg(test)]
fn aggregate_cost_summary_text(&self) -> String {
self.aggregate_cost_summary().0
}
#[cfg(test)]
fn selected_output_text(&self) -> String {
self.selected_output_lines()
.iter()
.map(|line| line.text.clone())
.collect::<Vec<_>>()
.join("\n")
}
}
impl SessionSummary {
@@ -564,89 +718,12 @@ fn format_duration(duration_secs: u64) -> String {
#[cfg(test)]
mod tests {
use std::path::{Path, PathBuf};
use anyhow::Result;
use chrono::Utc;
use ratatui::{backend::TestBackend, widgets::TableState, Terminal};
use ratatui::{backend::TestBackend, Terminal};
use uuid::Uuid;
use super::*;
use crate::config::Config;
use crate::session::store::StateStore;
use crate::session::{SessionMetrics, WorktreeInfo};
use crate::tui::widgets::BudgetState;
#[test]
fn session_state_color_matches_requested_palette() {
assert_eq!(session_state_color(&SessionState::Running), Color::Green);
assert_eq!(session_state_color(&SessionState::Idle), Color::Yellow);
assert_eq!(session_state_color(&SessionState::Failed), Color::Red);
assert_eq!(session_state_color(&SessionState::Stopped), Color::DarkGray);
assert_eq!(session_state_color(&SessionState::Completed), Color::Blue);
}
#[test]
fn session_summary_counts_each_state() {
let sessions = vec![
sample_session(
"run-12345678",
"planner",
SessionState::Running,
Some("feat/run"),
128,
15,
),
sample_session(
"idle-12345678",
"reviewer",
SessionState::Idle,
Some("feat/idle"),
256,
30,
),
sample_session(
"done-12345678",
"architect",
SessionState::Completed,
Some("feat/done"),
512,
45,
),
sample_session(
"fail-12345678",
"worker",
SessionState::Failed,
Some("feat/fail"),
1024,
60,
),
sample_session(
"stop-12345678",
"security",
SessionState::Stopped,
None,
64,
10,
),
sample_session(
"pend-12345678",
"tdd",
SessionState::Pending,
Some("feat/pending"),
32,
5,
),
];
let summary = SessionSummary::from_sessions(&sessions);
assert_eq!(summary.total, 6);
assert_eq!(summary.running, 1);
assert_eq!(summary.idle, 1);
assert_eq!(summary.completed, 1);
assert_eq!(summary.failed, 1);
assert_eq!(summary.stopped, 1);
assert_eq!(summary.pending, 1);
}
#[test]
fn render_sessions_shows_summary_headers_and_selected_row() {
@@ -673,7 +750,6 @@ mod tests {
);
let rendered = render_dashboard_text(dashboard, 150, 24);
assert!(rendered.contains("ID"));
assert!(rendered.contains("Agent"));
assert!(rendered.contains("State"));
@@ -689,59 +765,6 @@ mod tests {
assert!(rendered.contains("00:02:05"));
}
#[test]
fn sync_selection_preserves_table_offset_for_selected_rows() {
let mut dashboard = test_dashboard(
vec![
sample_session(
"run-12345678",
"planner",
SessionState::Running,
Some("feat/run"),
128,
15,
),
sample_session(
"done-87654321",
"reviewer",
SessionState::Completed,
Some("release/v1"),
2048,
125,
),
],
1,
);
*dashboard.session_table_state.offset_mut() = 3;
dashboard.sync_selection();
assert_eq!(dashboard.session_table_state.selected(), Some(1));
assert_eq!(dashboard.session_table_state.offset(), 3);
}
#[test]
fn aggregate_usage_sums_tokens_and_cost_with_warning_state() {
let db = StateStore::open(Path::new(":memory:")).unwrap();
let mut cfg = Config::default();
cfg.token_budget = 10_000;
cfg.cost_budget_usd = 10.0;
let mut dashboard = Dashboard::new(db, cfg);
dashboard.sessions = vec![
budget_session("sess-1", 4_000, 3.50),
budget_session("sess-2", 4_500, 4.80),
];
let aggregate = dashboard.aggregate_usage();
assert_eq!(aggregate.total_tokens, 8_500);
assert!((aggregate.total_cost_usd - 8.30).abs() < 1e-9);
assert_eq!(aggregate.token_state, BudgetState::Warning);
assert_eq!(aggregate.cost_state, BudgetState::Warning);
assert_eq!(aggregate.overall_state, BudgetState::Warning);
}
#[test]
fn aggregate_cost_summary_mentions_total_cost() {
let db = StateStore::open(Path::new(":memory:")).unwrap();
@@ -757,29 +780,144 @@ mod tests {
);
}
#[test]
fn refresh_preserves_selected_session_by_id() -> Result<()> {
let db_path = std::env::temp_dir().join(format!("ecc2-dashboard-{}.db", Uuid::new_v4()));
let db = StateStore::open(&db_path)?;
let now = Utc::now();
db.insert_session(&Session {
id: "older".to_string(),
task: "older".to_string(),
agent_type: "claude".to_string(),
state: SessionState::Idle,
pid: None,
worktree: None,
created_at: now,
updated_at: now,
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
id: "newer".to_string(),
task: "newer".to_string(),
agent_type: "claude".to_string(),
state: SessionState::Running,
pid: None,
worktree: None,
created_at: now,
updated_at: now + chrono::Duration::seconds(1),
metrics: SessionMetrics::default(),
})?;
let mut dashboard = Dashboard::new(db, Config::default());
dashboard.selected_session = 1;
dashboard.sync_selection();
dashboard.refresh();
assert_eq!(dashboard.selected_session_id(), Some("older"));
let _ = std::fs::remove_file(db_path);
Ok(())
}
#[test]
fn metrics_scroll_does_not_mutate_output_scroll() -> Result<()> {
let db_path = std::env::temp_dir().join(format!("ecc2-dashboard-{}.db", Uuid::new_v4()));
let db = StateStore::open(&db_path)?;
let now = Utc::now();
db.insert_session(&Session {
id: "session-1".to_string(),
task: "inspect output".to_string(),
agent_type: "claude".to_string(),
state: SessionState::Running,
pid: None,
worktree: None,
created_at: now,
updated_at: now,
metrics: SessionMetrics::default(),
})?;
for index in 0..6 {
db.append_output_line("session-1", OutputStream::Stdout, &format!("line {index}"))?;
}
let mut dashboard = Dashboard::new(db, Config::default());
dashboard.selected_pane = Pane::Output;
dashboard.refresh();
dashboard.sync_output_scroll(3);
dashboard.scroll_up();
let previous_scroll = dashboard.output_scroll_offset;
dashboard.selected_pane = Pane::Metrics;
dashboard.scroll_up();
dashboard.scroll_down();
assert_eq!(dashboard.output_scroll_offset, previous_scroll);
let _ = std::fs::remove_file(db_path);
Ok(())
}
#[test]
fn refresh_loads_selected_session_output_and_follows_tail() -> Result<()> {
let db_path = std::env::temp_dir().join(format!("ecc2-dashboard-{}.db", Uuid::new_v4()));
let db = StateStore::open(&db_path)?;
let now = Utc::now();
db.insert_session(&Session {
id: "session-1".to_string(),
task: "tail output".to_string(),
agent_type: "claude".to_string(),
state: SessionState::Running,
pid: None,
worktree: None,
created_at: now,
updated_at: now,
metrics: SessionMetrics::default(),
})?;
for index in 0..12 {
db.append_output_line("session-1", OutputStream::Stdout, &format!("line {index}"))?;
}
let mut dashboard = Dashboard::new(db, Config::default());
dashboard.selected_pane = Pane::Output;
dashboard.refresh();
dashboard.sync_output_scroll(4);
assert_eq!(dashboard.output_scroll_offset, 8);
assert!(dashboard.selected_output_text().contains("line 11"));
let _ = std::fs::remove_file(db_path);
Ok(())
}
fn test_dashboard(sessions: Vec<Session>, selected_session: usize) -> Dashboard {
let selected_session = selected_session.min(sessions.len().saturating_sub(1));
let output_store = SessionOutputStore::default();
let output_rx = output_store.subscribe();
let mut session_table_state = TableState::default();
if !sessions.is_empty() {
session_table_state.select(Some(selected_session));
}
Dashboard {
db: test_store(),
db: StateStore::open(Path::new(":memory:")).expect("open test db"),
cfg: Config::default(),
output_store,
output_rx,
sessions,
session_output_cache: HashMap::new(),
selected_pane: Pane::Sessions,
selected_session,
show_help: false,
scroll_offset: 0,
output_follow: true,
output_scroll_offset: 0,
last_output_height: 0,
session_table_state,
}
}
fn test_store() -> StateStore {
StateStore::open(Path::new(":memory:")).expect("open test db")
}
fn sample_session(
id: &str,
agent_type: &str,