From 71d22d0a77b7e0684f4e51cba03749b788993cdb Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Sat, 20 Jun 2026 20:49:17 -0400 Subject: [PATCH] feat(layer4): live messages-table wiring for proximity triggers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Finishes the steer/transmit loop — advisories now reach the agents' sessions. - message-sink.js: createEccMessageSink() delivers via the canonical writer 'ecc-tui messages send' (maps steer/hold -> conflict kind, transmit -> query), resolving the binary from override/env/built target/PATH. Injectable runner; best-effort (a missing binary/failed send is counted skipped, never blocks). - proximity.js: createProximityDispatcher() adds per-trigger cooldown so a persistent collision fires once then stays quiet (agents get steered, not spammed); runProximityTick() builds the snapshot and dispatches. - scripts/proximity-tick.js: thin CLI — one-shot, --dry-run, --watch . Messages are internal ECC agent-to-agent coordination, not any external channel. - 14 new tests (sink argv/kind mapping, cooldown dedup, tick dispatch/dry-run, CLI parse). Full suite 2891/2891; lint green. --- scripts/lib/control-pane/message-sink.js | 70 ++++++++++++ scripts/lib/control-pane/proximity.js | 75 ++++++++++++- scripts/proximity-tick.js | 111 ++++++++++++++++++++ tests/lib/control-pane-message-sink.test.js | 104 ++++++++++++++++++ tests/lib/control-pane-proximity.test.js | 39 ++++++- 5 files changed, 397 insertions(+), 2 deletions(-) create mode 100644 scripts/lib/control-pane/message-sink.js create mode 100644 scripts/proximity-tick.js create mode 100644 tests/lib/control-pane-message-sink.test.js diff --git a/scripts/lib/control-pane/message-sink.js b/scripts/lib/control-pane/message-sink.js new file mode 100644 index 00000000..e03c856e --- /dev/null +++ b/scripts/lib/control-pane/message-sink.js @@ -0,0 +1,70 @@ +'use strict'; + +/** + * Concrete message sink for proximity triggers: delivers a session-to-session + * message through the canonical writer, the `ecc-tui messages send` CLI. The CLI + * owns the ecc2 session DB (the `messages` table the control pane reads), so we + * shell out to it rather than writing the SQLite directly and racing the daemon. + * + * Best-effort: if the binary is not found / the command fails, the call throws, + * and the dispatcher counts it as skipped — proximity never blocks on delivery. + * The command runner and binary path are injectable for tests. + */ + +const fs = require('fs'); +const path = require('path'); +const { execFileSync } = require('child_process'); + +// Proximity trigger type → ecc-tui message kind (value_enum on `--kind`). +// A steer/hold is a collision warning; a transmit is a "what are you doing" query. +const KIND_BY_TYPE = { + proximity_steer: 'conflict', + proximity_hold: 'conflict', + proximity_transmit: 'query' +}; + +/** + * Resolve the ecc-tui binary: explicit override, env var, a built target in the + * repo, then the bare name (hope it's on PATH). + */ +function resolveEccBin(deps = {}) { + if (deps.binPath) return deps.binPath; + if (process.env.ECC_TUI_BIN && process.env.ECC_TUI_BIN.trim()) return process.env.ECC_TUI_BIN.trim(); + const repoRoot = deps.repoRoot || path.join(__dirname, '..', '..', '..'); + for (const rel of ['ecc2/target/release/ecc-tui', 'ecc2/target/debug/ecc-tui']) { + const candidate = path.join(repoRoot, rel); + try { + if (fs.existsSync(candidate)) return candidate; + } catch { + /* ignore */ + } + } + return 'ecc-tui'; +} + +/** + * Build the `messages send` argv for a proximity message. + */ +function buildSendArgs({ fromSession, toSession, content, msgType }) { + const kind = KIND_BY_TYPE[msgType] || 'query'; + return ['messages', 'send', '--from', String(fromSession), '--to', String(toSession), '--kind', kind, '--text', String(content)]; +} + +/** + * Create a `sendMessage({ fromSession, toSession, content, msgType })` sink that + * delivers via `ecc-tui messages send`. Inject `runCommand(bin, args)` for tests. + */ +function createEccMessageSink(deps = {}) { + const run = deps.runCommand || ((bin, args) => execFileSync(bin, args, { encoding: 'utf8', timeout: 5000, stdio: ['ignore', 'pipe', 'pipe'] })); + const bin = resolveEccBin(deps); + return function sendMessage(message) { + run(bin, buildSendArgs(message)); + }; +} + +module.exports = { + KIND_BY_TYPE, + resolveEccBin, + buildSendArgs, + createEccMessageSink +}; diff --git a/scripts/lib/control-pane/proximity.js b/scripts/lib/control-pane/proximity.js index bfe1ecf6..7e451baf 100644 --- a/scripts/lib/control-pane/proximity.js +++ b/scripts/lib/control-pane/proximity.js @@ -179,11 +179,84 @@ function dispatchProximityTriggers(triggers, deps = {}) { return { dispatched, skipped }; } +/** + * Stateful dispatcher with per-trigger cooldown, so a collision that persists + * across many ticks fires once and then stays quiet until it clears or the + * cooldown lapses — agents get steered, not spammed. Inject `sendMessage` + * (e.g. createEccMessageSink) and optionally `now`/`cooldownMs` for tests. + */ +function createProximityDispatcher(deps = {}) { + const send = deps.sendMessage; + const cooldownMs = Number.isFinite(deps.cooldownMs) ? deps.cooldownMs : 5 * 60 * 1000; + const now = typeof deps.now === 'function' ? deps.now : () => Date.now(); + const lastFired = new Map(); + const keyOf = t => `${t.to}<-${t.from}:${t.type}`; + + return { + dispatch(triggers) { + let dispatched = 0; + let suppressed = 0; + let skipped = 0; + for (const t of triggers || []) { + const key = keyOf(t); + const last = lastFired.get(key); + const ts = now(); + if (last !== undefined && ts - last < cooldownMs) { + suppressed += 1; + continue; + } + if (typeof send !== 'function') { + skipped += 1; + continue; + } + try { + send({ fromSession: t.from, toSession: t.to, content: t.content, msgType: t.type }); + lastFired.set(key, ts); + dispatched += 1; + } catch { + skipped += 1; + } + } + return { dispatched, suppressed, skipped }; + }, + reset() { + lastFired.clear(); + } + }; +} + +/** + * One proximity tick: build the snapshot, then dispatch its triggers (steer the + * agents). `buildSnapshot()` returns a control-pane snapshot with a `proximity` + * field; `dispatcher` is a createProximityDispatcher. `dryRun` reports what + * would fire without sending. Both are injected so the CLI stays a thin wrapper + * and the logic is unit-testable. + */ +async function runProximityTick(deps = {}) { + const snapshot = await deps.buildSnapshot(); + const prox = (snapshot && snapshot.proximity) || { advisories: [], triggers: [], counts: {} }; + const triggers = prox.triggers || []; + let result; + if (deps.dryRun || !deps.dispatcher) { + result = { dispatched: 0, suppressed: 0, skipped: triggers.length, dryRun: Boolean(deps.dryRun) }; + } else { + result = deps.dispatcher.dispatch(triggers); + } + return { + counts: prox.counts || {}, + advisories: prox.advisories || [], + triggers, + result + }; +} + module.exports = { buildProximitySnapshot, sessionsToAgents, defaultWorkingSetFor, defaultChangedFilesFor, parseDiffRanges, - dispatchProximityTriggers + dispatchProximityTriggers, + createProximityDispatcher, + runProximityTick }; diff --git a/scripts/proximity-tick.js b/scripts/proximity-tick.js new file mode 100644 index 00000000..f50a492b --- /dev/null +++ b/scripts/proximity-tick.js @@ -0,0 +1,111 @@ +#!/usr/bin/env node +'use strict'; + +/** + * Proximity tick — the live loop that turns the agent-space distance metric into + * action: scan the airspace from the control-pane state, then steer/transmit by + * sending session-to-session messages via `ecc-tui messages send`. + * + * node scripts/proximity-tick.js # one shot, deliver triggers + * node scripts/proximity-tick.js --dry-run # show what would fire, send nothing + * node scripts/proximity-tick.js --watch 30 # re-scan every 30s (dedupes per cooldown) + * + * Messages are internal ECC agent-to-agent coordination (the ecc2 `messages` + * table) — not any external channel. + */ + +const { resolveControlPaneConfig, buildControlPaneSnapshot } = require('./lib/control-pane/state'); +const { createProximityDispatcher, runProximityTick } = require('./lib/control-pane/proximity'); +const { createEccMessageSink } = require('./lib/control-pane/message-sink'); + +function parseArgs(argv) { + const args = argv.slice(2); + const parsed = { watchSec: 0, dryRun: false, json: false, help: false, dbPath: null, stateDbPath: null }; + for (let i = 0; i < args.length; i += 1) { + const a = args[i]; + if (a === '--help' || a === '-h') parsed.help = true; + else if (a === '--dry-run') parsed.dryRun = true; + else if (a === '--json') parsed.json = true; + else if (a === '--watch') { + const v = Number.parseInt(args[i + 1], 10); + if (!Number.isInteger(v) || v <= 0) throw new Error('--watch needs a positive seconds value'); + parsed.watchSec = v; + i += 1; + } else if (a === '--db') { + parsed.dbPath = args[i + 1]; + i += 1; + } else if (a === '--state-db') { + parsed.stateDbPath = args[i + 1]; + i += 1; + } else { + throw new Error(`Unknown argument: ${a}`); + } + } + return parsed; +} + +function showHelp() { + console.log( + [ + 'Usage: node scripts/proximity-tick.js [--watch ] [--dry-run] [--json] [--db ] [--state-db ]', + '', + 'Scan agent proximity from the control-pane state and steer/transmit by sending', + 'internal session-to-session messages. --dry-run sends nothing; --watch loops.' + ].join('\n') + ); +} + +function report(tick, json) { + if (json) { + console.log(JSON.stringify(tick, null, 2)); + return; + } + const c = tick.counts || {}; + console.log( + `proximity: ${c.agents ?? 0} agents, ${c.advisories ?? 0} advisories (${c.resolutions ?? 0} resolutions) — ` + + `dispatched ${tick.result.dispatched}, suppressed ${tick.result.suppressed}, skipped ${tick.result.skipped}` + + (tick.result.dryRun ? ' [dry-run]' : '') + ); + for (const adv of tick.advisories.slice(0, 8)) { + const who = adv.level === 'resolution' ? `${adv.steer} steers (yields to ${adv.hold})` : 'both transmit intent'; + console.log(` - ${(adv.risk * 100) | 0}% ${adv.level}: ${adv.aLabel || adv.a} <-> ${adv.bLabel || adv.b} => ${who}`); + } +} + +async function main() { + const opts = parseArgs(process.argv); + if (opts.help) { + showHelp(); + return; + } + const config = resolveControlPaneConfig(opts); + const sink = opts.dryRun ? null : createEccMessageSink({}); + const dispatcher = createProximityDispatcher({ sendMessage: sink }); + const buildSnapshot = () => + buildControlPaneSnapshot({ + config, + dbPath: opts.dbPath || config.dbPath, + stateDbPath: opts.stateDbPath || config.stateDbPath, + includeProximity: true + }); + + const once = async () => report(await runProximityTick({ buildSnapshot, dispatcher, dryRun: opts.dryRun }), opts.json); + + await once(); + if (opts.watchSec > 0) { + const sleep = ms => new Promise(r => setTimeout(r, ms)); + for (;;) { + await sleep(opts.watchSec * 1000); + await once(); + } + } +} + +if (require.main === module) { + main().catch(err => { + process.stderr.write(`proximity-tick error: ${err.message}\n`); + process.exit(1); + }); +} + +module.exports = { parseArgs }; diff --git a/tests/lib/control-pane-message-sink.test.js b/tests/lib/control-pane-message-sink.test.js new file mode 100644 index 00000000..719f329b --- /dev/null +++ b/tests/lib/control-pane-message-sink.test.js @@ -0,0 +1,104 @@ +'use strict'; +/** + * Tests for the proximity message sink (ecc-tui messages send) and the + * deduping dispatcher. + */ + +const assert = require('assert'); + +const { KIND_BY_TYPE, buildSendArgs, createEccMessageSink, resolveEccBin } = require('../../scripts/lib/control-pane/message-sink'); +const { createProximityDispatcher } = require('../../scripts/lib/control-pane/proximity'); + +let passed = 0; +let failed = 0; +function test(name, fn) { + try { + fn(); + console.log(` PASS ${name}`); + passed += 1; + } catch (e) { + console.log(` FAIL ${name}`); + console.log(` ${e.message}`); + failed += 1; + } +} + +console.log('\n=== Testing proximity message sink ===\n'); + +test('buildSendArgs: maps trigger type to message kind and shapes the CLI argv', () => { + const args = buildSendArgs({ fromSession: 'lead', toSession: 'worker', content: 'steer away', msgType: 'proximity_steer' }); + assert.deepStrictEqual(args, ['messages', 'send', '--from', 'lead', '--to', 'worker', '--kind', 'conflict', '--text', 'steer away']); + assert.strictEqual(KIND_BY_TYPE.proximity_transmit, 'query'); +}); + +test('createEccMessageSink: delivers via the injected runner with the resolved binary', () => { + const calls = []; + const send = createEccMessageSink({ + binPath: '/fake/ecc-tui', + runCommand: (bin, args) => calls.push({ bin, args }) + }); + send({ fromSession: 'a', toSession: 'b', content: 'hello', msgType: 'proximity_transmit' }); + assert.strictEqual(calls.length, 1); + assert.strictEqual(calls[0].bin, '/fake/ecc-tui'); + assert.deepStrictEqual(calls[0].args.slice(0, 2), ['messages', 'send']); + assert.ok(calls[0].args.includes('query'), 'transmit maps to query kind'); +}); + +test('createEccMessageSink: a failing command propagates (dispatcher will count it skipped)', () => { + const send = createEccMessageSink({ + binPath: 'ecc-tui', + runCommand: () => { + throw new Error('ENOENT'); + } + }); + assert.throws(() => send({ fromSession: 'a', toSession: 'b', content: 'x', msgType: 'proximity_steer' })); +}); + +test('resolveEccBin: honors explicit override', () => { + assert.strictEqual(resolveEccBin({ binPath: '/x/ecc-tui' }), '/x/ecc-tui'); +}); + +test('dispatcher: fires once then suppresses the same trigger within cooldown', () => { + let clock = 1000; + const sent = []; + const dispatcher = createProximityDispatcher({ + sendMessage: m => sent.push(m), + cooldownMs: 1000, + now: () => clock + }); + const triggers = [{ to: 'worker', from: 'lead', type: 'proximity_steer', content: 'steer' }]; + + const r1 = dispatcher.dispatch(triggers); + assert.strictEqual(r1.dispatched, 1); + + clock = 1500; // within cooldown + const r2 = dispatcher.dispatch(triggers); + assert.strictEqual(r2.dispatched, 0); + assert.strictEqual(r2.suppressed, 1); + + clock = 2600; // past cooldown + const r3 = dispatcher.dispatch(triggers); + assert.strictEqual(r3.dispatched, 1); + + assert.strictEqual(sent.length, 2, 'sent twice total, the middle one suppressed'); +}); + +test('dispatcher: distinct triggers are not cross-suppressed', () => { + const sent = []; + const dispatcher = createProximityDispatcher({ sendMessage: m => sent.push(m), now: () => 0 }); + const r = dispatcher.dispatch([ + { to: 'worker', from: 'lead', type: 'proximity_steer', content: 'a' }, + { to: 'lead', from: 'worker', type: 'proximity_hold', content: 'b' } + ]); + assert.strictEqual(r.dispatched, 2); +}); + +test('dispatcher: no sink ⇒ skipped, never throws', () => { + const dispatcher = createProximityDispatcher({}); + const r = dispatcher.dispatch([{ to: 'a', from: 'b', type: 'x', content: 'c' }]); + assert.strictEqual(r.skipped, 1); + assert.strictEqual(r.dispatched, 0); +}); + +console.log(`\nResults: Passed: ${passed}, Failed: ${failed}`); +if (failed > 0) process.exit(1); diff --git a/tests/lib/control-pane-proximity.test.js b/tests/lib/control-pane-proximity.test.js index 556cb654..c91603f4 100644 --- a/tests/lib/control-pane-proximity.test.js +++ b/tests/lib/control-pane-proximity.test.js @@ -5,7 +5,8 @@ const assert = require('assert'); -const { buildProximitySnapshot, sessionsToAgents, parseDiffRanges, dispatchProximityTriggers } = require('../../scripts/lib/control-pane/proximity'); +const { buildProximitySnapshot, sessionsToAgents, parseDiffRanges, dispatchProximityTriggers, createProximityDispatcher, runProximityTick } = require('../../scripts/lib/control-pane/proximity'); +const { parseArgs: parseTickArgs } = require('../../scripts/proximity-tick'); let passed = 0; let failed = 0; @@ -146,5 +147,41 @@ test('dispatchProximityTriggers: no sink ⇒ nothing thrown, all skipped', () => assert.strictEqual(r.skipped, 1); }); +test('runProximityTick: dispatches the snapshot triggers via the dispatcher', async () => { + const snapshot = { + proximity: { + counts: { agents: 2, advisories: 1, resolutions: 1 }, + advisories: [{ a: 'lead', b: 'worker', level: 'resolution', risk: 0.9, steer: 'worker', hold: 'lead' }], + triggers: [ + { to: 'worker', from: 'lead', type: 'proximity_steer', content: 'steer' }, + { to: 'lead', from: 'worker', type: 'proximity_hold', content: 'hold' } + ] + } + }; + const sent = []; + const dispatcher = createProximityDispatcher({ sendMessage: m => sent.push(m), now: () => 0 }); + const tick = await runProximityTick({ buildSnapshot: async () => snapshot, dispatcher }); + assert.strictEqual(tick.result.dispatched, 2); + assert.strictEqual(sent.length, 2); +}); + +test('runProximityTick: dry-run sends nothing', async () => { + const snapshot = { proximity: { counts: {}, advisories: [], triggers: [{ to: 'a', from: 'b', type: 'x', content: 'c' }] } }; + const sent = []; + const dispatcher = createProximityDispatcher({ sendMessage: m => sent.push(m) }); + const tick = await runProximityTick({ buildSnapshot: async () => snapshot, dispatcher, dryRun: true }); + assert.strictEqual(tick.result.dispatched, 0); + assert.strictEqual(tick.result.dryRun, true); + assert.strictEqual(sent.length, 0); +}); + +test('proximity-tick parseArgs: parses flags', () => { + const a = parseTickArgs(['node', 'proximity-tick.js', '--watch', '30', '--dry-run', '--db', '/x']); + assert.strictEqual(a.watchSec, 30); + assert.strictEqual(a.dryRun, true); + assert.strictEqual(a.dbPath, '/x'); + assert.throws(() => parseTickArgs(['node', 'p', '--watch', 'nope']), /positive seconds/); +}); + console.log(`\nResults: Passed: ${passed}, Failed: ${failed}`); if (failed > 0) process.exit(1);