feat: add otel export for ecc sessions

This commit is contained in:
Affaan Mustafa
2026-04-09 09:02:39 -07:00
parent 2048f0d6f5
commit 0513898b9d
2 changed files with 521 additions and 0 deletions

View File

@@ -250,6 +250,14 @@ enum Commands {
#[arg(long)]
json: bool,
},
/// Export sessions, tool spans, and metrics in OTLP-compatible JSON
ExportOtel {
/// Session ID or alias. Omit to export all sessions.
session_id: Option<String>,
/// Write the export to a file instead of stdout
#[arg(long)]
output: Option<PathBuf>,
},
/// Stop a running session
Stop {
/// Session ID or alias
@@ -808,6 +816,21 @@ async fn main() -> Result<()> {
println!("{}", format_prune_worktrees_human(&outcome));
}
}
Some(Commands::ExportOtel { session_id, output }) => {
sync_runtime_session_metrics(&db, &cfg)?;
let resolved_session_id = session_id
.as_deref()
.map(|value| resolve_session_id(&db, value))
.transpose()?;
let export = build_otel_export(&db, resolved_session_id.as_deref())?;
let rendered = serde_json::to_string_pretty(&export)?;
if let Some(path) = output {
std::fs::write(&path, rendered)?;
println!("OTLP export written to {}", path.display());
} else {
println!("{rendered}");
}
}
Some(Commands::Stop { session_id }) => {
session::manager::stop_session(&db, &session_id).await?;
println!("Session stopped: {session_id}");
@@ -1081,6 +1104,93 @@ struct WorktreeResolutionReport {
resolution_steps: Vec<String>,
}
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
struct OtlpExport {
resource_spans: Vec<OtlpResourceSpans>,
}
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
struct OtlpResourceSpans {
resource: OtlpResource,
scope_spans: Vec<OtlpScopeSpans>,
}
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
struct OtlpResource {
attributes: Vec<OtlpKeyValue>,
}
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
struct OtlpScopeSpans {
scope: OtlpInstrumentationScope,
spans: Vec<OtlpSpan>,
}
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
struct OtlpInstrumentationScope {
name: String,
version: String,
}
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
struct OtlpSpan {
trace_id: String,
span_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
parent_span_id: Option<String>,
name: String,
kind: String,
start_time_unix_nano: String,
end_time_unix_nano: String,
attributes: Vec<OtlpKeyValue>,
#[serde(skip_serializing_if = "Vec::is_empty")]
links: Vec<OtlpSpanLink>,
status: OtlpSpanStatus,
}
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
struct OtlpSpanLink {
trace_id: String,
span_id: String,
#[serde(skip_serializing_if = "Vec::is_empty")]
attributes: Vec<OtlpKeyValue>,
}
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
struct OtlpSpanStatus {
code: String,
#[serde(skip_serializing_if = "Option::is_none")]
message: Option<String>,
}
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
struct OtlpKeyValue {
key: String,
value: OtlpAnyValue,
}
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
struct OtlpAnyValue {
#[serde(skip_serializing_if = "Option::is_none")]
string_value: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
int_value: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
double_value: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
bool_value: Option<bool>,
}
fn build_worktree_status_report(
session: &session::Session,
include_patch: bool,
@@ -1449,6 +1559,214 @@ fn format_prune_worktrees_human(outcome: &session::manager::WorktreePruneOutcome
lines.join("\n")
}
fn build_otel_export(
db: &session::store::StateStore,
session_id: Option<&str>,
) -> Result<OtlpExport> {
let sessions = if let Some(session_id) = session_id {
vec![db
.get_session(session_id)?
.ok_or_else(|| anyhow::anyhow!("Session not found: {session_id}"))?]
} else {
db.list_sessions()?
};
let mut spans = Vec::new();
for session in &sessions {
spans.extend(build_session_otel_spans(db, session)?);
}
Ok(OtlpExport {
resource_spans: vec![OtlpResourceSpans {
resource: OtlpResource {
attributes: vec![
otlp_string_attr("service.name", "ecc2"),
otlp_string_attr("service.version", env!("CARGO_PKG_VERSION")),
otlp_string_attr("telemetry.sdk.language", "rust"),
],
},
scope_spans: vec![OtlpScopeSpans {
scope: OtlpInstrumentationScope {
name: "ecc2".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
},
spans,
}],
}],
})
}
fn build_session_otel_spans(
db: &session::store::StateStore,
session: &session::Session,
) -> Result<Vec<OtlpSpan>> {
let trace_id = otlp_trace_id(&session.id);
let session_span_id = otlp_span_id(&format!("session:{}", session.id));
let parent_link = db.latest_task_handoff_source(&session.id)?;
let session_end = session.updated_at.max(session.created_at);
let mut spans = vec![OtlpSpan {
trace_id: trace_id.clone(),
span_id: session_span_id.clone(),
parent_span_id: None,
name: format!("session {}", session.task),
kind: "SPAN_KIND_INTERNAL".to_string(),
start_time_unix_nano: otlp_timestamp_nanos(session.created_at),
end_time_unix_nano: otlp_timestamp_nanos(session_end),
attributes: vec![
otlp_string_attr("ecc.session.id", &session.id),
otlp_string_attr("ecc.session.state", &session.state.to_string()),
otlp_string_attr("ecc.agent.type", &session.agent_type),
otlp_string_attr("ecc.session.task", &session.task),
otlp_string_attr(
"ecc.working_dir",
session.working_dir.to_string_lossy().as_ref(),
),
otlp_int_attr("ecc.metrics.input_tokens", session.metrics.input_tokens),
otlp_int_attr("ecc.metrics.output_tokens", session.metrics.output_tokens),
otlp_int_attr("ecc.metrics.tokens_used", session.metrics.tokens_used),
otlp_int_attr("ecc.metrics.tool_calls", session.metrics.tool_calls),
otlp_int_attr(
"ecc.metrics.files_changed",
u64::from(session.metrics.files_changed),
),
otlp_int_attr("ecc.metrics.duration_secs", session.metrics.duration_secs),
otlp_double_attr("ecc.metrics.cost_usd", session.metrics.cost_usd),
],
links: parent_link
.into_iter()
.map(|parent_session_id| OtlpSpanLink {
trace_id: otlp_trace_id(&parent_session_id),
span_id: otlp_span_id(&format!("session:{parent_session_id}")),
attributes: vec![otlp_string_attr(
"ecc.parent_session.id",
&parent_session_id,
)],
})
.collect(),
status: otlp_session_status(&session.state),
}];
for entry in db.list_tool_logs_for_session(&session.id)? {
let span_end = chrono::DateTime::parse_from_rfc3339(&entry.timestamp)
.unwrap_or_else(|_| session.updated_at.into())
.with_timezone(&chrono::Utc);
let span_start = span_end - chrono::Duration::milliseconds(entry.duration_ms as i64);
spans.push(OtlpSpan {
trace_id: trace_id.clone(),
span_id: otlp_span_id(&format!("tool:{}:{}", session.id, entry.id)),
parent_span_id: Some(session_span_id.clone()),
name: format!("tool {}", entry.tool_name),
kind: "SPAN_KIND_INTERNAL".to_string(),
start_time_unix_nano: otlp_timestamp_nanos(span_start),
end_time_unix_nano: otlp_timestamp_nanos(span_end),
attributes: vec![
otlp_string_attr("ecc.session.id", &entry.session_id),
otlp_string_attr("tool.name", &entry.tool_name),
otlp_string_attr("tool.input_summary", &entry.input_summary),
otlp_string_attr("tool.output_summary", &entry.output_summary),
otlp_string_attr("tool.trigger_summary", &entry.trigger_summary),
otlp_string_attr("tool.input_params_json", &entry.input_params_json),
otlp_int_attr("tool.duration_ms", entry.duration_ms),
otlp_double_attr("tool.risk_score", entry.risk_score),
],
links: Vec::new(),
status: OtlpSpanStatus {
code: "STATUS_CODE_UNSET".to_string(),
message: None,
},
});
}
Ok(spans)
}
fn otlp_timestamp_nanos(value: chrono::DateTime<chrono::Utc>) -> String {
value
.timestamp_nanos_opt()
.unwrap_or_default()
.max(0)
.to_string()
}
fn otlp_trace_id(seed: &str) -> String {
format!(
"{:016x}{:016x}",
fnv1a64(seed.as_bytes()),
fnv1a64_with_seed(seed.as_bytes(), 1099511628211)
)
}
fn otlp_span_id(seed: &str) -> String {
format!("{:016x}", fnv1a64(seed.as_bytes()))
}
fn fnv1a64(bytes: &[u8]) -> u64 {
fnv1a64_with_seed(bytes, 14695981039346656037)
}
fn fnv1a64_with_seed(bytes: &[u8], offset_basis: u64) -> u64 {
let mut hash = offset_basis;
for byte in bytes {
hash ^= u64::from(*byte);
hash = hash.wrapping_mul(1099511628211);
}
hash
}
fn otlp_string_attr(key: &str, value: &str) -> OtlpKeyValue {
OtlpKeyValue {
key: key.to_string(),
value: OtlpAnyValue {
string_value: Some(value.to_string()),
int_value: None,
double_value: None,
bool_value: None,
},
}
}
fn otlp_int_attr(key: &str, value: u64) -> OtlpKeyValue {
OtlpKeyValue {
key: key.to_string(),
value: OtlpAnyValue {
string_value: None,
int_value: Some(value.to_string()),
double_value: None,
bool_value: None,
},
}
}
fn otlp_double_attr(key: &str, value: f64) -> OtlpKeyValue {
OtlpKeyValue {
key: key.to_string(),
value: OtlpAnyValue {
string_value: None,
int_value: None,
double_value: Some(value),
bool_value: None,
},
}
}
fn otlp_session_status(state: &session::SessionState) -> OtlpSpanStatus {
match state {
session::SessionState::Completed => OtlpSpanStatus {
code: "STATUS_CODE_OK".to_string(),
message: None,
},
session::SessionState::Failed => OtlpSpanStatus {
code: "STATUS_CODE_ERROR".to_string(),
message: Some("session failed".to_string()),
},
_ => OtlpSpanStatus {
code: "STATUS_CODE_UNSET".to_string(),
message: None,
},
}
}
fn summarize_coordinate_backlog(
outcome: &session::manager::CoordinateBacklogOutcome,
) -> CoordinateBacklogPassSummary {
@@ -1556,6 +1874,66 @@ fn send_handoff_message(db: &session::store::StateStore, from_id: &str, to_id: &
mod tests {
use super::*;
use crate::config::Config;
use crate::session::store::StateStore;
use crate::session::{Session, SessionMetrics, SessionState};
use chrono::{Duration, Utc};
use std::fs;
use std::path::{Path, PathBuf};
struct TestDir {
path: PathBuf,
}
impl TestDir {
fn new(label: &str) -> Result<Self> {
let path =
std::env::temp_dir().join(format!("ecc2-main-{label}-{}", uuid::Uuid::new_v4()));
fs::create_dir_all(&path)?;
Ok(Self { path })
}
fn path(&self) -> &Path {
&self.path
}
}
impl Drop for TestDir {
fn drop(&mut self) {
let _ = fs::remove_dir_all(&self.path);
}
}
fn build_session(id: &str, task: &str, state: SessionState) -> Session {
let now = Utc::now();
Session {
id: id.to_string(),
task: task.to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp/ecc"),
state,
pid: None,
worktree: None,
created_at: now - Duration::seconds(5),
updated_at: now,
last_heartbeat_at: now,
metrics: SessionMetrics {
input_tokens: 120,
output_tokens: 30,
tokens_used: 150,
tool_calls: 2,
files_changed: 1,
duration_secs: 5,
cost_usd: 0.42,
},
}
}
fn attr_value<'a>(attrs: &'a [OtlpKeyValue], key: &str) -> Option<&'a OtlpAnyValue> {
attrs
.iter()
.find(|attr| attr.key == key)
.map(|attr| &attr.value)
}
#[test]
fn worktree_policy_defaults_to_config_setting() {
@@ -1598,6 +1976,26 @@ mod tests {
}
}
#[test]
fn cli_parses_export_otel_command() {
let cli = Cli::try_parse_from([
"ecc",
"export-otel",
"worker-1234",
"--output",
"/tmp/ecc-otel.json",
])
.expect("export-otel should parse");
match cli.command {
Some(Commands::ExportOtel { session_id, output }) => {
assert_eq!(session_id.as_deref(), Some("worker-1234"));
assert_eq!(output.as_deref(), Some(Path::new("/tmp/ecc-otel.json")));
}
_ => panic!("expected export-otel subcommand"),
}
}
#[test]
fn cli_parses_messages_send_command() {
let cli = Cli::try_parse_from([
@@ -1886,6 +2284,99 @@ mod tests {
}
}
#[test]
fn build_otel_export_includes_session_and_tool_spans() -> Result<()> {
let tempdir = TestDir::new("otel-export-session")?;
let db = StateStore::open(&tempdir.path().join("state.db"))?;
let session = build_session("session-1", "Investigate export", SessionState::Completed);
db.insert_session(&session)?;
db.insert_tool_log(
&session.id,
"Write",
"Write src/lib.rs",
"{\"file\":\"src/lib.rs\"}",
"Updated file",
"manual test",
120,
0.75,
&Utc::now().to_rfc3339(),
)?;
let export = build_otel_export(&db, Some("session-1"))?;
let spans = &export.resource_spans[0].scope_spans[0].spans;
assert_eq!(spans.len(), 2);
let session_span = spans
.iter()
.find(|span| span.parent_span_id.is_none())
.expect("session root span");
let tool_span = spans
.iter()
.find(|span| span.parent_span_id.is_some())
.expect("tool child span");
assert_eq!(session_span.trace_id, tool_span.trace_id);
assert_eq!(
tool_span.parent_span_id.as_deref(),
Some(session_span.span_id.as_str())
);
assert_eq!(session_span.status.code, "STATUS_CODE_OK");
assert_eq!(
attr_value(&session_span.attributes, "ecc.session.id")
.and_then(|value| value.string_value.as_deref()),
Some("session-1")
);
assert_eq!(
attr_value(&tool_span.attributes, "tool.name")
.and_then(|value| value.string_value.as_deref()),
Some("Write")
);
assert_eq!(
attr_value(&tool_span.attributes, "tool.duration_ms")
.and_then(|value| value.int_value.as_deref()),
Some("120")
);
Ok(())
}
#[test]
fn build_otel_export_links_delegated_session_to_parent_trace() -> Result<()> {
let tempdir = TestDir::new("otel-export-parent-link")?;
let db = StateStore::open(&tempdir.path().join("state.db"))?;
let parent = build_session("lead-1", "Lead task", SessionState::Running);
let child = build_session("worker-1", "Delegated task", SessionState::Running);
db.insert_session(&parent)?;
db.insert_session(&child)?;
db.send_message(
&parent.id,
&child.id,
"{\"task\":\"Delegated task\",\"context\":\"Delegated from lead\"}",
"task_handoff",
)?;
let export = build_otel_export(&db, Some("worker-1"))?;
let session_span = export.resource_spans[0].scope_spans[0]
.spans
.iter()
.find(|span| span.parent_span_id.is_none())
.expect("session root span");
assert_eq!(session_span.links.len(), 1);
assert_eq!(session_span.links[0].trace_id, otlp_trace_id("lead-1"));
assert_eq!(
session_span.links[0].span_id,
otlp_span_id("session:lead-1")
);
assert_eq!(
attr_value(&session_span.links[0].attributes, "ecc.parent_session.id")
.and_then(|value| value.string_value.as_deref()),
Some("lead-1")
);
Ok(())
}
#[test]
fn cli_parses_worktree_status_check_flag() {
let cli = Cli::try_parse_from(["ecc", "worktree-status", "--check"])

View File

@@ -1705,6 +1705,36 @@ impl StateStore {
})
}
pub fn list_tool_logs_for_session(&self, session_id: &str) -> Result<Vec<ToolLogEntry>> {
let mut stmt = self.conn.prepare(
"SELECT id, session_id, tool_name, input_summary, input_params_json, output_summary, trigger_summary, duration_ms, risk_score, timestamp
FROM tool_log
WHERE session_id = ?1
ORDER BY timestamp ASC, id ASC",
)?;
let entries = stmt
.query_map(rusqlite::params![session_id], |row| {
Ok(ToolLogEntry {
id: row.get(0)?,
session_id: row.get(1)?,
tool_name: row.get(2)?,
input_summary: row.get::<_, Option<String>>(3)?.unwrap_or_default(),
input_params_json: row
.get::<_, Option<String>>(4)?
.unwrap_or_else(|| "{}".to_string()),
output_summary: row.get::<_, Option<String>>(5)?.unwrap_or_default(),
trigger_summary: row.get::<_, Option<String>>(6)?.unwrap_or_default(),
duration_ms: row.get::<_, Option<u64>>(7)?.unwrap_or_default(),
risk_score: row.get::<_, Option<f64>>(8)?.unwrap_or_default(),
timestamp: row.get(9)?,
})
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(entries)
}
pub fn list_file_activity(
&self,
session_id: &str,