feat(ecc2): sync hook activity into session metrics

This commit is contained in:
Affaan Mustafa
2026-04-09 07:02:24 -07:00
parent 6f08e78456
commit 48fd68115e
7 changed files with 664 additions and 9 deletions

View File

@@ -137,6 +137,14 @@ impl Config {
.join("costs.jsonl")
}
/// Path of the JSONL file where hook tool-activity events are recorded.
///
/// Resolves to `<db dir>/metrics/tool-usage.jsonl`, falling back to the
/// current directory when `db_path` has no parent component.
pub fn tool_activity_metrics_path(&self) -> PathBuf {
    // `Path::new(".")` is free, so the eager `unwrap_or` form is
    // preferred over `unwrap_or_else` (clippy: unnecessary_lazy_evaluations).
    self.db_path
        .parent()
        .unwrap_or(std::path::Path::new("."))
        .join("metrics")
        .join("tool-usage.jsonl")
}
pub fn effective_budget_alert_thresholds(&self) -> BudgetAlertThresholds {
self.budget_alert_thresholds.sanitized()
}

View File

@@ -899,6 +899,7 @@ fn sync_runtime_session_metrics(
) -> Result<()> {
db.refresh_session_durations()?;
db.sync_cost_tracker_metrics(&cfg.cost_metrics_path())?;
db.sync_tool_activity_metrics(&cfg.tool_activity_metrics_path())?;
let _ = session::manager::enforce_budget_hard_limits(db, cfg)?;
Ok(())
}

View File

