feat(layer4): live messages-table wiring for proximity triggers

Finishes the steer/transmit loop — advisories now reach the agents' sessions.

- message-sink.js: createEccMessageSink() delivers via the canonical writer
  'ecc-tui messages send' (maps steer/hold -> conflict kind, transmit -> query),
  resolving the binary from override/env/built target/PATH. Injectable runner;
  best-effort (a missing binary/failed send is counted skipped, never blocks).
- proximity.js: createProximityDispatcher() adds per-trigger cooldown so a
  persistent collision fires once then stays quiet (agents get steered, not
  spammed); runProximityTick() builds the snapshot and dispatches.
- scripts/proximity-tick.js: thin CLI — one-shot, --dry-run, --watch <sec>.
  Messages are internal ECC agent-to-agent coordination, not any external channel.
- 14 new tests (sink argv/kind mapping, cooldown dedup, tick dispatch/dry-run,
  CLI parse). Full suite 2891/2891; lint green.
This commit is contained in:
Affaan Mustafa
2026-06-20 20:49:17 -04:00
parent bd1be0c1ce
commit 71d22d0a77
5 changed files with 397 additions and 2 deletions
+70
View File
@@ -0,0 +1,70 @@
'use strict';
/**
* Concrete message sink for proximity triggers: delivers a session-to-session
* message through the canonical writer, the `ecc-tui messages send` CLI. The CLI
* owns the ecc2 session DB (the `messages` table the control pane reads), so we
* shell out to it rather than writing the SQLite directly and racing the daemon.
*
* Best-effort: if the binary is not found / the command fails, the call throws,
* and the dispatcher counts it as skipped — proximity never blocks on delivery.
* The command runner and binary path are injectable for tests.
*/
const fs = require('fs');
const path = require('path');
const { execFileSync } = require('child_process');
// Proximity trigger type → ecc-tui message kind (value_enum on `--kind`).
// A steer/hold is a collision warning; a transmit is a "what are you doing" query.
const KIND_BY_TYPE = {
proximity_steer: 'conflict',
proximity_hold: 'conflict',
proximity_transmit: 'query'
};
/**
* Resolve the ecc-tui binary: explicit override, env var, a built target in the
* repo, then the bare name (hope it's on PATH).
*/
function resolveEccBin(deps = {}) {
if (deps.binPath) return deps.binPath;
if (process.env.ECC_TUI_BIN && process.env.ECC_TUI_BIN.trim()) return process.env.ECC_TUI_BIN.trim();
const repoRoot = deps.repoRoot || path.join(__dirname, '..', '..', '..');
for (const rel of ['ecc2/target/release/ecc-tui', 'ecc2/target/debug/ecc-tui']) {
const candidate = path.join(repoRoot, rel);
try {
if (fs.existsSync(candidate)) return candidate;
} catch {
/* ignore */
}
}
return 'ecc-tui';
}
/**
* Build the `messages send` argv for a proximity message.
*/
function buildSendArgs({ fromSession, toSession, content, msgType }) {
const kind = KIND_BY_TYPE[msgType] || 'query';
return ['messages', 'send', '--from', String(fromSession), '--to', String(toSession), '--kind', kind, '--text', String(content)];
}
/**
* Create a `sendMessage({ fromSession, toSession, content, msgType })` sink that
* delivers via `ecc-tui messages send`. Inject `runCommand(bin, args)` for tests.
*/
function createEccMessageSink(deps = {}) {
const run = deps.runCommand || ((bin, args) => execFileSync(bin, args, { encoding: 'utf8', timeout: 5000, stdio: ['ignore', 'pipe', 'pipe'] }));
const bin = resolveEccBin(deps);
return function sendMessage(message) {
run(bin, buildSendArgs(message));
};
}
module.exports = {
KIND_BY_TYPE,
resolveEccBin,
buildSendArgs,
createEccMessageSink
};
+74 -1
View File
@@ -179,11 +179,84 @@ function dispatchProximityTriggers(triggers, deps = {}) {
return { dispatched, skipped };
}
/**
* Stateful dispatcher with per-trigger cooldown, so a collision that persists
* across many ticks fires once and then stays quiet until it clears or the
* cooldown lapses — agents get steered, not spammed. Inject `sendMessage`
* (e.g. createEccMessageSink) and optionally `now`/`cooldownMs` for tests.
*/
function createProximityDispatcher(deps = {}) {
const send = deps.sendMessage;
const cooldownMs = Number.isFinite(deps.cooldownMs) ? deps.cooldownMs : 5 * 60 * 1000;
const now = typeof deps.now === 'function' ? deps.now : () => Date.now();
const lastFired = new Map();
const keyOf = t => `${t.to}<-${t.from}:${t.type}`;
return {
dispatch(triggers) {
let dispatched = 0;
let suppressed = 0;
let skipped = 0;
for (const t of triggers || []) {
const key = keyOf(t);
const last = lastFired.get(key);
const ts = now();
if (last !== undefined && ts - last < cooldownMs) {
suppressed += 1;
continue;
}
if (typeof send !== 'function') {
skipped += 1;
continue;
}
try {
send({ fromSession: t.from, toSession: t.to, content: t.content, msgType: t.type });
lastFired.set(key, ts);
dispatched += 1;
} catch {
skipped += 1;
}
}
return { dispatched, suppressed, skipped };
},
reset() {
lastFired.clear();
}
};
}
/**
* One proximity tick: build the snapshot, then dispatch its triggers (steer the
* agents). `buildSnapshot()` returns a control-pane snapshot with a `proximity`
* field; `dispatcher` is a createProximityDispatcher. `dryRun` reports what
* would fire without sending. Both are injected so the CLI stays a thin wrapper
* and the logic is unit-testable.
*/
async function runProximityTick(deps = {}) {
const snapshot = await deps.buildSnapshot();
const prox = (snapshot && snapshot.proximity) || { advisories: [], triggers: [], counts: {} };
const triggers = prox.triggers || [];
let result;
if (deps.dryRun || !deps.dispatcher) {
result = { dispatched: 0, suppressed: 0, skipped: triggers.length, dryRun: Boolean(deps.dryRun) };
} else {
result = deps.dispatcher.dispatch(triggers);
}
return {
counts: prox.counts || {},
advisories: prox.advisories || [],
triggers,
result
};
}
module.exports = {
buildProximitySnapshot,
sessionsToAgents,
defaultWorkingSetFor,
defaultChangedFilesFor,
parseDiffRanges,
dispatchProximityTriggers
dispatchProximityTriggers,
createProximityDispatcher,
runProximityTick
};
+111
View File
@@ -0,0 +1,111 @@
#!/usr/bin/env node
'use strict';
/**
* Proximity tick — the live loop that turns the agent-space distance metric into
* action: scan the airspace from the control-pane state, then steer/transmit by
* sending session-to-session messages via `ecc-tui messages send`.
*
* node scripts/proximity-tick.js # one shot, deliver triggers
* node scripts/proximity-tick.js --dry-run # show what would fire, send nothing
* node scripts/proximity-tick.js --watch 30 # re-scan every 30s (dedupes per cooldown)
*
* Messages are internal ECC agent-to-agent coordination (the ecc2 `messages`
* table) — not any external channel.
*/
const { resolveControlPaneConfig, buildControlPaneSnapshot } = require('./lib/control-pane/state');
const { createProximityDispatcher, runProximityTick } = require('./lib/control-pane/proximity');
const { createEccMessageSink } = require('./lib/control-pane/message-sink');
function parseArgs(argv) {
const args = argv.slice(2);
const parsed = { watchSec: 0, dryRun: false, json: false, help: false, dbPath: null, stateDbPath: null };
for (let i = 0; i < args.length; i += 1) {
const a = args[i];
if (a === '--help' || a === '-h') parsed.help = true;
else if (a === '--dry-run') parsed.dryRun = true;
else if (a === '--json') parsed.json = true;
else if (a === '--watch') {
const v = Number.parseInt(args[i + 1], 10);
if (!Number.isInteger(v) || v <= 0) throw new Error('--watch needs a positive seconds value');
parsed.watchSec = v;
i += 1;
} else if (a === '--db') {
parsed.dbPath = args[i + 1];
i += 1;
} else if (a === '--state-db') {
parsed.stateDbPath = args[i + 1];
i += 1;
} else {
throw new Error(`Unknown argument: ${a}`);
}
}
return parsed;
}
function showHelp() {
console.log(
[
'Usage: node scripts/proximity-tick.js [--watch <seconds>] [--dry-run] [--json] [--db <path>] [--state-db <path>]',
'',
'Scan agent proximity from the control-pane state and steer/transmit by sending',
'internal session-to-session messages. --dry-run sends nothing; --watch loops.'
].join('\n')
);
}
function report(tick, json) {
if (json) {
console.log(JSON.stringify(tick, null, 2));
return;
}
const c = tick.counts || {};
console.log(
`proximity: ${c.agents ?? 0} agents, ${c.advisories ?? 0} advisories (${c.resolutions ?? 0} resolutions) — ` +
`dispatched ${tick.result.dispatched}, suppressed ${tick.result.suppressed}, skipped ${tick.result.skipped}` +
(tick.result.dryRun ? ' [dry-run]' : '')
);
for (const adv of tick.advisories.slice(0, 8)) {
const who = adv.level === 'resolution' ? `${adv.steer} steers (yields to ${adv.hold})` : 'both transmit intent';
console.log(` - ${(adv.risk * 100) | 0}% ${adv.level}: ${adv.aLabel || adv.a} <-> ${adv.bLabel || adv.b} => ${who}`);
}
}
async function main() {
const opts = parseArgs(process.argv);
if (opts.help) {
showHelp();
return;
}
const config = resolveControlPaneConfig(opts);
const sink = opts.dryRun ? null : createEccMessageSink({});
const dispatcher = createProximityDispatcher({ sendMessage: sink });
const buildSnapshot = () =>
buildControlPaneSnapshot({
config,
dbPath: opts.dbPath || config.dbPath,
stateDbPath: opts.stateDbPath || config.stateDbPath,
includeProximity: true
});
const once = async () => report(await runProximityTick({ buildSnapshot, dispatcher, dryRun: opts.dryRun }), opts.json);
await once();
if (opts.watchSec > 0) {
const sleep = ms => new Promise(r => setTimeout(r, ms));
for (;;) {
await sleep(opts.watchSec * 1000);
await once();
}
}
}
if (require.main === module) {
main().catch(err => {
process.stderr.write(`proximity-tick error: ${err.message}\n`);
process.exit(1);
});
}
module.exports = { parseArgs };
+104
View File
@@ -0,0 +1,104 @@
'use strict';
/**
* Tests for the proximity message sink (ecc-tui messages send) and the
* deduping dispatcher.
*/
const assert = require('assert');
const { KIND_BY_TYPE, buildSendArgs, createEccMessageSink, resolveEccBin } = require('../../scripts/lib/control-pane/message-sink');
const { createProximityDispatcher } = require('../../scripts/lib/control-pane/proximity');
let passed = 0;
let failed = 0;
function test(name, fn) {
try {
fn();
console.log(` PASS ${name}`);
passed += 1;
} catch (e) {
console.log(` FAIL ${name}`);
console.log(` ${e.message}`);
failed += 1;
}
}
console.log('\n=== Testing proximity message sink ===\n');
test('buildSendArgs: maps trigger type to message kind and shapes the CLI argv', () => {
const args = buildSendArgs({ fromSession: 'lead', toSession: 'worker', content: 'steer away', msgType: 'proximity_steer' });
assert.deepStrictEqual(args, ['messages', 'send', '--from', 'lead', '--to', 'worker', '--kind', 'conflict', '--text', 'steer away']);
assert.strictEqual(KIND_BY_TYPE.proximity_transmit, 'query');
});
test('createEccMessageSink: delivers via the injected runner with the resolved binary', () => {
const calls = [];
const send = createEccMessageSink({
binPath: '/fake/ecc-tui',
runCommand: (bin, args) => calls.push({ bin, args })
});
send({ fromSession: 'a', toSession: 'b', content: 'hello', msgType: 'proximity_transmit' });
assert.strictEqual(calls.length, 1);
assert.strictEqual(calls[0].bin, '/fake/ecc-tui');
assert.deepStrictEqual(calls[0].args.slice(0, 2), ['messages', 'send']);
assert.ok(calls[0].args.includes('query'), 'transmit maps to query kind');
});
test('createEccMessageSink: a failing command propagates (dispatcher will count it skipped)', () => {
const send = createEccMessageSink({
binPath: 'ecc-tui',
runCommand: () => {
throw new Error('ENOENT');
}
});
assert.throws(() => send({ fromSession: 'a', toSession: 'b', content: 'x', msgType: 'proximity_steer' }));
});
test('resolveEccBin: honors explicit override', () => {
assert.strictEqual(resolveEccBin({ binPath: '/x/ecc-tui' }), '/x/ecc-tui');
});
test('dispatcher: fires once then suppresses the same trigger within cooldown', () => {
let clock = 1000;
const sent = [];
const dispatcher = createProximityDispatcher({
sendMessage: m => sent.push(m),
cooldownMs: 1000,
now: () => clock
});
const triggers = [{ to: 'worker', from: 'lead', type: 'proximity_steer', content: 'steer' }];
const r1 = dispatcher.dispatch(triggers);
assert.strictEqual(r1.dispatched, 1);
clock = 1500; // within cooldown
const r2 = dispatcher.dispatch(triggers);
assert.strictEqual(r2.dispatched, 0);
assert.strictEqual(r2.suppressed, 1);
clock = 2600; // past cooldown
const r3 = dispatcher.dispatch(triggers);
assert.strictEqual(r3.dispatched, 1);
assert.strictEqual(sent.length, 2, 'sent twice total, the middle one suppressed');
});
test('dispatcher: distinct triggers are not cross-suppressed', () => {
const sent = [];
const dispatcher = createProximityDispatcher({ sendMessage: m => sent.push(m), now: () => 0 });
const r = dispatcher.dispatch([
{ to: 'worker', from: 'lead', type: 'proximity_steer', content: 'a' },
{ to: 'lead', from: 'worker', type: 'proximity_hold', content: 'b' }
]);
assert.strictEqual(r.dispatched, 2);
});
test('dispatcher: no sink ⇒ skipped, never throws', () => {
const dispatcher = createProximityDispatcher({});
const r = dispatcher.dispatch([{ to: 'a', from: 'b', type: 'x', content: 'c' }]);
assert.strictEqual(r.skipped, 1);
assert.strictEqual(r.dispatched, 0);
});
console.log(`\nResults: Passed: ${passed}, Failed: ${failed}`);
if (failed > 0) process.exit(1);
+38 -1
View File
@@ -5,7 +5,8 @@
const assert = require('assert');
const { buildProximitySnapshot, sessionsToAgents, parseDiffRanges, dispatchProximityTriggers } = require('../../scripts/lib/control-pane/proximity');
const { buildProximitySnapshot, sessionsToAgents, parseDiffRanges, dispatchProximityTriggers, createProximityDispatcher, runProximityTick } = require('../../scripts/lib/control-pane/proximity');
const { parseArgs: parseTickArgs } = require('../../scripts/proximity-tick');
let passed = 0;
let failed = 0;
@@ -146,5 +147,41 @@ test('dispatchProximityTriggers: no sink ⇒ nothing thrown, all skipped', () =>
assert.strictEqual(r.skipped, 1);
});
test('runProximityTick: dispatches the snapshot triggers via the dispatcher', async () => {
const snapshot = {
proximity: {
counts: { agents: 2, advisories: 1, resolutions: 1 },
advisories: [{ a: 'lead', b: 'worker', level: 'resolution', risk: 0.9, steer: 'worker', hold: 'lead' }],
triggers: [
{ to: 'worker', from: 'lead', type: 'proximity_steer', content: 'steer' },
{ to: 'lead', from: 'worker', type: 'proximity_hold', content: 'hold' }
]
}
};
const sent = [];
const dispatcher = createProximityDispatcher({ sendMessage: m => sent.push(m), now: () => 0 });
const tick = await runProximityTick({ buildSnapshot: async () => snapshot, dispatcher });
assert.strictEqual(tick.result.dispatched, 2);
assert.strictEqual(sent.length, 2);
});
test('runProximityTick: dry-run sends nothing', async () => {
const snapshot = { proximity: { counts: {}, advisories: [], triggers: [{ to: 'a', from: 'b', type: 'x', content: 'c' }] } };
const sent = [];
const dispatcher = createProximityDispatcher({ sendMessage: m => sent.push(m) });
const tick = await runProximityTick({ buildSnapshot: async () => snapshot, dispatcher, dryRun: true });
assert.strictEqual(tick.result.dispatched, 0);
assert.strictEqual(tick.result.dryRun, true);
assert.strictEqual(sent.length, 0);
});
test('proximity-tick parseArgs: parses flags', () => {
const a = parseTickArgs(['node', 'proximity-tick.js', '--watch', '30', '--dry-run', '--db', '/x']);
assert.strictEqual(a.watchSec, 30);
assert.strictEqual(a.dryRun, true);
assert.strictEqual(a.dbPath, '/x');
assert.throws(() => parseTickArgs(['node', 'p', '--watch', 'nope']), /positive seconds/);
});
console.log(`\nResults: Passed: ${passed}, Failed: ${failed}`);
if (failed > 0) process.exit(1);