From 16e9b17ad7314cec9ef18fd1d2b3874daf64f760 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Thu, 2 Apr 2026 18:02:29 -0700 Subject: [PATCH] fix: clean up observer sessions on lifecycle end --- scripts/hooks/session-end-marker.js | 46 ++++- scripts/hooks/session-start.js | 13 ++ scripts/lib/observer-sessions.js | 175 ++++++++++++++++++ .../agents/observer-loop.sh | 61 ++++++ .../continuous-learning-v2/hooks/observe.sh | 3 + tests/hooks/observer-memory.test.js | 19 ++ tests/integration/hooks.test.js | 95 ++++++++++ 7 files changed, 408 insertions(+), 4 deletions(-) create mode 100644 scripts/lib/observer-sessions.js diff --git a/scripts/hooks/session-end-marker.js b/scripts/hooks/session-end-marker.js index c635a93d..7b27a8eb 100755 --- a/scripts/hooks/session-end-marker.js +++ b/scripts/hooks/session-end-marker.js @@ -2,12 +2,50 @@ 'use strict'; /** - * Session end marker hook - outputs stdin to stdout unchanged. - * Exports run() for in-process execution (avoids spawnSync issues on Windows). + * Session end marker hook - performs lightweight observer cleanup and + * outputs stdin to stdout unchanged. Exports run() for in-process execution. */ +const { + resolveProjectContext, + removeSessionLease, + listSessionLeases, + stopObserverForContext, + resolveSessionId +} = require('../lib/observer-sessions'); + +function log(message) { + process.stderr.write(`[SessionEnd] ${message}\n`); +} + function run(rawInput) { - return rawInput || ''; + const output = rawInput || ''; + const sessionId = resolveSessionId(); + + if (!sessionId) { + log('No CLAUDE_SESSION_ID available; skipping observer cleanup'); + return output; + } + + try { + const observerContext = resolveProjectContext(); + removeSessionLease(observerContext, sessionId); + const remainingLeases = listSessionLeases(observerContext); + + if (remainingLeases.length === 0) { + if (stopObserverForContext(observerContext)) { + log(`Stopped observer for project ${observerContext.projectId} after final session lease ended`); + } else { + log(`No running observer to stop for project ${observerContext.projectId}`); + } + } else { + log(`Retained observer for project ${observerContext.projectId}; ${remainingLeases.length} session lease(s) remain`); + } + } catch (err) { + log(`Observer cleanup skipped: ${err.message}`); + } + + return output; } // Legacy CLI execution (when run directly) @@ -22,7 +60,7 @@ if (require.main === module) { } }); process.stdin.on('end', () => { - process.stdout.write(raw); + process.stdout.write(run(raw)); }); } diff --git a/scripts/hooks/session-start.js b/scripts/hooks/session-start.js index c95881ff..93b3c508 100644 --- a/scripts/hooks/session-start.js +++ b/scripts/hooks/session-start.js @@ -20,6 +20,7 @@ const { stripAnsi, log } = require('../lib/utils'); +const { resolveProjectContext, writeSessionLease, resolveSessionId } = require('../lib/observer-sessions'); const { getPackageManager, getSelectionPrompt } = require('../lib/package-manager'); const { listAliases } = require('../lib/session-aliases'); const { detectProjectType } = require('../lib/project-detect'); @@ -163,6 +164,18 @@ async function main() { ensureDir(sessionsDir); ensureDir(learnedDir); + const observerSessionId = resolveSessionId(); + if (observerSessionId) { + const observerContext = resolveProjectContext(); + writeSessionLease(observerContext, observerSessionId, { + hook: 'SessionStart', + projectRoot: observerContext.projectRoot + }); + log(`[SessionStart] Registered observer lease for ${observerSessionId}`); + } else { + log('[SessionStart] No CLAUDE_SESSION_ID available; skipping observer lease registration'); + } + // Check for recent session files (last 7 days) const recentSessions = dedupeRecentSessions(getSessionSearchDirs()); diff --git a/scripts/lib/observer-sessions.js b/scripts/lib/observer-sessions.js new file mode 100644 index 00000000..44742a3c --- /dev/null +++ b/scripts/lib/observer-sessions.js @@ -0,0 +1,175 @@ +const fs = require('fs'); +const path = require('path'); +const crypto = require('crypto'); +const { spawnSync } = require('child_process'); +const { getClaudeDir, ensureDir, sanitizeSessionId } = require('./utils'); + +function getHomunculusDir() { + return path.join(getClaudeDir(), 'homunculus'); +} + +function getProjectsDir() { + return path.join(getHomunculusDir(), 'projects'); +} + +function getProjectRegistryPath() { + return path.join(getHomunculusDir(), 'projects.json'); +} + +function readProjectRegistry() { + try { + return JSON.parse(fs.readFileSync(getProjectRegistryPath(), 'utf8')); + } catch { + return {}; + } +} + +function runGit(args, cwd) { + const result = spawnSync('git', args, { + cwd, + encoding: 'utf8', + stdio: ['ignore', 'pipe', 'ignore'] + }); + if (result.status !== 0) return ''; + return (result.stdout || '').trim(); +} + +function stripRemoteCredentials(remoteUrl) { + if (!remoteUrl) return ''; + return String(remoteUrl).replace(/:\/\/[^@]+@/, '://'); +} + +function resolveProjectRoot(cwd = process.cwd()) { + const envRoot = process.env.CLAUDE_PROJECT_DIR; + if (envRoot && fs.existsSync(envRoot)) { + return path.resolve(envRoot); + } + + const gitRoot = runGit(['rev-parse', '--show-toplevel'], cwd); + if (gitRoot) return path.resolve(gitRoot); + + return ''; +} + +function computeProjectId(projectRoot) { + const remoteUrl = stripRemoteCredentials(runGit(['remote', 'get-url', 'origin'], projectRoot)); + return crypto.createHash('sha256').update(remoteUrl || projectRoot).digest('hex').slice(0, 12); +} + +function resolveProjectContext(cwd = process.cwd()) { + const projectRoot = resolveProjectRoot(cwd); + if (!projectRoot) { + const projectDir = getHomunculusDir(); + ensureDir(projectDir); + return { projectId: 'global', projectRoot: '', projectDir, isGlobal: true }; + } + + const registry = readProjectRegistry(); + const registryEntry = Object.values(registry).find(entry => entry && path.resolve(entry.root || '') === projectRoot); + const projectId = registryEntry?.id || computeProjectId(projectRoot); + const projectDir = path.join(getProjectsDir(), projectId); + ensureDir(projectDir); + + return { projectId, projectRoot, projectDir, isGlobal: false }; +} + +function getObserverPidFile(context) { + return path.join(context.projectDir, '.observer.pid'); +} + +function getObserverSignalCounterFile(context) { + return path.join(context.projectDir, '.observer-signal-counter'); +} + +function getObserverActivityFile(context) { + return path.join(context.projectDir, '.observer-last-activity'); +} + +function getSessionLeaseDir(context) { + return path.join(context.projectDir, '.observer-sessions'); +} + +function resolveSessionId(rawSessionId = process.env.CLAUDE_SESSION_ID) { + return sanitizeSessionId(rawSessionId || '') || ''; +} + +function getSessionLeaseFile(context, rawSessionId = process.env.CLAUDE_SESSION_ID) { + const sessionId = resolveSessionId(rawSessionId); + if (!sessionId) return ''; + return path.join(getSessionLeaseDir(context), `${sessionId}.json`); +} + +function writeSessionLease(context, rawSessionId = process.env.CLAUDE_SESSION_ID, extra = {}) { + const leaseFile = getSessionLeaseFile(context, rawSessionId); + if (!leaseFile) return ''; + + ensureDir(getSessionLeaseDir(context)); + const payload = { + sessionId: resolveSessionId(rawSessionId), + cwd: process.cwd(), + pid: process.pid, + updatedAt: new Date().toISOString(), + ...extra + }; + fs.writeFileSync(leaseFile, JSON.stringify(payload, null, 2) + '\n'); + return leaseFile; +} + +function removeSessionLease(context, rawSessionId = process.env.CLAUDE_SESSION_ID) { + const leaseFile = getSessionLeaseFile(context, rawSessionId); + if (!leaseFile) return false; + try { + fs.rmSync(leaseFile, { force: true }); + return true; + } catch { + return false; + } +} + +function listSessionLeases(context) { + const leaseDir = getSessionLeaseDir(context); + if (!fs.existsSync(leaseDir)) return []; + return fs.readdirSync(leaseDir) + .filter(name => name.endsWith('.json')) + .map(name => path.join(leaseDir, name)); +} + +function stopObserverForContext(context) { + const pidFile = getObserverPidFile(context); + if (!fs.existsSync(pidFile)) return false; + + const pid = (fs.readFileSync(pidFile, 'utf8') || '').trim(); + if (!/^[0-9]+$/.test(pid) || pid === '0' || pid === '1') { + fs.rmSync(pidFile, { force: true }); + return false; + } + + try { + process.kill(Number(pid), 0); + } catch { + fs.rmSync(pidFile, { force: true }); + return false; + } + + try { + process.kill(Number(pid), 'SIGTERM'); + } catch { + return false; + } + + fs.rmSync(pidFile, { force: true }); + fs.rmSync(getObserverSignalCounterFile(context), { force: true }); + return true; +} + +module.exports = { + resolveProjectContext, + getObserverActivityFile, + getObserverPidFile, + getSessionLeaseDir, + writeSessionLease, + removeSessionLease, + listSessionLeases, + stopObserverForContext, + resolveSessionId +}; diff --git a/skills/continuous-learning-v2/agents/observer-loop.sh b/skills/continuous-learning-v2/agents/observer-loop.sh index e2cad9af..e2045d66 100755 --- a/skills/continuous-learning-v2/agents/observer-loop.sh +++ b/skills/continuous-learning-v2/agents/observer-loop.sh @@ -14,6 +14,9 @@ ANALYZING=0 LAST_ANALYSIS_EPOCH=0 # Minimum seconds between analyses (prevents rapid re-triggering) ANALYSIS_COOLDOWN="${ECC_OBSERVER_ANALYSIS_COOLDOWN:-60}" +IDLE_TIMEOUT_SECONDS="${ECC_OBSERVER_IDLE_TIMEOUT_SECONDS:-1800}" +SESSION_LEASE_DIR="${PROJECT_DIR}/.observer-sessions" +ACTIVITY_FILE="${PROJECT_DIR}/.observer-last-activity" cleanup() { [ -n "$SLEEP_PID" ] && kill "$SLEEP_PID" 2>/dev/null @@ -24,6 +27,62 @@ cleanup() { } trap cleanup TERM INT +file_mtime_epoch() { + local file="$1" + if [ ! -f "$file" ]; then + printf '0\n' + return + fi + + if stat -c %Y "$file" >/dev/null 2>&1; then + stat -c %Y "$file" 2>/dev/null || printf '0\n' + return + fi + + if stat -f %m "$file" >/dev/null 2>&1; then + stat -f %m "$file" 2>/dev/null || printf '0\n' + return + fi + + printf '0\n' +} + +has_active_session_leases() { + if [ ! -d "$SESSION_LEASE_DIR" ]; then + return 1 + fi + + find "$SESSION_LEASE_DIR" -type f -name '*.json' -print -quit 2>/dev/null | grep -q . +} + +latest_activity_epoch() { + local observations_epoch activity_epoch + observations_epoch="$(file_mtime_epoch "$OBSERVATIONS_FILE")" + activity_epoch="$(file_mtime_epoch "$ACTIVITY_FILE")" + + if [ "$activity_epoch" -gt "$observations_epoch" ] 2>/dev/null; then + printf '%s\n' "$activity_epoch" + else + printf '%s\n' "$observations_epoch" + fi +} + +exit_if_idle_without_sessions() { + if has_active_session_leases; then + return + fi + + local last_activity now_epoch idle_for + last_activity="$(latest_activity_epoch)" + now_epoch="$(date +%s)" + idle_for=$(( now_epoch - last_activity )) + + if [ "$last_activity" -eq 0 ] || [ "$idle_for" -ge "$IDLE_TIMEOUT_SECONDS" ]; then + echo "[$(date)] Observer idle without active session leases for ${idle_for}s; exiting" >> "$LOG_FILE" + cleanup + fi +} + analyze_observations() { if [ ! -f "$OBSERVATIONS_FILE" ]; then return @@ -197,11 +256,13 @@ SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" "${CLV2_PYTHON_CMD:-python3}" "${SCRIPT_DIR}/../scripts/instinct-cli.py" prune --quiet >> "$LOG_FILE" 2>&1 || echo "[$(date)] Warning: instinct prune failed (non-fatal)" >> "$LOG_FILE" while true; do + exit_if_idle_without_sessions sleep "$OBSERVER_INTERVAL_SECONDS" & SLEEP_PID=$! wait "$SLEEP_PID" 2>/dev/null SLEEP_PID="" + exit_if_idle_without_sessions if [ "$USR1_FIRED" -eq 1 ]; then USR1_FIRED=0 else diff --git a/skills/continuous-learning-v2/hooks/observe.sh b/skills/continuous-learning-v2/hooks/observe.sh index 487c7635..67542ad0 100755 --- a/skills/continuous-learning-v2/hooks/observe.sh +++ b/skills/continuous-learning-v2/hooks/observe.sh @@ -386,6 +386,9 @@ fi # which caused runaway parallel Claude analysis processes. SIGNAL_EVERY_N="${ECC_OBSERVER_SIGNAL_EVERY_N:-20}" SIGNAL_COUNTER_FILE="${PROJECT_DIR}/.observer-signal-counter" +ACTIVITY_FILE="${PROJECT_DIR}/.observer-last-activity" + +touch "$ACTIVITY_FILE" 2>/dev/null || true should_signal=0 if [ -f "$SIGNAL_COUNTER_FILE" ]; then diff --git a/tests/hooks/observer-memory.test.js b/tests/hooks/observer-memory.test.js index 8f91df40..36b0da98 100644 --- a/tests/hooks/observer-memory.test.js +++ b/tests/hooks/observer-memory.test.js @@ -76,6 +76,12 @@ test('observe.sh default throttle is 20 observations per signal', () => { assert.ok(content.includes('ECC_OBSERVER_SIGNAL_EVERY_N:-20'), 'Default signal frequency should be every 20 observations'); }); +test('observe.sh touches observer activity marker on each observation', () => { + const content = fs.readFileSync(observeShPath, 'utf8'); + assert.ok(content.includes('ACTIVITY_FILE="${PROJECT_DIR}/.observer-last-activity"'), 'observe.sh should define a project-scoped activity marker'); + assert.ok(content.includes('touch "$ACTIVITY_FILE"'), 'observe.sh should update activity marker during observation capture'); +}); + // ────────────────────────────────────────────────────── // Test group 2: observer-loop.sh re-entrancy guard // ────────────────────────────────────────────────────── @@ -126,6 +132,19 @@ test('default cooldown is 60 seconds', () => { assert.ok(content.includes('ECC_OBSERVER_ANALYSIS_COOLDOWN:-60'), 'Default cooldown should be 60 seconds'); }); +test('observer-loop.sh defines idle timeout fallback', () => { + const content = fs.readFileSync(observerLoopPath, 'utf8'); + assert.ok(content.includes('IDLE_TIMEOUT_SECONDS'), 'observer-loop.sh should define an idle timeout'); + assert.ok(content.includes('ECC_OBSERVER_IDLE_TIMEOUT_SECONDS:-1800'), 'Default idle timeout should be 30 minutes'); +}); + +test('observer-loop.sh checks session lease directory before self-termination', () => { + const content = fs.readFileSync(observerLoopPath, 'utf8'); + assert.ok(content.includes('SESSION_LEASE_DIR="${PROJECT_DIR}/.observer-sessions"'), 'observer-loop.sh should track active observer session leases'); + assert.ok(content.includes('has_active_session_leases'), 'observer-loop.sh should define active session lease checks'); + assert.ok(content.includes('exit_if_idle_without_sessions'), 'observer-loop.sh should define idle self-termination helper'); +}); + // ────────────────────────────────────────────────────── // Test group 4: Tail-based sampling (no full file load) // ────────────────────────────────────────────────────── diff --git a/tests/integration/hooks.test.js b/tests/integration/hooks.test.js index 7ac768ba..a5132a1d 100644 --- a/tests/integration/hooks.test.js +++ b/tests/integration/hooks.test.js @@ -303,6 +303,101 @@ async function runTests() { assert.strictEqual(result.code, 0, 'Non-blocking hook should exit 0'); })) passed++; else failed++; + if (await asyncTest('session-start registers an observer lease for the active session', async () => { + const testDir = createTestDir(); + const projectDir = path.join(testDir, 'project'); + fs.mkdirSync(projectDir, { recursive: true }); + + try { + const sessionId = `session-${Date.now()}`; + const result = await runHookWithInput( + path.join(scriptsDir, 'session-start.js'), + {}, + { + HOME: testDir, + CLAUDE_PROJECT_DIR: projectDir, + CLAUDE_SESSION_ID: sessionId + } + ); + + assert.strictEqual(result.code, 0, 'SessionStart should exit 0'); + const homunculusDir = path.join(testDir, '.claude', 'homunculus'); + const projectsDir = path.join(homunculusDir, 'projects'); + const projectEntries = fs.existsSync(projectsDir) ? fs.readdirSync(projectsDir) : []; + assert.ok(projectEntries.length > 0, 'SessionStart should create a homunculus project directory'); + const leaseDir = path.join(projectsDir, projectEntries[0], '.observer-sessions'); + const leaseFiles = fs.existsSync(leaseDir) ? fs.readdirSync(leaseDir).filter(name => name.endsWith('.json')) : []; + assert.ok(leaseFiles.length === 1, `Expected one observer lease file, found ${leaseFiles.length}`); + } finally { + cleanupTestDir(testDir); + } + })) passed++; else failed++; + + if (await asyncTest('session-end-marker removes the last lease and stops the observer process', async () => { + const testDir = createTestDir(); + const projectDir = path.join(testDir, 'project'); + fs.mkdirSync(projectDir, { recursive: true }); + + const sessionId = `session-${Date.now()}`; + const sleeper = spawn(process.execPath, ['-e', "process.on('SIGTERM', () => process.exit(0)); setInterval(() => {}, 1000)"], { + stdio: 'ignore' + }); + + try { + await runHookWithInput( + path.join(scriptsDir, 'session-start.js'), + {}, + { + HOME: testDir, + CLAUDE_PROJECT_DIR: projectDir, + CLAUDE_SESSION_ID: sessionId + } + ); + + const homunculusDir = path.join(testDir, '.claude', 'homunculus'); + const projectsDir = path.join(homunculusDir, 'projects'); + const projectEntries = fs.existsSync(projectsDir) ? fs.readdirSync(projectsDir) : []; + assert.ok(projectEntries.length > 0, 'Expected SessionStart to create a homunculus project directory'); + const projectStorageDir = path.join(projectsDir, projectEntries[0]); + const pidFile = path.join(projectStorageDir, '.observer.pid'); + fs.writeFileSync(pidFile, `${sleeper.pid}\n`); + + const markerInput = { hook_event_name: 'SessionEnd' }; + const result = await runHookWithInput( + path.join(scriptsDir, 'session-end-marker.js'), + markerInput, + { + HOME: testDir, + CLAUDE_PROJECT_DIR: projectDir, + CLAUDE_SESSION_ID: sessionId + } + ); + + assert.strictEqual(result.code, 0, 'SessionEnd marker should exit 0'); + assert.strictEqual(result.stdout, JSON.stringify(markerInput), 'SessionEnd marker should pass stdin through unchanged'); + + await new Promise(resolve => setTimeout(resolve, 150)); + const exited = sleeper.exitCode !== null || sleeper.signalCode !== null; + let processAlive = !exited; + if (processAlive) { + try { + process.kill(sleeper.pid, 0); + } catch { + processAlive = false; + } + } + assert.strictEqual(processAlive, false, 'SessionEnd marker should stop the observer process when the last lease ends'); + + const leaseDir = path.join(projectStorageDir, '.observer-sessions'); + const leaseFiles = fs.existsSync(leaseDir) ? fs.readdirSync(leaseDir).filter(name => name.endsWith('.json')) : []; + assert.strictEqual(leaseFiles.length, 0, 'SessionEnd marker should remove the finished session lease'); + assert.strictEqual(fs.existsSync(pidFile), false, 'SessionEnd marker should remove the observer pid file after stopping it'); + } finally { + sleeper.kill(); + cleanupTestDir(testDir); + } + })) passed++; else failed++; + if (await asyncTest('dev server hook transforms yarn dev to tmux session', async () => { // The auto-tmux dev hook transforms dev commands (yarn dev, npm run dev, etc.) const hookCommand = getHookCommandByDescription(