From 08f61f667df38f17be641f50fa564dee1cadb9ce Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Thu, 9 Apr 2026 06:22:20 -0700 Subject: [PATCH] feat: sync ecc2 cost tracker metrics --- ecc2/src/config/mod.rs | 8 + ecc2/src/main.rs | 12 ++ ecc2/src/session/manager.rs | 12 +- ecc2/src/session/mod.rs | 2 + ecc2/src/session/store.rs | 256 +++++++++++++++++++++++++++++-- ecc2/src/tui/dashboard.rs | 52 ++++++- scripts/hooks/cost-tracker.js | 2 +- tests/hooks/cost-tracker.test.js | 21 +++ 8 files changed, 352 insertions(+), 13 deletions(-) diff --git a/ecc2/src/config/mod.rs b/ecc2/src/config/mod.rs index aaa0500f..86b542dc 100644 --- a/ecc2/src/config/mod.rs +++ b/ecc2/src/config/mod.rs @@ -113,6 +113,14 @@ impl Config { .join("ecc2.toml") } + pub fn cost_metrics_path(&self) -> PathBuf { + self.db_path + .parent() + .unwrap_or_else(|| std::path::Path::new(".")) + .join("metrics") + .join("costs.jsonl") + } + pub fn load() -> Result { let config_path = Self::config_path(); diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 2c16abe9..c52b0096 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -673,17 +673,20 @@ async fn main() -> Result<()> { } } Some(Commands::Sessions) => { + sync_runtime_session_metrics(&db, &cfg)?; let sessions = session::manager::list_sessions(&db)?; for s in sessions { println!("{} [{}] {}", s.id, s.state, s.task); } } Some(Commands::Status { session_id }) => { + sync_runtime_session_metrics(&db, &cfg)?; let id = session_id.unwrap_or_else(|| "latest".to_string()); let status = session::manager::get_status(&db, &id)?; println!("{status}"); } Some(Commands::Team { session_id, depth }) => { + sync_runtime_session_metrics(&db, &cfg)?; let id = session_id.unwrap_or_else(|| "latest".to_string()); let team = session::manager::get_team_status(&db, &id, depth)?; println!("{team}"); @@ -890,6 +893,15 @@ fn resolve_session_id(db: &session::store::StateStore, value: &str) -> Result Result<()> { + db.refresh_session_durations()?; + db.sync_cost_tracker_metrics(&cfg.cost_metrics_path())?; + Ok(()) +} + fn build_message( kind: MessageKindArg, text: String, diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index e83e838f..0bdff6db 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -1136,6 +1136,7 @@ fn build_agent_command( ) -> Command { let mut command = Command::new(agent_program); command + .env("ECC_SESSION_ID", session_id) .arg("--print") .arg("--name") .arg(format!("ecc-{session_id}")) @@ -1412,7 +1413,13 @@ impl fmt::Display for SessionStatus { writeln!(f, "Branch: {}", wt.branch)?; writeln!(f, "Worktree: {}", wt.path.display())?; } - writeln!(f, "Tokens: {}", s.metrics.tokens_used)?; + writeln!( + f, + "Tokens: {} total (in {} / out {})", + s.metrics.tokens_used, + s.metrics.input_tokens, + s.metrics.output_tokens + )?; writeln!(f, "Tools: {}", s.metrics.tool_calls)?; writeln!(f, "Files: {}", s.metrics.files_changed)?; writeln!(f, "Cost: ${:.4}", s.metrics.cost_usd)?; @@ -1741,7 +1748,7 @@ mod tests { let script_path = root.join("fake-claude.sh"); let log_path = root.join("fake-claude.log"); let script = format!( - "#!/usr/bin/env python3\nimport os\nimport pathlib\nimport signal\nimport sys\nimport time\n\nlog_path = pathlib.Path(r\"{}\")\nlog_path.write_text(os.getcwd() + \"\\n\", encoding=\"utf-8\")\nwith log_path.open(\"a\", encoding=\"utf-8\") as handle:\n handle.write(\" \".join(sys.argv[1:]) + \"\\n\")\n\ndef handle_term(signum, frame):\n raise SystemExit(0)\n\nsignal.signal(signal.SIGTERM, handle_term)\nwhile True:\n time.sleep(0.1)\n", + "#!/usr/bin/env python3\nimport os\nimport pathlib\nimport signal\nimport sys\nimport time\n\nlog_path = pathlib.Path(r\"{}\")\nlog_path.write_text(os.getcwd() + \"\\n\", encoding=\"utf-8\")\nwith log_path.open(\"a\", encoding=\"utf-8\") as handle:\n handle.write(\" \".join(sys.argv[1:]) + \"\\n\")\n handle.write(\"ECC_SESSION_ID=\" + os.environ.get(\"ECC_SESSION_ID\", \"\") + \"\\n\")\n\ndef handle_term(signum, frame):\n raise SystemExit(0)\n\nsignal.signal(signal.SIGTERM, handle_term)\nwhile True:\n time.sleep(0.1)\n", log_path.display() ); @@ -1803,6 +1810,7 @@ mod tests { assert!(log.contains(repo_root.to_string_lossy().as_ref())); assert!(log.contains("--print")); assert!(log.contains("implement lifecycle")); + assert!(log.contains(&format!("ECC_SESSION_ID={session_id}"))); stop_session_with_options(&db, &session_id, false).await?; Ok(()) diff --git a/ecc2/src/session/mod.rs b/ecc2/src/session/mod.rs index 8ee2668e..6d243858 100644 --- a/ecc2/src/session/mod.rs +++ b/ecc2/src/session/mod.rs @@ -95,6 +95,8 @@ pub struct WorktreeInfo { #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct SessionMetrics { + pub input_tokens: u64, + pub output_tokens: u64, pub tokens_used: u64, pub tool_calls: u64, pub files_changed: u32, diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index 22362723..d3f9da9c 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -2,6 +2,8 @@ use anyhow::{Context, Result}; use rusqlite::{Connection, OptionalExtension}; use serde::Serialize; use std::collections::HashMap; +use std::fs::File; +use std::io::{BufRead, BufReader}; use std::path::{Path, PathBuf}; use std::time::Duration; @@ -121,6 +123,8 @@ impl StateStore { worktree_path TEXT, worktree_branch TEXT, worktree_base TEXT, + input_tokens INTEGER DEFAULT 0, + output_tokens INTEGER DEFAULT 0, tokens_used INTEGER DEFAULT 0, tool_calls INTEGER DEFAULT 0, files_changed INTEGER DEFAULT 0, @@ -212,6 +216,24 @@ impl StateStore { .context("Failed to add pid column to sessions table")?; } + if !self.has_column("sessions", "input_tokens")? { + self.conn + .execute( + "ALTER TABLE sessions ADD COLUMN input_tokens INTEGER NOT NULL DEFAULT 0", + [], + ) + .context("Failed to add input_tokens column to sessions table")?; + } + + if !self.has_column("sessions", "output_tokens")? { + self.conn + .execute( + "ALTER TABLE sessions ADD COLUMN output_tokens INTEGER NOT NULL DEFAULT 0", + [], + ) + .context("Failed to add output_tokens column to sessions table")?; + } + if !self.has_column("daemon_activity", "last_dispatch_deferred")? { self.conn .execute( @@ -470,8 +492,19 @@ impl StateStore { pub fn update_metrics(&self, session_id: &str, metrics: &SessionMetrics) -> Result<()> { self.conn.execute( - "UPDATE sessions SET tokens_used = ?1, tool_calls = ?2, files_changed = ?3, duration_secs = ?4, cost_usd = ?5, updated_at = ?6 WHERE id = ?7", + "UPDATE sessions + SET input_tokens = ?1, + output_tokens = ?2, + tokens_used = ?3, + tool_calls = ?4, + files_changed = ?5, + duration_secs = ?6, + cost_usd = ?7, + updated_at = ?8 + WHERE id = ?9", rusqlite::params![ + metrics.input_tokens, + metrics.output_tokens, metrics.tokens_used, metrics.tool_calls, metrics.files_changed, @@ -484,6 +517,121 @@ impl StateStore { Ok(()) } + pub fn refresh_session_durations(&self) -> Result<()> { + let now = chrono::Utc::now(); + let mut stmt = self.conn.prepare( + "SELECT id, state, created_at, updated_at, duration_secs + FROM sessions", + )?; + let rows = stmt + .query_map([], |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, String>(1)?, + row.get::<_, String>(2)?, + row.get::<_, String>(3)?, + row.get::<_, u64>(4)?, + )) + })? + .collect::, _>>()?; + + for (session_id, state_raw, created_raw, updated_raw, current_duration) in rows { + let state = SessionState::from_db_value(&state_raw); + let created_at = chrono::DateTime::parse_from_rfc3339(&created_raw) + .unwrap_or_default() + .with_timezone(&chrono::Utc); + let updated_at = chrono::DateTime::parse_from_rfc3339(&updated_raw) + .unwrap_or_default() + .with_timezone(&chrono::Utc); + let effective_end = match state { + SessionState::Pending | SessionState::Running | SessionState::Idle => now, + SessionState::Completed | SessionState::Failed | SessionState::Stopped => updated_at, + }; + let duration_secs = effective_end + .signed_duration_since(created_at) + .num_seconds() + .max(0) as u64; + + if duration_secs != current_duration { + self.conn.execute( + "UPDATE sessions SET duration_secs = ?1 WHERE id = ?2", + rusqlite::params![duration_secs, session_id], + )?; + } + } + + Ok(()) + } + + pub fn sync_cost_tracker_metrics(&self, metrics_path: &Path) -> Result<()> { + if !metrics_path.exists() { + return Ok(()); + } + + #[derive(Default)] + struct UsageAggregate { + input_tokens: u64, + output_tokens: u64, + cost_usd: f64, + } + + #[derive(serde::Deserialize)] + struct CostTrackerRow { + session_id: String, + #[serde(default)] + input_tokens: u64, + #[serde(default)] + output_tokens: u64, + #[serde(default)] + estimated_cost_usd: f64, + } + + let file = File::open(metrics_path) + .with_context(|| format!("Failed to open {}", metrics_path.display()))?; + let reader = BufReader::new(file); + let mut aggregates: HashMap = HashMap::new(); + + for line in reader.lines() { + let line = line?; + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + + let Ok(row) = serde_json::from_str::(trimmed) else { + continue; + }; + if row.session_id.trim().is_empty() { + continue; + } + + let aggregate = aggregates.entry(row.session_id).or_default(); + aggregate.input_tokens = aggregate.input_tokens.saturating_add(row.input_tokens); + aggregate.output_tokens = aggregate.output_tokens.saturating_add(row.output_tokens); + aggregate.cost_usd += row.estimated_cost_usd; + } + + for (session_id, aggregate) in aggregates { + self.conn.execute( + "UPDATE sessions + SET input_tokens = ?1, + output_tokens = ?2, + tokens_used = ?3, + cost_usd = ?4 + WHERE id = ?5", + rusqlite::params![ + aggregate.input_tokens, + aggregate.output_tokens, + aggregate.input_tokens.saturating_add(aggregate.output_tokens), + aggregate.cost_usd, + session_id, + ], + )?; + } + + Ok(()) + } + pub fn increment_tool_calls(&self, session_id: &str) -> Result<()> { self.conn.execute( "UPDATE sessions SET tool_calls = tool_calls + 1, updated_at = ?1 WHERE id = ?2", @@ -495,7 +643,7 @@ impl StateStore { pub fn list_sessions(&self) -> Result> { let mut stmt = self.conn.prepare( "SELECT id, task, agent_type, working_dir, state, pid, worktree_path, worktree_branch, worktree_base, - tokens_used, tool_calls, files_changed, duration_secs, cost_usd, + input_tokens, output_tokens, tokens_used, tool_calls, files_changed, duration_secs, cost_usd, created_at, updated_at FROM sessions ORDER BY updated_at DESC", )?; @@ -512,8 +660,8 @@ impl StateStore { base_branch: row.get::<_, String>(8).unwrap_or_default(), }); - let created_str: String = row.get(14)?; - let updated_str: String = row.get(15)?; + let created_str: String = row.get(16)?; + let updated_str: String = row.get(17)?; Ok(Session { id: row.get(0)?, @@ -530,11 +678,13 @@ impl StateStore { .unwrap_or_default() .with_timezone(&chrono::Utc), metrics: SessionMetrics { - tokens_used: row.get(9)?, - tool_calls: row.get(10)?, - files_changed: row.get(11)?, - duration_secs: row.get(12)?, - cost_usd: row.get(13)?, + input_tokens: row.get(9)?, + output_tokens: row.get(10)?, + tokens_used: row.get(11)?, + tool_calls: row.get(12)?, + files_changed: row.get(13)?, + duration_secs: row.get(14)?, + cost_usd: row.get(15)?, }, }) })? @@ -1216,6 +1366,94 @@ mod tests { assert!(column_names.iter().any(|column| column == "working_dir")); assert!(column_names.iter().any(|column| column == "pid")); + assert!(column_names.iter().any(|column| column == "input_tokens")); + assert!(column_names.iter().any(|column| column == "output_tokens")); + Ok(()) + } + + #[test] + fn sync_cost_tracker_metrics_aggregates_usage_into_sessions() -> Result<()> { + let tempdir = TestDir::new("store-cost-metrics")?; + let db = StateStore::open(&tempdir.path().join("state.db"))?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "session-1".to_string(), + task: "sync usage".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: SessionState::Running, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + metrics: SessionMetrics::default(), + })?; + + let metrics_dir = tempdir.path().join("metrics"); + fs::create_dir_all(&metrics_dir)?; + let metrics_path = metrics_dir.join("costs.jsonl"); + fs::write( + &metrics_path, + concat!( + "{\"session_id\":\"session-1\",\"input_tokens\":100,\"output_tokens\":25,\"estimated_cost_usd\":0.11}\n", + "{\"session_id\":\"session-1\",\"input_tokens\":40,\"output_tokens\":10,\"estimated_cost_usd\":0.05}\n", + "{\"session_id\":\"other-session\",\"input_tokens\":999,\"output_tokens\":1,\"estimated_cost_usd\":9.99}\n" + ), + )?; + + db.sync_cost_tracker_metrics(&metrics_path)?; + + let session = db + .get_session("session-1")? + .expect("session should still exist"); + assert_eq!(session.metrics.input_tokens, 140); + assert_eq!(session.metrics.output_tokens, 35); + assert_eq!(session.metrics.tokens_used, 175); + assert!((session.metrics.cost_usd - 0.16).abs() < f64::EPSILON); + + Ok(()) + } + + #[test] + fn refresh_session_durations_updates_running_and_terminal_sessions() -> Result<()> { + let tempdir = TestDir::new("store-duration-metrics")?; + let db = StateStore::open(&tempdir.path().join("state.db"))?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "running-1".to_string(), + task: "live run".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: SessionState::Running, + pid: Some(1234), + worktree: None, + created_at: now - ChronoDuration::seconds(95), + updated_at: now - ChronoDuration::seconds(1), + metrics: SessionMetrics::default(), + })?; + db.insert_session(&Session { + id: "done-1".to_string(), + task: "finished run".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: SessionState::Completed, + pid: None, + worktree: None, + created_at: now - ChronoDuration::seconds(80), + updated_at: now - ChronoDuration::seconds(5), + metrics: SessionMetrics::default(), + })?; + + db.refresh_session_durations()?; + + let running = db.get_session("running-1")?.expect("running session should exist"); + let completed = db.get_session("done-1")?.expect("completed session should exist"); + + assert!(running.metrics.duration_secs >= 95); + assert!(completed.metrics.duration_secs >= 75); + Ok(()) } diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index 40f583f2..842d820b 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -8,6 +8,7 @@ use ratatui::{ }; use regex::Regex; use std::collections::{HashMap, HashSet}; +use std::time::UNIX_EPOCH; use tokio::sync::broadcast; use super::widgets::{budget_state, format_currency, format_token_count, BudgetState, TokenMeter}; @@ -100,6 +101,7 @@ pub struct Dashboard { search_matches: Vec, selected_search_match: usize, session_table_state: TableState, + last_cost_metrics_signature: Option<(u64, u128)>, } #[derive(Debug, Default, PartialEq, Eq)] @@ -277,6 +279,11 @@ impl Dashboard { output_store: SessionOutputStore, ) -> Self { let pane_size_percent = configured_pane_size(&cfg, cfg.pane_layout); + let initial_cost_metrics_signature = cost_metrics_signature(&cfg.cost_metrics_path()); + let _ = db.refresh_session_durations(); + if initial_cost_metrics_signature.is_some() { + let _ = db.sync_cost_tracker_metrics(&cfg.cost_metrics_path()); + } let sessions = db.list_sessions().unwrap_or_default(); let output_rx = output_store.subscribe(); let mut session_table_state = TableState::default(); @@ -336,6 +343,7 @@ impl Dashboard { search_matches: Vec::new(), selected_search_match: 0, session_table_state, + last_cost_metrics_signature: initial_cost_metrics_signature, }; dashboard.unread_message_counts = dashboard.db.unread_message_counts().unwrap_or_default(); dashboard.sync_handoff_backlog_counts(); @@ -2729,7 +2737,27 @@ impl Dashboard { self.sync_from_store(); } + fn sync_runtime_metrics(&mut self) { + if let Err(error) = self.db.refresh_session_durations() { + tracing::warn!("Failed to refresh session durations: {error}"); + } + + let metrics_path = self.cfg.cost_metrics_path(); + let signature = cost_metrics_signature(&metrics_path); + if signature == self.last_cost_metrics_signature { + return; + } + + self.last_cost_metrics_signature = signature; + if signature.is_some() { + if let Err(error) = self.db.sync_cost_tracker_metrics(&metrics_path) { + tracing::warn!("Failed to sync cost tracker metrics: {error}"); + } + } + } + fn sync_from_store(&mut self) { + self.sync_runtime_metrics(); let selected_id = self.selected_session_id().map(ToOwned::to_owned); self.sessions = match self.db.list_sessions() { Ok(sessions) => sessions, @@ -3977,8 +4005,13 @@ impl Dashboard { } lines.push(format!( - "Tokens {} | Tools {} | Files {}", + "Tokens {} total | In {} | Out {}", format_token_count(metrics.tokens_used), + format_token_count(metrics.input_tokens), + format_token_count(metrics.output_tokens), + )); + lines.push(format!( + "Tools {} | Files {}", metrics.tool_calls, metrics.files_changed, )); @@ -5348,6 +5381,17 @@ fn format_duration(duration_secs: u64) -> String { format!("{hours:02}:{minutes:02}:{seconds:02}") } +fn cost_metrics_signature(path: &std::path::Path) -> Option<(u64, u128)> { + let metadata = std::fs::metadata(path).ok()?; + let modified = metadata + .modified() + .ok()? + .duration_since(UNIX_EPOCH) + .ok()? + .as_nanos(); + Some((metadata.len(), modified)) +} + #[cfg(test)] mod tests { use anyhow::{Context, Result}; @@ -5668,6 +5712,7 @@ mod tests { assert!(text.contains("- Working ?? notes.txt")); assert!(text.contains("Merge blocked by 1 conflict(s): src/main.rs")); assert!(text.contains("- conflict src/main.rs")); + assert!(text.contains("Tokens 512 total | In 384 | Out 128")); assert!(text.contains("Last output last useful output")); assert!(text.contains("Needs attention:")); assert!(text.contains("Failed failed-8 | Render dashboard rows")); @@ -8915,6 +8960,7 @@ diff --git a/src/next.rs b/src/next.rs search_matches: Vec::new(), selected_search_match: 0, session_table_state, + last_cost_metrics_signature: None, } } @@ -8990,6 +9036,8 @@ diff --git a/src/next.rs b/src/next.rs created_at: Utc::now(), updated_at: Utc::now(), metrics: SessionMetrics { + input_tokens: tokens_used.saturating_mul(3) / 4, + output_tokens: tokens_used / 4, tokens_used, tool_calls: 4, files_changed: 2, @@ -9012,6 +9060,8 @@ diff --git a/src/next.rs b/src/next.rs created_at: now, updated_at: now, metrics: SessionMetrics { + input_tokens: tokens_used.saturating_mul(3) / 4, + output_tokens: tokens_used / 4, tokens_used, tool_calls: 0, files_changed: 0, diff --git a/scripts/hooks/cost-tracker.js b/scripts/hooks/cost-tracker.js index d3b90f9b..817ff77a 100755 --- a/scripts/hooks/cost-tracker.js +++ b/scripts/hooks/cost-tracker.js @@ -55,7 +55,7 @@ process.stdin.on('end', () => { const outputTokens = toNumber(usage.output_tokens || usage.completion_tokens || 0); const model = String(input.model || input._cursor?.model || process.env.CLAUDE_MODEL || 'unknown'); - const sessionId = String(process.env.CLAUDE_SESSION_ID || 'default'); + const sessionId = String(process.env.ECC_SESSION_ID || process.env.CLAUDE_SESSION_ID || 'default'); const metricsDir = path.join(getClaudeDir(), 'metrics'); ensureDir(metricsDir); diff --git a/tests/hooks/cost-tracker.test.js b/tests/hooks/cost-tracker.test.js index 3b474912..ee834465 100644 --- a/tests/hooks/cost-tracker.test.js +++ b/tests/hooks/cost-tracker.test.js @@ -131,6 +131,27 @@ function runTests() { fs.rmSync(tmpHome, { recursive: true, force: true }); }) ? passed++ : failed++); + // 6. Prefers ECC_SESSION_ID for ECC2 session correlation + (test('prefers ECC_SESSION_ID over CLAUDE_SESSION_ID when both are present', () => { + const tmpHome = makeTempDir(); + const input = { + model: 'claude-sonnet-4-20250514', + usage: { input_tokens: 120, output_tokens: 30 }, + }; + const result = runScript(input, { + ...withTempHome(tmpHome), + ECC_SESSION_ID: 'ecc-session-1234', + CLAUDE_SESSION_ID: 'claude-session-9999', + }); + assert.strictEqual(result.code, 0, `Expected exit code 0, got ${result.code}`); + + const metricsFile = path.join(tmpHome, '.claude', 'metrics', 'costs.jsonl'); + const row = JSON.parse(fs.readFileSync(metricsFile, 'utf8').trim()); + assert.strictEqual(row.session_id, 'ecc-session-1234', 'Expected ECC_SESSION_ID to win'); + + fs.rmSync(tmpHome, { recursive: true, force: true }); + }) ? passed++ : failed++); + console.log(`\nResults: Passed: ${passed}, Failed: ${failed}`); process.exit(failed > 0 ? 1 : 0); }