docs: resolve videodb review findings

This commit is contained in:
Affaan Mustafa
2026-03-10 21:18:33 -07:00
parent 2581bebfd9
commit db2bf16427
5 changed files with 188 additions and 134 deletions

View File

@@ -6,7 +6,7 @@ Usage:
python scripts/ws_listener.py [OPTIONS] [output_dir]
Arguments:
output_dir Directory for output files (default: /tmp or VIDEODB_EVENTS_DIR env var)
output_dir Directory for output files (default: XDG_STATE_HOME/videodb or ~/.local/state/videodb)
Options:
--clear Clear the events file before starting (use when starting a new session)
@@ -20,10 +20,10 @@ Output (first line, for parsing):
WS_ID=<connection_id>
Examples:
python scripts/ws_listener.py & # Run in background
python scripts/ws_listener.py --clear # Clear events and start fresh
python scripts/ws_listener.py --clear /tmp/mydir # Custom dir with clear
kill $(cat /tmp/videodb_ws_pid) # Stop the listener
python scripts/ws_listener.py & # Run in background
python scripts/ws_listener.py --clear # Clear events and start fresh
python scripts/ws_listener.py --clear /tmp/mydir # Custom dir with clear
kill "$(cat ~/.local/state/videodb/videodb_ws_pid)" # Stop the listener
"""
import os
import sys
@@ -31,6 +31,7 @@ import json
import signal
import asyncio
import logging
import contextlib
from datetime import datetime, timezone
from pathlib import Path
@@ -52,6 +53,27 @@ logging.basicConfig(
LOGGER = logging.getLogger(__name__)
# Parse arguments
RETRYABLE_ERRORS = (ConnectionError, TimeoutError)
def default_output_dir() -> Path:
"""Return a private per-user state directory for listener artifacts."""
xdg_state_home = os.environ.get("XDG_STATE_HOME")
if xdg_state_home:
return Path(xdg_state_home) / "videodb"
return Path.home() / ".local" / "state" / "videodb"
def ensure_private_dir(path: Path) -> Path:
"""Create the listener state directory with private permissions."""
path.mkdir(parents=True, exist_ok=True, mode=0o700)
try:
path.chmod(0o700)
except OSError:
pass
return path
def parse_args() -> tuple[bool, Path]:
clear = False
output_dir: str | None = None
@@ -64,9 +86,9 @@ def parse_args() -> tuple[bool, Path]:
output_dir = arg
if output_dir is None:
output_dir = os.environ.get("VIDEODB_EVENTS_DIR", "/tmp")
return clear, Path(output_dir)
return clear, ensure_private_dir(default_output_dir())
return clear, ensure_private_dir(Path(output_dir))
CLEAR_EVENTS, OUTPUT_DIR = parse_args()
EVENTS_FILE = OUTPUT_DIR / "videodb_events.jsonl"
@@ -93,7 +115,7 @@ def append_event(event: dict):
def write_pid():
"""Write PID file for easy process management."""
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True, mode=0o700)
PID_FILE.write_text(str(os.getpid()))
@@ -118,43 +140,10 @@ async def listen_with_retry():
ws_wrapper = conn.connect_websocket()
ws = await ws_wrapper.connect()
ws_id = ws.connection_id
# Ensure output directory exists
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
# Clear events file only on first connection if --clear flag is set
if _first_connection and CLEAR_EVENTS:
EVENTS_FILE.unlink(missing_ok=True)
log("Cleared events file")
_first_connection = False
# Write ws_id to file for easy retrieval
WS_ID_FILE.write_text(ws_id)
# Print ws_id (parseable format for LLM)
if retry_count == 0:
print(f"WS_ID={ws_id}", flush=True)
log(f"Connected (ws_id={ws_id})")
# Reset retry state on successful connection
retry_count = 0
backoff = INITIAL_BACKOFF
# Listen for messages
async for msg in ws.receive():
append_event(msg)
channel = msg.get("channel", msg.get("event", "unknown"))
text = msg.get("data", {}).get("text", "")
if text:
print(f"[{channel}] {text[:80]}", flush=True)
# If we exit the loop normally, connection was closed
log("Connection closed by server")
except asyncio.CancelledError:
log("Shutdown requested")
raise
except Exception as e:
except RETRYABLE_ERRORS as e:
retry_count += 1
log(f"Connection error: {e}")
@@ -165,6 +154,52 @@ async def listen_with_retry():
log(f"Reconnecting in {backoff}s (attempt {retry_count}/{MAX_RETRIES})...")
await asyncio.sleep(backoff)
backoff = min(backoff * 2, MAX_BACKOFF)
continue
OUTPUT_DIR.mkdir(parents=True, exist_ok=True, mode=0o700)
if _first_connection and CLEAR_EVENTS:
EVENTS_FILE.unlink(missing_ok=True)
log("Cleared events file")
_first_connection = False
WS_ID_FILE.write_text(ws_id)
if retry_count == 0:
print(f"WS_ID={ws_id}", flush=True)
log(f"Connected (ws_id={ws_id})")
retry_count = 0
backoff = INITIAL_BACKOFF
receiver = ws.receive().__aiter__()
while True:
try:
msg = await anext(receiver)
except StopAsyncIteration:
log("Connection closed by server")
break
except asyncio.CancelledError:
log("Shutdown requested")
raise
except RETRYABLE_ERRORS as e:
retry_count += 1
log(f"Connection error: {e}")
if retry_count >= MAX_RETRIES:
log(f"Max retries ({MAX_RETRIES}) exceeded, exiting")
return
log(f"Reconnecting in {backoff}s (attempt {retry_count}/{MAX_RETRIES})...")
await asyncio.sleep(backoff)
backoff = min(backoff * 2, MAX_BACKOFF)
break
append_event(msg)
channel = msg.get("channel", msg.get("event", "unknown"))
text = msg.get("data", {}).get("text", "")
if text:
print(f"[{channel}] {text[:80]}", flush=True)
async def main_async():
@@ -178,7 +213,8 @@ async def main_async():
# Register signal handlers
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, handle_signal)
with contextlib.suppress(NotImplementedError):
loop.add_signal_handler(sig, handle_signal)
# Run listener with cancellation support
listen_task = asyncio.create_task(listen_with_retry())
@@ -188,6 +224,9 @@ async def main_async():
[listen_task, shutdown_task],
return_when=asyncio.FIRST_COMPLETED,
)
if listen_task.done():
await listen_task
# Cancel remaining tasks
for task in pending:
@@ -196,6 +235,10 @@ async def main_async():
await task
except asyncio.CancelledError:
pass
for sig in (signal.SIGINT, signal.SIGTERM):
with contextlib.suppress(NotImplementedError):
loop.remove_signal_handler(sig)
log("Shutdown complete")