mirror of
https://github.com/affaan-m/everything-claude-code.git
synced 2026-04-10 03:13:29 +08:00
feat: auto-pause ecc2 sessions when budgets are exceeded
This commit is contained in:
@@ -227,7 +227,8 @@ impl PaneNavigationConfig {
|
||||
}
|
||||
|
||||
fn shortcut_matches(spec: &str, key: KeyEvent) -> bool {
|
||||
parse_shortcut(spec).is_some_and(|(modifiers, code)| key.modifiers == modifiers && key.code == code)
|
||||
parse_shortcut(spec)
|
||||
.is_some_and(|(modifiers, code)| key.modifiers == modifiers && key.code == code)
|
||||
}
|
||||
|
||||
fn parse_shortcut(spec: &str) -> Option<(KeyModifiers, KeyCode)> {
|
||||
|
||||
@@ -899,6 +899,7 @@ fn sync_runtime_session_metrics(
|
||||
) -> Result<()> {
|
||||
db.refresh_session_durations()?;
|
||||
db.sync_cost_tracker_metrics(&cfg.cost_metrics_path())?;
|
||||
let _ = session::manager::enforce_budget_hard_limits(db, cfg)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -353,6 +353,56 @@ pub async fn stop_session(db: &StateStore, id: &str) -> Result<()> {
|
||||
stop_session_with_options(db, id, true).await
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, PartialEq)]
|
||||
pub struct BudgetEnforcementOutcome {
|
||||
pub token_budget_exceeded: bool,
|
||||
pub cost_budget_exceeded: bool,
|
||||
pub paused_sessions: Vec<String>,
|
||||
}
|
||||
|
||||
impl BudgetEnforcementOutcome {
|
||||
pub fn hard_limit_exceeded(&self) -> bool {
|
||||
self.token_budget_exceeded || self.cost_budget_exceeded
|
||||
}
|
||||
}
|
||||
|
||||
pub fn enforce_budget_hard_limits(
|
||||
db: &StateStore,
|
||||
cfg: &Config,
|
||||
) -> Result<BudgetEnforcementOutcome> {
|
||||
let sessions = db.list_sessions()?;
|
||||
let total_tokens = sessions
|
||||
.iter()
|
||||
.map(|session| session.metrics.tokens_used)
|
||||
.sum::<u64>();
|
||||
let total_cost = sessions
|
||||
.iter()
|
||||
.map(|session| session.metrics.cost_usd)
|
||||
.sum::<f64>();
|
||||
|
||||
let mut outcome = BudgetEnforcementOutcome {
|
||||
token_budget_exceeded: cfg.token_budget > 0 && total_tokens >= cfg.token_budget,
|
||||
cost_budget_exceeded: cfg.cost_budget_usd > 0.0 && total_cost >= cfg.cost_budget_usd,
|
||||
paused_sessions: Vec::new(),
|
||||
};
|
||||
|
||||
if !outcome.hard_limit_exceeded() {
|
||||
return Ok(outcome);
|
||||
}
|
||||
|
||||
for session in sessions.into_iter().filter(|session| {
|
||||
matches!(
|
||||
session.state,
|
||||
SessionState::Pending | SessionState::Running | SessionState::Idle
|
||||
)
|
||||
}) {
|
||||
stop_session_recorded(db, &session, false)?;
|
||||
outcome.paused_sessions.push(session.id);
|
||||
}
|
||||
|
||||
Ok(outcome)
|
||||
}
|
||||
|
||||
pub fn record_tool_call(
|
||||
db: &StateStore,
|
||||
session_id: &str,
|
||||
@@ -1175,9 +1225,12 @@ async fn stop_session_with_options(
|
||||
cleanup_worktree: bool,
|
||||
) -> Result<()> {
|
||||
let session = resolve_session(db, id)?;
|
||||
stop_session_recorded(db, &session, cleanup_worktree)
|
||||
}
|
||||
|
||||
fn stop_session_recorded(db: &StateStore, session: &Session, cleanup_worktree: bool) -> Result<()> {
|
||||
if let Some(pid) = session.pid {
|
||||
kill_process(pid).await?;
|
||||
kill_process(pid)?;
|
||||
}
|
||||
|
||||
db.update_pid(&session.id, None)?;
|
||||
@@ -1193,13 +1246,27 @@ async fn stop_session_with_options(
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
async fn kill_process(pid: u32) -> Result<()> {
|
||||
fn kill_process(pid: u32) -> Result<()> {
|
||||
send_signal(pid, libc::SIGTERM)?;
|
||||
tokio::time::sleep(std::time::Duration::from_millis(1200)).await;
|
||||
std::thread::sleep(std::time::Duration::from_millis(1200));
|
||||
send_signal(pid, libc::SIGKILL)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn kill_process(pid: u32) -> Result<()> {
|
||||
let status = std::process::Command::new("taskkill")
|
||||
.args(["/PID", &pid.to_string(), "/T", "/F"])
|
||||
.status()
|
||||
.with_context(|| format!("Failed to invoke taskkill for process {pid}"))?;
|
||||
|
||||
if status.success() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow::anyhow!("taskkill exited with status {status}"))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn send_signal(pid: u32, signal: i32) -> Result<()> {
|
||||
let outcome = unsafe { libc::kill(pid as i32, signal) };
|
||||
@@ -1416,9 +1483,7 @@ impl fmt::Display for SessionStatus {
|
||||
writeln!(
|
||||
f,
|
||||
"Tokens: {} total (in {} / out {})",
|
||||
s.metrics.tokens_used,
|
||||
s.metrics.input_tokens,
|
||||
s.metrics.output_tokens
|
||||
s.metrics.tokens_used, s.metrics.input_tokens, s.metrics.output_tokens
|
||||
)?;
|
||||
writeln!(f, "Tools: {}", s.metrics.tool_calls)?;
|
||||
writeln!(f, "Files: {}", s.metrics.files_changed)?;
|
||||
@@ -1885,6 +1950,115 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn enforce_budget_hard_limits_stops_active_sessions_without_cleaning_worktrees() -> Result<()> {
|
||||
let tempdir = TestDir::new("manager-budget-pause")?;
|
||||
let mut cfg = build_config(tempdir.path());
|
||||
cfg.token_budget = 100;
|
||||
cfg.cost_budget_usd = 0.0;
|
||||
|
||||
let db = StateStore::open(&cfg.db_path)?;
|
||||
let now = Utc::now();
|
||||
let worktree_path = tempdir.path().join("keep-worktree");
|
||||
fs::create_dir_all(&worktree_path)?;
|
||||
|
||||
db.insert_session(&Session {
|
||||
id: "active-over-budget".to_string(),
|
||||
task: "pause on hard limit".to_string(),
|
||||
agent_type: "claude".to_string(),
|
||||
working_dir: tempdir.path().to_path_buf(),
|
||||
state: SessionState::Running,
|
||||
pid: Some(999_999),
|
||||
worktree: Some(crate::session::WorktreeInfo {
|
||||
path: worktree_path.clone(),
|
||||
branch: "ecc/active-over-budget".to_string(),
|
||||
base_branch: "main".to_string(),
|
||||
}),
|
||||
created_at: now - Duration::minutes(1),
|
||||
updated_at: now,
|
||||
metrics: SessionMetrics::default(),
|
||||
})?;
|
||||
db.update_metrics(
|
||||
"active-over-budget",
|
||||
&SessionMetrics {
|
||||
input_tokens: 90,
|
||||
output_tokens: 30,
|
||||
tokens_used: 120,
|
||||
tool_calls: 0,
|
||||
files_changed: 0,
|
||||
duration_secs: 60,
|
||||
cost_usd: 0.0,
|
||||
},
|
||||
)?;
|
||||
|
||||
let outcome = enforce_budget_hard_limits(&db, &cfg)?;
|
||||
assert!(outcome.token_budget_exceeded);
|
||||
assert!(!outcome.cost_budget_exceeded);
|
||||
assert_eq!(
|
||||
outcome.paused_sessions,
|
||||
vec!["active-over-budget".to_string()]
|
||||
);
|
||||
|
||||
let session = db
|
||||
.get_session("active-over-budget")?
|
||||
.context("session should still exist")?;
|
||||
assert_eq!(session.state, SessionState::Stopped);
|
||||
assert_eq!(session.pid, None);
|
||||
assert!(
|
||||
worktree_path.exists(),
|
||||
"hard-limit pauses should preserve worktrees for resume"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn enforce_budget_hard_limits_ignores_inactive_sessions() -> Result<()> {
|
||||
let tempdir = TestDir::new("manager-budget-ignore-inactive")?;
|
||||
let mut cfg = build_config(tempdir.path());
|
||||
cfg.token_budget = 100;
|
||||
cfg.cost_budget_usd = 0.0;
|
||||
|
||||
let db = StateStore::open(&cfg.db_path)?;
|
||||
let now = Utc::now();
|
||||
|
||||
db.insert_session(&Session {
|
||||
id: "completed-over-budget".to_string(),
|
||||
task: "already done".to_string(),
|
||||
agent_type: "claude".to_string(),
|
||||
working_dir: tempdir.path().to_path_buf(),
|
||||
state: SessionState::Completed,
|
||||
pid: None,
|
||||
worktree: None,
|
||||
created_at: now - Duration::minutes(2),
|
||||
updated_at: now - Duration::minutes(1),
|
||||
metrics: SessionMetrics::default(),
|
||||
})?;
|
||||
db.update_metrics(
|
||||
"completed-over-budget",
|
||||
&SessionMetrics {
|
||||
input_tokens: 90,
|
||||
output_tokens: 30,
|
||||
tokens_used: 120,
|
||||
tool_calls: 0,
|
||||
files_changed: 0,
|
||||
duration_secs: 60,
|
||||
cost_usd: 0.0,
|
||||
},
|
||||
)?;
|
||||
|
||||
let outcome = enforce_budget_hard_limits(&db, &cfg)?;
|
||||
assert!(outcome.token_budget_exceeded);
|
||||
assert!(outcome.paused_sessions.is_empty());
|
||||
|
||||
let session = db
|
||||
.get_session("completed-over-budget")?
|
||||
.context("completed session should still exist")?;
|
||||
assert_eq!(session.state, SessionState::Completed);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn resume_session_requeues_failed_session() -> Result<()> {
|
||||
let tempdir = TestDir::new("manager-resume-session")?;
|
||||
|
||||
@@ -545,7 +545,9 @@ impl StateStore {
|
||||
.with_timezone(&chrono::Utc);
|
||||
let effective_end = match state {
|
||||
SessionState::Pending | SessionState::Running | SessionState::Idle => now,
|
||||
SessionState::Completed | SessionState::Failed | SessionState::Stopped => updated_at,
|
||||
SessionState::Completed | SessionState::Failed | SessionState::Stopped => {
|
||||
updated_at
|
||||
}
|
||||
};
|
||||
let duration_secs = effective_end
|
||||
.signed_duration_since(created_at)
|
||||
@@ -622,7 +624,9 @@ impl StateStore {
|
||||
rusqlite::params![
|
||||
aggregate.input_tokens,
|
||||
aggregate.output_tokens,
|
||||
aggregate.input_tokens.saturating_add(aggregate.output_tokens),
|
||||
aggregate
|
||||
.input_tokens
|
||||
.saturating_add(aggregate.output_tokens),
|
||||
aggregate.cost_usd,
|
||||
session_id,
|
||||
],
|
||||
@@ -1448,8 +1452,12 @@ mod tests {
|
||||
|
||||
db.refresh_session_durations()?;
|
||||
|
||||
let running = db.get_session("running-1")?.expect("running session should exist");
|
||||
let completed = db.get_session("done-1")?.expect("completed session should exist");
|
||||
let running = db
|
||||
.get_session("running-1")?
|
||||
.expect("running session should exist");
|
||||
let completed = db
|
||||
.get_session("done-1")?
|
||||
.expect("completed session should exist");
|
||||
|
||||
assert!(running.metrics.duration_secs >= 95);
|
||||
assert!(completed.metrics.duration_secs >= 75);
|
||||
|
||||
@@ -53,7 +53,9 @@ pub async fn run(db: StateStore, cfg: Config) -> Result<()> {
|
||||
|
||||
match (key.modifiers, key.code) {
|
||||
(KeyModifiers::CONTROL, KeyCode::Char('c')) => break,
|
||||
(KeyModifiers::CONTROL, KeyCode::Char('w')) => dashboard.begin_pane_command_mode(),
|
||||
(KeyModifiers::CONTROL, KeyCode::Char('w')) => {
|
||||
dashboard.begin_pane_command_mode()
|
||||
}
|
||||
(_, KeyCode::Char('q')) => break,
|
||||
_ if dashboard.handle_pane_navigation_key(key) => {}
|
||||
(_, KeyCode::Tab) => dashboard.next_pane(),
|
||||
|
||||
@@ -2746,27 +2746,33 @@ impl Dashboard {
|
||||
self.sync_from_store();
|
||||
}
|
||||
|
||||
fn sync_runtime_metrics(&mut self) {
|
||||
fn sync_runtime_metrics(&mut self) -> Option<manager::BudgetEnforcementOutcome> {
|
||||
if let Err(error) = self.db.refresh_session_durations() {
|
||||
tracing::warn!("Failed to refresh session durations: {error}");
|
||||
}
|
||||
|
||||
let metrics_path = self.cfg.cost_metrics_path();
|
||||
let signature = cost_metrics_signature(&metrics_path);
|
||||
if signature == self.last_cost_metrics_signature {
|
||||
return;
|
||||
if signature != self.last_cost_metrics_signature {
|
||||
self.last_cost_metrics_signature = signature;
|
||||
if signature.is_some() {
|
||||
if let Err(error) = self.db.sync_cost_tracker_metrics(&metrics_path) {
|
||||
tracing::warn!("Failed to sync cost tracker metrics: {error}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.last_cost_metrics_signature = signature;
|
||||
if signature.is_some() {
|
||||
if let Err(error) = self.db.sync_cost_tracker_metrics(&metrics_path) {
|
||||
tracing::warn!("Failed to sync cost tracker metrics: {error}");
|
||||
match manager::enforce_budget_hard_limits(&self.db, &self.cfg) {
|
||||
Ok(outcome) => Some(outcome),
|
||||
Err(error) => {
|
||||
tracing::warn!("Failed to enforce budget hard limits: {error}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn sync_from_store(&mut self) {
|
||||
self.sync_runtime_metrics();
|
||||
let budget_enforcement = self.sync_runtime_metrics();
|
||||
let selected_id = self.selected_session_id().map(ToOwned::to_owned);
|
||||
self.sessions = match self.db.list_sessions() {
|
||||
Ok(sessions) => sessions,
|
||||
@@ -2794,6 +2800,56 @@ impl Dashboard {
|
||||
self.sync_selected_messages();
|
||||
self.sync_selected_lineage();
|
||||
self.refresh_logs();
|
||||
self.sync_budget_alerts();
|
||||
|
||||
if let Some(outcome) =
|
||||
budget_enforcement.filter(|outcome| !outcome.paused_sessions.is_empty())
|
||||
{
|
||||
self.set_operator_note(budget_auto_pause_note(&outcome));
|
||||
}
|
||||
}
|
||||
|
||||
fn sync_budget_alerts(&mut self) {
|
||||
let aggregate = self.aggregate_usage();
|
||||
let thresholds = self.cfg.effective_budget_alert_thresholds();
|
||||
let current_state = aggregate.overall_state;
|
||||
if current_state == self.last_budget_alert_state {
|
||||
return;
|
||||
}
|
||||
|
||||
let previous_state = self.last_budget_alert_state;
|
||||
self.last_budget_alert_state = current_state;
|
||||
|
||||
if current_state <= previous_state {
|
||||
return;
|
||||
}
|
||||
|
||||
let Some(summary_suffix) = current_state.summary_suffix(thresholds) else {
|
||||
return;
|
||||
};
|
||||
|
||||
let token_budget = if self.cfg.token_budget > 0 {
|
||||
format!(
|
||||
"{} / {}",
|
||||
format_token_count(aggregate.total_tokens),
|
||||
format_token_count(self.cfg.token_budget)
|
||||
)
|
||||
} else {
|
||||
format!("{} / no budget", format_token_count(aggregate.total_tokens))
|
||||
};
|
||||
let cost_budget = if self.cfg.cost_budget_usd > 0.0 {
|
||||
format!(
|
||||
"{} / {}",
|
||||
format_currency(aggregate.total_cost_usd),
|
||||
format_currency(self.cfg.cost_budget_usd)
|
||||
)
|
||||
} else {
|
||||
format!("{} / no budget", format_currency(aggregate.total_cost_usd))
|
||||
};
|
||||
|
||||
self.set_operator_note(format!(
|
||||
"{summary_suffix} | tokens {token_budget} | cost {cost_budget}"
|
||||
));
|
||||
}
|
||||
|
||||
fn sync_selection(&mut self) {
|
||||
@@ -4102,49 +4158,6 @@ impl Dashboard {
|
||||
(text, aggregate.overall_state.style())
|
||||
}
|
||||
|
||||
fn sync_budget_alerts(&mut self) {
|
||||
let aggregate = self.aggregate_usage();
|
||||
let thresholds = self.cfg.effective_budget_alert_thresholds();
|
||||
let current_state = aggregate.overall_state;
|
||||
if current_state == self.last_budget_alert_state {
|
||||
return;
|
||||
}
|
||||
|
||||
let previous_state = self.last_budget_alert_state;
|
||||
self.last_budget_alert_state = current_state;
|
||||
|
||||
if current_state <= previous_state {
|
||||
return;
|
||||
}
|
||||
|
||||
let Some(summary_suffix) = current_state.summary_suffix(thresholds) else {
|
||||
return;
|
||||
};
|
||||
|
||||
let token_budget = if self.cfg.token_budget > 0 {
|
||||
format!(
|
||||
"{} / {}",
|
||||
format_token_count(aggregate.total_tokens),
|
||||
format_token_count(self.cfg.token_budget)
|
||||
)
|
||||
} else {
|
||||
format!("{} / no budget", format_token_count(aggregate.total_tokens))
|
||||
};
|
||||
let cost_budget = if self.cfg.cost_budget_usd > 0.0 {
|
||||
format!(
|
||||
"{} / {}",
|
||||
format_currency(aggregate.total_cost_usd),
|
||||
format_currency(self.cfg.cost_budget_usd)
|
||||
)
|
||||
} else {
|
||||
format!("{} / no budget", format_currency(aggregate.total_cost_usd))
|
||||
};
|
||||
|
||||
self.set_operator_note(format!(
|
||||
"{summary_suffix} | tokens {token_budget} | cost {cost_budget}"
|
||||
));
|
||||
}
|
||||
|
||||
fn attention_queue_items(&self, limit: usize) -> Vec<String> {
|
||||
let mut items = Vec::new();
|
||||
let suppress_inbox_attention = self
|
||||
@@ -5307,6 +5320,20 @@ fn session_state_color(state: &SessionState) -> Color {
|
||||
}
|
||||
}
|
||||
|
||||
fn budget_auto_pause_note(outcome: &manager::BudgetEnforcementOutcome) -> String {
|
||||
let cause = match (outcome.token_budget_exceeded, outcome.cost_budget_exceeded) {
|
||||
(true, true) => "token and cost budgets exceeded",
|
||||
(true, false) => "token budget exceeded",
|
||||
(false, true) => "cost budget exceeded",
|
||||
(false, false) => "budget exceeded",
|
||||
};
|
||||
|
||||
format!(
|
||||
"{cause} | auto-paused {} active session(s)",
|
||||
outcome.paused_sessions.len()
|
||||
)
|
||||
}
|
||||
|
||||
fn format_session_id(id: &str) -> String {
|
||||
id.chars().take(8).collect()
|
||||
}
|
||||
@@ -7188,6 +7215,40 @@ diff --git a/src/next.rs b/src/next.rs
|
||||
assert_eq!(dashboard.last_budget_alert_state, BudgetState::Alert75);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn refresh_auto_pauses_over_budget_sessions_and_sets_operator_note() {
|
||||
let db = StateStore::open(Path::new(":memory:")).unwrap();
|
||||
let mut cfg = Config::default();
|
||||
cfg.token_budget = 100;
|
||||
cfg.cost_budget_usd = 0.0;
|
||||
|
||||
db.insert_session(&budget_session("sess-1", 120, 0.0))
|
||||
.expect("insert session");
|
||||
db.update_metrics(
|
||||
"sess-1",
|
||||
&SessionMetrics {
|
||||
input_tokens: 90,
|
||||
output_tokens: 30,
|
||||
tokens_used: 120,
|
||||
tool_calls: 0,
|
||||
files_changed: 0,
|
||||
duration_secs: 0,
|
||||
cost_usd: 0.0,
|
||||
},
|
||||
)
|
||||
.expect("persist metrics");
|
||||
|
||||
let mut dashboard = Dashboard::new(db, cfg);
|
||||
dashboard.refresh();
|
||||
|
||||
assert_eq!(dashboard.sessions.len(), 1);
|
||||
assert_eq!(dashboard.sessions[0].state, SessionState::Stopped);
|
||||
assert_eq!(
|
||||
dashboard.operator_note.as_deref(),
|
||||
Some("token budget exceeded | auto-paused 1 active session(s)")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_session_task_uses_selected_session_context() {
|
||||
let dashboard = test_dashboard(
|
||||
|
||||
Reference in New Issue
Block a user