@@ -1,13 +1,14 @@
use anyhow::{Context, Result};
use rusqlite::{Connection, OptionalExtension};
use serde::Serialize;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::time::Duration;
use crate::observability::{ToolLogEntry, ToolLogPage};
use crate::config::Config;
use crate::observability::{ToolCallEvent, ToolLogEntry, ToolLogPage};
use super::output::{OutputLine, OutputStream, OUTPUT_BUFFER_LIMIT};
use super::{Session, SessionMessage, SessionMetrics, SessionState};
@@ -136,13 +137,15 @@ impl StateStore {
CREATE TABLE IF NOT EXISTS tool_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
hook_event_id TEXT UNIQUE,
session_id TEXT NOT NULL REFERENCES sessions(id),
tool_name TEXT NOT NULL,
input_summary TEXT,
output_summary TEXT,
duration_ms INTEGER,
risk_score REAL DEFAULT 0.0,
timestamp TEXT NOT NULL
timestamp TEXT NOT NULL,
file_paths_json TEXT NOT NULL DEFAULT '[]'
);
CREATE TABLE IF NOT EXISTS messages (
@@ -189,6 +192,9 @@ impl StateStore {
CREATE INDEX IF NOT EXISTS idx_sessions_state ON sessions(state);
CREATE INDEX IF NOT EXISTS idx_tool_log_session ON tool_log(session_id);
CREATE UNIQUE INDEX IF NOT EXISTS idx_tool_log_hook_event
ON tool_log(hook_event_id)
WHERE hook_event_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_messages_to ON messages(to_session, read);
CREATE INDEX IF NOT EXISTS idx_session_output_session
ON session_output(session_id, id);
@@ -234,6 +240,21 @@ impl StateStore {
.context("Failed to add output_tokens column to sessions table")?;
}
if !self.has_column("tool_log", "hook_event_id")? {
self.conn
.execute("ALTER TABLE tool_log ADD COLUMN hook_event_id TEXT", [])
.context("Failed to add hook_event_id column to tool_log table")?;
}
if !self.has_column("tool_log", "file_paths_json")? {
self.conn
.execute(
"ALTER TABLE tool_log ADD COLUMN file_paths_json TEXT NOT NULL DEFAULT '[]'",
[],
)
.context("Failed to add file_paths_json column to tool_log table")?;
}
if !self.has_column("daemon_activity", "last_dispatch_deferred")? {
self.conn
.execute(
@@ -362,6 +383,12 @@ impl StateStore {
.context("Failed to add last_auto_prune_active_skipped column to daemon_activity table")?;
}
self.conn.execute_batch(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_tool_log_hook_event
ON tool_log(hook_event_id)
WHERE hook_event_id IS NOT NULL;",
)?;
Ok(())
}
@@ -636,6 +663,127 @@ impl StateStore {
Ok(())
}
/// Replay hook-emitted tool activity from `metrics_path` (JSONL) into the
/// `tool_log` table and recompute per-session tool metrics.
///
/// Each line is one JSON event. Blank, unparseable, or incomplete lines
/// (missing id / session id / tool name) are skipped — ingestion is
/// best-effort. Events are de-duplicated in-memory by event id within a
/// single pass and again at the DB layer by `INSERT OR IGNORE` against
/// the unique `hook_event_id` index, so re-syncing the same file is
/// idempotent.
///
/// After ingestion, every known session's `tool_calls` and
/// `files_changed` metrics are overwritten from the file's aggregates;
/// sessions with no recorded activity are reset to zero so stale counts
/// are cleared.
pub fn sync_tool_activity_metrics(&self, metrics_path: &Path) -> Result<()> {
    // A missing file simply means no hook events have fired yet.
    if !metrics_path.exists() {
        return Ok(());
    }
    // Per-session rollup accumulated while streaming the file.
    #[derive(Default)]
    struct ActivityAggregate {
        tool_calls: u64,
        file_paths: HashSet<String>,
    }
    // Wire shape of one JSONL event; optional fields default so partial
    // events still ingest.
    #[derive(serde::Deserialize)]
    struct ToolActivityRow {
        id: String,
        session_id: String,
        tool_name: String,
        #[serde(default)]
        input_summary: String,
        #[serde(default)]
        output_summary: String,
        #[serde(default)]
        duration_ms: u64,
        #[serde(default)]
        file_paths: Vec<String>,
        #[serde(default)]
        timestamp: String,
    }
    let file = File::open(metrics_path)
        .with_context(|| format!("Failed to open {}", metrics_path.display()))?;
    let reader = BufReader::new(file);
    let mut aggregates: HashMap<String, ActivityAggregate> = HashMap::new();
    let mut seen_event_ids = HashSet::new();
    // Prepare the insert once instead of re-parsing the SQL on every
    // line; activity files can grow large.
    let mut insert = self.conn.prepare_cached(
        "INSERT OR IGNORE INTO tool_log (
            hook_event_id,
            session_id,
            tool_name,
            input_summary,
            output_summary,
            duration_ms,
            risk_score,
            timestamp,
            file_paths_json
        )
        VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
    )?;
    for line in reader.lines() {
        let line = line?;
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        // Malformed lines are skipped rather than failing the sync.
        let Ok(row) = serde_json::from_str::<ToolActivityRow>(trimmed) else {
            continue;
        };
        if row.id.trim().is_empty()
            || row.session_id.trim().is_empty()
            || row.tool_name.trim().is_empty()
        {
            continue;
        }
        // Drop duplicate event ids seen earlier in this file.
        if !seen_event_ids.insert(row.id.clone()) {
            continue;
        }
        let file_paths: Vec<String> = row
            .file_paths
            .into_iter()
            .map(|path| path.trim().to_string())
            .filter(|path| !path.is_empty())
            .collect();
        let file_paths_json =
            serde_json::to_string(&file_paths).unwrap_or_else(|_| "[]".to_string());
        // Fall back to "now" for events that omit a timestamp.
        let timestamp = if row.timestamp.trim().is_empty() {
            chrono::Utc::now().to_rfc3339()
        } else {
            row.timestamp
        };
        let risk_score = ToolCallEvent::compute_risk(
            &row.tool_name,
            &row.input_summary,
            &Config::RISK_THRESHOLDS,
        )
        .score;
        let session_id = row.session_id.clone();
        insert.execute(rusqlite::params![
            row.id,
            row.session_id,
            row.tool_name,
            row.input_summary,
            row.output_summary,
            row.duration_ms,
            risk_score,
            timestamp,
            file_paths_json,
        ])?;
        let aggregate = aggregates.entry(session_id).or_default();
        aggregate.tool_calls = aggregate.tool_calls.saturating_add(1);
        // `files_changed` counts unique paths, hence the set.
        aggregate.file_paths.extend(file_paths);
    }
    drop(insert);
    // Overwrite every session's metrics: sessions absent from the file
    // are deliberately reset to zero.
    for session in self.list_sessions()? {
        let mut metrics = session.metrics.clone();
        let aggregate = aggregates.get(&session.id);
        metrics.tool_calls = aggregate.map(|item| item.tool_calls).unwrap_or(0);
        metrics.files_changed = aggregate
            .map(|item| item.file_paths.len().min(u32::MAX as usize) as u32)
            .unwrap_or(0);
        self.update_metrics(&session.id, &metrics)?;
    }
    Ok(())
}
pub fn increment_tool_calls(&self, session_id: &str) -> Result<()> {
self.conn.execute(
"UPDATE sessions SET tool_calls = tool_calls + 1, updated_at = ?1 WHERE id = ?2",
@@ -1419,6 +1567,71 @@ mod tests {
Ok(())
}
#[test]
fn sync_tool_activity_metrics_aggregates_usage_and_logs() -> Result<()> {
    let dir = TestDir::new("store-tool-activity")?;
    let store = StateStore::open(&dir.path().join("state.db"))?;
    let created = Utc::now();
    // Two sessions: one with activity in the file, one without.
    let make_session = |id: &str, task: &str, state: SessionState| Session {
        id: id.to_string(),
        task: task.to_string(),
        agent_type: "claude".to_string(),
        working_dir: PathBuf::from("/tmp"),
        state,
        pid: None,
        worktree: None,
        created_at: created,
        updated_at: created,
        metrics: SessionMetrics::default(),
    };
    store.insert_session(&make_session("session-1", "sync tools", SessionState::Running))?;
    store.insert_session(&make_session("session-2", "no activity", SessionState::Pending))?;
    let metrics_dir = dir.path().join("metrics");
    fs::create_dir_all(&metrics_dir)?;
    let activity_file = metrics_dir.join("tool-usage.jsonl");
    // evt-1 appears twice to exercise de-duplication; evt-2 repeats a
    // path so unique-path counting is covered too.
    let events = [
        "{\"id\":\"evt-1\",\"session_id\":\"session-1\",\"tool_name\":\"Read\",\"input_summary\":\"Read src/lib.rs\",\"output_summary\":\"ok\",\"file_paths\":[\"src/lib.rs\"],\"timestamp\":\"2026-04-09T00:00:00Z\"}",
        "{\"id\":\"evt-1\",\"session_id\":\"session-1\",\"tool_name\":\"Read\",\"input_summary\":\"Read src/lib.rs\",\"output_summary\":\"ok\",\"file_paths\":[\"src/lib.rs\"],\"timestamp\":\"2026-04-09T00:00:00Z\"}",
        "{\"id\":\"evt-2\",\"session_id\":\"session-1\",\"tool_name\":\"Write\",\"input_summary\":\"Write README.md\",\"output_summary\":\"ok\",\"file_paths\":[\"src/lib.rs\",\"README.md\"],\"timestamp\":\"2026-04-09T00:01:00Z\"}",
    ];
    fs::write(&activity_file, format!("{}\n", events.join("\n")))?;
    store.sync_tool_activity_metrics(&activity_file)?;
    let active = store
        .get_session("session-1")?
        .expect("session should still exist");
    assert_eq!(active.metrics.tool_calls, 2);
    assert_eq!(active.metrics.files_changed, 2);
    let idle = store
        .get_session("session-2")?
        .expect("session should still exist");
    assert_eq!(idle.metrics.tool_calls, 0);
    assert_eq!(idle.metrics.files_changed, 0);
    let logs = store.query_tool_logs("session-1", 1, 10)?;
    assert_eq!(logs.total, 2);
    assert_eq!(logs.entries[0].tool_name, "Write");
    assert_eq!(logs.entries[1].tool_name, "Read");
    Ok(())
}
#[test]
fn refresh_session_durations_updates_running_and_terminal_sessions() -> Result<()> {
let tempdir = TestDir::new("store-duration-metrics")?;

View File

@@ -102,6 +102,7 @@ pub struct Dashboard {
selected_search_match: usize,
session_table_state: TableState,
last_cost_metrics_signature: Option<(u64, u128)>,
last_tool_activity_signature: Option<(u64, u128)>,
last_budget_alert_state: BudgetState,
}
@@ -280,11 +281,16 @@ impl Dashboard {
output_store: SessionOutputStore,
) -> Self {
let pane_size_percent = configured_pane_size(&cfg, cfg.pane_layout);
let initial_cost_metrics_signature = cost_metrics_signature(&cfg.cost_metrics_path());
let initial_cost_metrics_signature = metrics_file_signature(&cfg.cost_metrics_path());
let initial_tool_activity_signature =
metrics_file_signature(&cfg.tool_activity_metrics_path());
let _ = db.refresh_session_durations();
if initial_cost_metrics_signature.is_some() {
let _ = db.sync_cost_tracker_metrics(&cfg.cost_metrics_path());
}
if initial_tool_activity_signature.is_some() {
let _ = db.sync_tool_activity_metrics(&cfg.tool_activity_metrics_path());
}
let sessions = db.list_sessions().unwrap_or_default();
let output_rx = output_store.subscribe();
let mut session_table_state = TableState::default();
@@ -345,6 +351,7 @@ impl Dashboard {
selected_search_match: 0,
session_table_state,
last_cost_metrics_signature: initial_cost_metrics_signature,
last_tool_activity_signature: initial_tool_activity_signature,
last_budget_alert_state: BudgetState::Normal,
};
dashboard.unread_message_counts = dashboard.db.unread_message_counts().unwrap_or_default();
@@ -2752,7 +2759,7 @@ impl Dashboard {
}
let metrics_path = self.cfg.cost_metrics_path();
let signature = cost_metrics_signature(&metrics_path);
let signature = metrics_file_signature(&metrics_path);
if signature != self.last_cost_metrics_signature {
self.last_cost_metrics_signature = signature;
if signature.is_some() {
@@ -2762,6 +2769,17 @@ impl Dashboard {
}
}
let activity_path = self.cfg.tool_activity_metrics_path();
let activity_signature = metrics_file_signature(&activity_path);
if activity_signature != self.last_tool_activity_signature {
self.last_tool_activity_signature = activity_signature;
if activity_signature.is_some() {
if let Err(error) = self.db.sync_tool_activity_metrics(&activity_path) {
tracing::warn!("Failed to sync tool activity metrics: {error}");
}
}
}
match manager::enforce_budget_hard_limits(&self.db, &self.cfg) {
Ok(outcome) => Some(outcome),
Err(error) => {
@@ -3446,7 +3464,7 @@ impl Dashboard {
occurred_at: session.updated_at,
session_id: session.id.clone(),
event_type: TimelineEventType::FileChange,
summary: format!("files changed {}", session.metrics.files_changed),
summary: format!("files touched {}", session.metrics.files_changed),
});
}
@@ -5464,7 +5482,7 @@ fn format_duration(duration_secs: u64) -> String {
format!("{hours:02}:{minutes:02}:{seconds:02}")
}
fn cost_metrics_signature(path: &std::path::Path) -> Option<(u64, u128)> {
fn metrics_file_signature(path: &std::path::Path) -> Option<(u64, u128)> {
let metadata = std::fs::metadata(path).ok()?;
let modified = metadata
.modified()
@@ -5885,7 +5903,7 @@ mod tests {
assert!(rendered.contains("created session as planner"));
assert!(rendered.contains("received query lead-123"));
assert!(rendered.contains("tool bash"));
assert!(rendered.contains("files changed 3"));
assert!(rendered.contains("files touched 3"));
}
#[test]
@@ -5944,7 +5962,7 @@ mod tests {
let rendered = dashboard.rendered_output_text(180, 30);
assert!(rendered.contains("received query lead-123"));
assert!(!rendered.contains("tool bash"));
assert!(!rendered.contains("files changed 1"));
assert!(!rendered.contains("files touched 1"));
}
#[test]
@@ -7249,6 +7267,47 @@ diff --git a/src/next.rs b/src/next.rs
);
}
#[test]
fn refresh_syncs_tool_activity_metrics_from_hook_file() {
    let scratch = std::env::temp_dir().join(format!("ecc2-activity-sync-{}", Uuid::new_v4()));
    fs::create_dir_all(scratch.join("metrics")).unwrap();
    let db_path = scratch.join("state.db");
    let store = StateStore::open(&db_path).unwrap();
    let ts = Utc::now();
    store
        .insert_session(&Session {
            id: "sess-1".to_string(),
            task: "sync activity".to_string(),
            agent_type: "claude".to_string(),
            working_dir: PathBuf::from("/tmp"),
            state: SessionState::Running,
            pid: None,
            worktree: None,
            created_at: ts,
            updated_at: ts,
            metrics: SessionMetrics::default(),
        })
        .unwrap();
    let mut cfg = Config::default();
    cfg.db_path = db_path;
    let mut ui = Dashboard::new(store, cfg);
    // Write the hook event only after construction so refresh() has to
    // notice the new file signature and re-sync.
    fs::write(
        scratch.join("metrics").join("tool-usage.jsonl"),
        "{\"id\":\"evt-1\",\"session_id\":\"sess-1\",\"tool_name\":\"Read\",\"input_summary\":\"Read README.md\",\"output_summary\":\"ok\",\"file_paths\":[\"README.md\"],\"timestamp\":\"2026-04-09T00:00:00Z\"}\n",
    )
    .unwrap();
    ui.refresh();
    assert_eq!(ui.sessions.len(), 1);
    assert_eq!(ui.sessions[0].metrics.tool_calls, 1);
    assert_eq!(ui.sessions[0].metrics.files_changed, 1);
    let _ = fs::remove_dir_all(scratch);
}
#[test]
fn new_session_task_uses_selected_session_context() {
let dashboard = test_dashboard(
@@ -9171,6 +9230,7 @@ diff --git a/src/next.rs b/src/next.rs
selected_search_match: 0,
session_table_state,
last_cost_metrics_signature: None,
last_tool_activity_signature: None,
last_budget_alert_state: BudgetState::Normal,
}
}