feat: sync ecc2 cost tracker metrics

This commit is contained in:
Affaan Mustafa
2026-04-09 06:22:20 -07:00
parent cf9c68846c
commit 08f61f667d
8 changed files with 352 additions and 13 deletions

View File

@@ -2,6 +2,8 @@ use anyhow::{Context, Result};
use rusqlite::{Connection, OptionalExtension};
use serde::Serialize;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::time::Duration;
@@ -121,6 +123,8 @@ impl StateStore {
worktree_path TEXT,
worktree_branch TEXT,
worktree_base TEXT,
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
tokens_used INTEGER DEFAULT 0,
tool_calls INTEGER DEFAULT 0,
files_changed INTEGER DEFAULT 0,
@@ -212,6 +216,24 @@ impl StateStore {
.context("Failed to add pid column to sessions table")?;
}
if !self.has_column("sessions", "input_tokens")? {
self.conn
.execute(
"ALTER TABLE sessions ADD COLUMN input_tokens INTEGER NOT NULL DEFAULT 0",
[],
)
.context("Failed to add input_tokens column to sessions table")?;
}
if !self.has_column("sessions", "output_tokens")? {
self.conn
.execute(
"ALTER TABLE sessions ADD COLUMN output_tokens INTEGER NOT NULL DEFAULT 0",
[],
)
.context("Failed to add output_tokens column to sessions table")?;
}
if !self.has_column("daemon_activity", "last_dispatch_deferred")? {
self.conn
.execute(
@@ -470,8 +492,19 @@ impl StateStore {
pub fn update_metrics(&self, session_id: &str, metrics: &SessionMetrics) -> Result<()> {
self.conn.execute(
"UPDATE sessions SET tokens_used = ?1, tool_calls = ?2, files_changed = ?3, duration_secs = ?4, cost_usd = ?5, updated_at = ?6 WHERE id = ?7",
"UPDATE sessions
SET input_tokens = ?1,
output_tokens = ?2,
tokens_used = ?3,
tool_calls = ?4,
files_changed = ?5,
duration_secs = ?6,
cost_usd = ?7,
updated_at = ?8
WHERE id = ?9",
rusqlite::params![
metrics.input_tokens,
metrics.output_tokens,
metrics.tokens_used,
metrics.tool_calls,
metrics.files_changed,
@@ -484,6 +517,121 @@ impl StateStore {
Ok(())
}
pub fn refresh_session_durations(&self) -> Result<()> {
let now = chrono::Utc::now();
let mut stmt = self.conn.prepare(
"SELECT id, state, created_at, updated_at, duration_secs
FROM sessions",
)?;
let rows = stmt
.query_map([], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, String>(2)?,
row.get::<_, String>(3)?,
row.get::<_, u64>(4)?,
))
})?
.collect::<std::result::Result<Vec<_>, _>>()?;
for (session_id, state_raw, created_raw, updated_raw, current_duration) in rows {
let state = SessionState::from_db_value(&state_raw);
let created_at = chrono::DateTime::parse_from_rfc3339(&created_raw)
.unwrap_or_default()
.with_timezone(&chrono::Utc);
let updated_at = chrono::DateTime::parse_from_rfc3339(&updated_raw)
.unwrap_or_default()
.with_timezone(&chrono::Utc);
let effective_end = match state {
SessionState::Pending | SessionState::Running | SessionState::Idle => now,
SessionState::Completed | SessionState::Failed | SessionState::Stopped => updated_at,
};
let duration_secs = effective_end
.signed_duration_since(created_at)
.num_seconds()
.max(0) as u64;
if duration_secs != current_duration {
self.conn.execute(
"UPDATE sessions SET duration_secs = ?1 WHERE id = ?2",
rusqlite::params![duration_secs, session_id],
)?;
}
}
Ok(())
}
pub fn sync_cost_tracker_metrics(&self, metrics_path: &Path) -> Result<()> {
if !metrics_path.exists() {
return Ok(());
}
#[derive(Default)]
struct UsageAggregate {
input_tokens: u64,
output_tokens: u64,
cost_usd: f64,
}
#[derive(serde::Deserialize)]
struct CostTrackerRow {
session_id: String,
#[serde(default)]
input_tokens: u64,
#[serde(default)]
output_tokens: u64,
#[serde(default)]
estimated_cost_usd: f64,
}
let file = File::open(metrics_path)
.with_context(|| format!("Failed to open {}", metrics_path.display()))?;
let reader = BufReader::new(file);
let mut aggregates: HashMap<String, UsageAggregate> = HashMap::new();
for line in reader.lines() {
let line = line?;
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let Ok(row) = serde_json::from_str::<CostTrackerRow>(trimmed) else {
continue;
};
if row.session_id.trim().is_empty() {
continue;
}
let aggregate = aggregates.entry(row.session_id).or_default();
aggregate.input_tokens = aggregate.input_tokens.saturating_add(row.input_tokens);
aggregate.output_tokens = aggregate.output_tokens.saturating_add(row.output_tokens);
aggregate.cost_usd += row.estimated_cost_usd;
}
for (session_id, aggregate) in aggregates {
self.conn.execute(
"UPDATE sessions
SET input_tokens = ?1,
output_tokens = ?2,
tokens_used = ?3,
cost_usd = ?4
WHERE id = ?5",
rusqlite::params![
aggregate.input_tokens,
aggregate.output_tokens,
aggregate.input_tokens.saturating_add(aggregate.output_tokens),
aggregate.cost_usd,
session_id,
],
)?;
}
Ok(())
}
pub fn increment_tool_calls(&self, session_id: &str) -> Result<()> {
self.conn.execute(
"UPDATE sessions SET tool_calls = tool_calls + 1, updated_at = ?1 WHERE id = ?2",
@@ -495,7 +643,7 @@ impl StateStore {
pub fn list_sessions(&self) -> Result<Vec<Session>> {
let mut stmt = self.conn.prepare(
"SELECT id, task, agent_type, working_dir, state, pid, worktree_path, worktree_branch, worktree_base,
tokens_used, tool_calls, files_changed, duration_secs, cost_usd,
input_tokens, output_tokens, tokens_used, tool_calls, files_changed, duration_secs, cost_usd,
created_at, updated_at
FROM sessions ORDER BY updated_at DESC",
)?;
@@ -512,8 +660,8 @@ impl StateStore {
base_branch: row.get::<_, String>(8).unwrap_or_default(),
});
let created_str: String = row.get(14)?;
let updated_str: String = row.get(15)?;
let created_str: String = row.get(16)?;
let updated_str: String = row.get(17)?;
Ok(Session {
id: row.get(0)?,
@@ -530,11 +678,13 @@ impl StateStore {
.unwrap_or_default()
.with_timezone(&chrono::Utc),
metrics: SessionMetrics {
tokens_used: row.get(9)?,
tool_calls: row.get(10)?,
files_changed: row.get(11)?,
duration_secs: row.get(12)?,
cost_usd: row.get(13)?,
input_tokens: row.get(9)?,
output_tokens: row.get(10)?,
tokens_used: row.get(11)?,
tool_calls: row.get(12)?,
files_changed: row.get(13)?,
duration_secs: row.get(14)?,
cost_usd: row.get(15)?,
},
})
})?
@@ -1216,6 +1366,94 @@ mod tests {
assert!(column_names.iter().any(|column| column == "working_dir"));
assert!(column_names.iter().any(|column| column == "pid"));
assert!(column_names.iter().any(|column| column == "input_tokens"));
assert!(column_names.iter().any(|column| column == "output_tokens"));
Ok(())
}
#[test]
fn sync_cost_tracker_metrics_aggregates_usage_into_sessions() -> Result<()> {
let tempdir = TestDir::new("store-cost-metrics")?;
let db = StateStore::open(&tempdir.path().join("state.db"))?;
let now = Utc::now();
db.insert_session(&Session {
id: "session-1".to_string(),
task: "sync usage".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: SessionState::Running,
pid: None,
worktree: None,
created_at: now,
updated_at: now,
metrics: SessionMetrics::default(),
})?;
let metrics_dir = tempdir.path().join("metrics");
fs::create_dir_all(&metrics_dir)?;
let metrics_path = metrics_dir.join("costs.jsonl");
fs::write(
&metrics_path,
concat!(
"{\"session_id\":\"session-1\",\"input_tokens\":100,\"output_tokens\":25,\"estimated_cost_usd\":0.11}\n",
"{\"session_id\":\"session-1\",\"input_tokens\":40,\"output_tokens\":10,\"estimated_cost_usd\":0.05}\n",
"{\"session_id\":\"other-session\",\"input_tokens\":999,\"output_tokens\":1,\"estimated_cost_usd\":9.99}\n"
),
)?;
db.sync_cost_tracker_metrics(&metrics_path)?;
let session = db
.get_session("session-1")?
.expect("session should still exist");
assert_eq!(session.metrics.input_tokens, 140);
assert_eq!(session.metrics.output_tokens, 35);
assert_eq!(session.metrics.tokens_used, 175);
assert!((session.metrics.cost_usd - 0.16).abs() < f64::EPSILON);
Ok(())
}
#[test]
fn refresh_session_durations_updates_running_and_terminal_sessions() -> Result<()> {
let tempdir = TestDir::new("store-duration-metrics")?;
let db = StateStore::open(&tempdir.path().join("state.db"))?;
let now = Utc::now();
db.insert_session(&Session {
id: "running-1".to_string(),
task: "live run".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: SessionState::Running,
pid: Some(1234),
worktree: None,
created_at: now - ChronoDuration::seconds(95),
updated_at: now - ChronoDuration::seconds(1),
metrics: SessionMetrics::default(),
})?;
db.insert_session(&Session {
id: "done-1".to_string(),
task: "finished run".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: SessionState::Completed,
pid: None,
worktree: None,
created_at: now - ChronoDuration::seconds(80),
updated_at: now - ChronoDuration::seconds(5),
metrics: SessionMetrics::default(),
})?;
db.refresh_session_durations()?;
let running = db.get_session("running-1")?.expect("running session should exist");
let completed = db.get_session("done-1")?.expect("completed session should exist");
assert!(running.metrics.duration_secs >= 95);
assert!(completed.metrics.duration_secs >= 75);
Ok(())
}