docs: harden videodb skill examples

This commit is contained in:
Affaan Mustafa
2026-03-10 21:03:32 -07:00
parent 9dfe149310
commit b8ab34e362
7 changed files with 173 additions and 61 deletions

View File

@@ -328,7 +328,18 @@ Use `ws_listener.py` to capture WebSocket events during recording sessions. Desk
```python ```python
import json import json
events = [json.loads(l) for l in open("/tmp/videodb_events.jsonl")] from pathlib import Path
events_file = Path("/tmp/videodb_events.jsonl")
events = []
if events_file.exists():
with events_file.open(encoding="utf-8") as handle:
for line in handle:
try:
events.append(json.loads(line))
except json.JSONDecodeError:
continue
# Get all transcripts # Get all transcripts
transcripts = [e["data"]["text"] for e in events if e.get("channel") == "transcript"] transcripts = [e["data"]["text"] for e in events if e.get("channel") == "transcript"]
@@ -361,8 +372,9 @@ For complete capture workflow, see [reference/capture.md](reference/capture.md).
| Need to combine/trim clips | `VideoAsset` on a `Timeline` | | Need to combine/trim clips | `VideoAsset` on a `Timeline` |
| Need to generate voiceover, music, or SFX | `coll.generate_voice()`, `generate_music()`, `generate_sound_effect()` | | Need to generate voiceover, music, or SFX | `coll.generate_voice()`, `generate_music()`, `generate_sound_effect()` |
## Repository ## Provenance
https://github.com/video-db/skills Reference material for this skill is vendored locally under `skills/videodb/reference/`.
Use the local copies above instead of following external repository links at runtime.
**Maintained By:** [VideoDB](https://github.com/video-db) **Maintained By:** [VideoDB](https://www.videodb.io/)

View File

@@ -168,8 +168,8 @@ kill $(cat /tmp/videodb_ws_pid)
Each line is a JSON object with added timestamps: Each line is a JSON object with added timestamps:
```json ```json
{"ts": "2026-03-02T10:15:30.123Z", "unix_ts": 1709374530.12, "channel": "visual_index", "data": {"text": "..."}} {"ts": "2026-03-02T10:15:30.123Z", "unix_ts": 1772446530.123, "channel": "visual_index", "data": {"text": "..."}}
{"ts": "2026-03-02T10:15:31.456Z", "unix_ts": 1709374531.45, "event": "capture_session.active", "capture_session_id": "cap-xxx"} {"ts": "2026-03-02T10:15:31.456Z", "unix_ts": 1772446531.456, "event": "capture_session.active", "capture_session_id": "cap-xxx"}
``` ```
### Reading Events ### Reading Events
@@ -365,10 +365,17 @@ For RTStream methods (indexing, transcription, alerts, batch config), see [rtstr
└───────┬───────┘ └───────┬───────┘
│ client.start_capture_session() │ client.start_capture_session()
v v
┌───────────────┐ WebSocket: capture_session.starting
│ starting │ ──> Capture channels connect
└───────┬───────┘
v
┌───────────────┐ WebSocket: capture_session.active ┌───────────────┐ WebSocket: capture_session.active
│ active │ ──> Start AI pipelines │ active │ ──> Start AI pipelines
└───────┬─────── └───────┬──────────────┐
client.stop_capture()
│ └──────────────┐
│ client.stop_capture() │ unrecoverable capture error
v v
┌───────────────┐ WebSocket: capture_session.stopping ┌───────────────┐ WebSocket: capture_session.stopping
│ stopping │ ──> Finalize streams │ stopping │ ──> Finalize streams
@@ -383,4 +390,8 @@ For RTStream methods (indexing, transcription, alerts, batch config), see [rtstr
┌───────────────┐ WebSocket: capture_session.exported ┌───────────────┐ WebSocket: capture_session.exported
│ exported │ ──> Access video_id, stream_url, player_url │ exported │ ──> Access video_id, stream_url, player_url
└───────────────┘ └───────────────┘
┌───────────────┐ WebSocket: capture_session.failed
│ failed │ ──> Inspect error payload and retry setup
└───────────────┘
``` ```

View File

@@ -313,7 +313,7 @@ stream_url = timeline.generate_stream()
print(f"Highlight reel: {stream_url}") print(f"Highlight reel: {stream_url}")
``` ```
### Picture-in-Picture with Background Music ### Logo Overlay with Background Music
```python ```python
import videodb import videodb
@@ -365,6 +365,7 @@ clips = [
] ]
timeline = Timeline(conn) timeline = Timeline(conn)
timeline_offset = 0.0
for clip in clips: for clip in clips:
# Add a label as an overlay on each clip # Add a label as an overlay on each clip
@@ -376,7 +377,8 @@ for clip in clips:
timeline.add_inline( timeline.add_inline(
VideoAsset(asset_id=clip["video_id"], start=clip["start"], end=clip["end"]) VideoAsset(asset_id=clip["video_id"], start=clip["start"], end=clip["end"])
) )
timeline.add_overlay(0, label) timeline.add_overlay(timeline_offset, label)
timeline_offset += clip["end"] - clip["start"]
stream_url = timeline.generate_stream() stream_url = timeline.generate_stream()
print(f"Montage: {stream_url}") print(f"Montage: {stream_url}")

View File

@@ -59,7 +59,7 @@ video.play()
| Parameter | Type | Default | Description | | Parameter | Type | Default | Description |
|-----------|------|---------|-------------| |-----------|------|---------|-------------|
| `prompt` | `str` | required | Text description of the video to generate | | `prompt` | `str` | required | Text description of the video to generate |
| `duration` | `float` | `5` | Duration in seconds (must be an integer value, 5-8) | | `duration` | `int` | `5` | Duration in seconds (must be an integer value, 5-8) |
| `callback_url` | `str\|None` | `None` | URL to receive async callback | | `callback_url` | `str\|None` | `None` | URL to receive async callback |
Returns a `Video` object. Generated videos are automatically added to the collection and can be used in timelines, searches, and compilations like any uploaded video. Returns a `Video` object. Generated videos are automatically added to the collection and can be used in timelines, searches, and compilations like any uploaded video.

View File

@@ -519,6 +519,7 @@ For WebSocket event structures and ws_listener usage, see [capture-reference.md]
```python ```python
import time import time
import videodb import videodb
from videodb.exceptions import InvalidRequestError
conn = videodb.connect() conn = videodb.connect()
coll = conn.get_collection() coll = conn.get_collection()
@@ -527,6 +528,7 @@ coll = conn.get_collection()
rtstream = coll.connect_rtstream( rtstream = coll.connect_rtstream(
url="rtmp://your-stream-server/live/stream-key", url="rtmp://your-stream-server/live/stream-key",
name="Weekly Standup", name="Weekly Standup",
store=True,
) )
rtstream.start() rtstream.start()
@@ -536,6 +538,10 @@ time.sleep(1800) # 30 minutes
end_ts = time.time() end_ts = time.time()
rtstream.stop() rtstream.stop()
# Generate an immediate playback URL for the captured window
stream_url = rtstream.generate_stream(start=start_ts, end=end_ts)
print(f"Recorded stream: {stream_url}")
# 3. Export to a permanent video # 3. Export to a permanent video
export_result = rtstream.export(name="Weekly Standup Recording") export_result = rtstream.export(name="Weekly Standup Recording")
print(f"Exported video: {export_result.video_id}") print(f"Exported video: {export_result.video_id}")
@@ -545,7 +551,13 @@ video = coll.get_video(export_result.video_id)
video.index_spoken_words(force=True) video.index_spoken_words(force=True)
# 5. Search for action items # 5. Search for action items
results = video.search("action items and next steps") try:
stream_url = results.compile() results = video.search("action items and next steps")
print(f"Action items clip: {stream_url}") stream_url = results.compile()
print(f"Action items clip: {stream_url}")
except InvalidRequestError as exc:
if "No results found" in str(exc):
print("No action items were detected in the recording.")
else:
raise
``` ```

View File

@@ -108,26 +108,40 @@ Compile search results into a single stream of all matching segments:
```python ```python
from videodb import SearchType from videodb import SearchType
from videodb.exceptions import InvalidRequestError
video.index_spoken_words(force=True) video.index_spoken_words(force=True)
results = video.search("key announcement", search_type=SearchType.semantic) try:
results = video.search("key announcement", search_type=SearchType.semantic)
# Compile all matching shots into one stream # Compile all matching shots into one stream
stream_url = results.compile() stream_url = results.compile()
print(f"Search results stream: {stream_url}") print(f"Search results stream: {stream_url}")
# Or play directly # Or play directly
results.play() results.play()
except InvalidRequestError as exc:
if "No results found" in str(exc):
print("No matching announcement segments were found.")
else:
raise
``` ```
### Stream Individual Search Hits ### Stream Individual Search Hits
```python ```python
results = video.search("product demo", search_type=SearchType.semantic) from videodb.exceptions import InvalidRequestError
for i, shot in enumerate(results.get_shots()): try:
stream_url = shot.generate_stream() results = video.search("product demo", search_type=SearchType.semantic)
print(f"Hit {i+1} [{shot.start:.1f}s-{shot.end:.1f}s]: {stream_url}") for i, shot in enumerate(results.get_shots()):
stream_url = shot.generate_stream()
print(f"Hit {i+1} [{shot.start:.1f}s-{shot.end:.1f}s]: {stream_url}")
except InvalidRequestError as exc:
if "No results found" in str(exc):
print("No product demo segments matched the query.")
else:
raise
``` ```
## Audio Playback ## Audio Playback
@@ -149,6 +163,7 @@ Combine search, timeline composition, and streaming in one workflow:
```python ```python
import videodb import videodb
from videodb import SearchType from videodb import SearchType
from videodb.exceptions import InvalidRequestError
from videodb.timeline import Timeline from videodb.timeline import Timeline
from videodb.asset import VideoAsset, TextAsset, TextStyle from videodb.asset import VideoAsset, TextAsset, TextStyle
@@ -161,22 +176,34 @@ video.index_spoken_words(force=True)
# Search for key moments # Search for key moments
queries = ["introduction", "main demo", "Q&A"] queries = ["introduction", "main demo", "Q&A"]
timeline = Timeline(conn) timeline = Timeline(conn)
timeline_offset = 0.0
for query in queries: for query in queries:
# Find matching segments try:
results = video.search(query, search_type=SearchType.semantic) results = video.search(query, search_type=SearchType.semantic)
for shot in results.get_shots(): shots = results.get_shots()
timeline.add_inline( except InvalidRequestError as exc:
VideoAsset(asset_id=shot.video_id, start=shot.start, end=shot.end) if "No results found" in str(exc):
) shots = []
else:
raise
# Add section label as overlay on the first shot if not shots:
timeline.add_overlay(0, TextAsset( continue
# Add the section label where this batch starts in the compiled timeline
timeline.add_overlay(timeline_offset, TextAsset(
text=query.title(), text=query.title(),
duration=2, duration=2,
style=TextStyle(fontsize=36, fontcolor="white", boxcolor="#222222"), style=TextStyle(fontsize=36, fontcolor="white", boxcolor="#222222"),
)) ))
for shot in shots:
timeline.add_inline(
VideoAsset(asset_id=shot.video_id, start=shot.start, end=shot.end)
)
timeline_offset += shot.end - shot.start
stream_url = timeline.generate_stream() stream_url = timeline.generate_stream()
print(f"Dynamic compilation: {stream_url}") print(f"Dynamic compilation: {stream_url}")
``` ```
@@ -216,6 +243,7 @@ Build a stream dynamically based on search availability:
```python ```python
import videodb import videodb
from videodb import SearchType from videodb import SearchType
from videodb.exceptions import InvalidRequestError
from videodb.timeline import Timeline from videodb.timeline import Timeline
from videodb.asset import VideoAsset, TextAsset, TextStyle from videodb.asset import VideoAsset, TextAsset, TextStyle
@@ -231,21 +259,29 @@ timeline = Timeline(conn)
topics = ["opening remarks", "technical deep dive", "closing"] topics = ["opening remarks", "technical deep dive", "closing"]
found_any = False found_any = False
timeline_offset = 0.0
for topic in topics: for topic in topics:
results = video.search(topic, search_type=SearchType.semantic) try:
shots = results.get_shots() results = video.search(topic, search_type=SearchType.semantic)
shots = results.get_shots()
except InvalidRequestError as exc:
if "No results found" in str(exc):
shots = []
else:
raise
if shots: if shots:
found_any = True found_any = True
for shot in shots: timeline.add_overlay(timeline_offset, TextAsset(
timeline.add_inline(
VideoAsset(asset_id=shot.video_id, start=shot.start, end=shot.end)
)
# Add a label overlay for the section
timeline.add_overlay(0, TextAsset(
text=topic.title(), text=topic.title(),
duration=2, duration=2,
style=TextStyle(fontsize=32, fontcolor="white", boxcolor="#1a1a2e"), style=TextStyle(fontsize=32, fontcolor="white", boxcolor="#1a1a2e"),
)) ))
for shot in shots:
timeline.add_inline(
VideoAsset(asset_id=shot.video_id, start=shot.start, end=shot.end)
)
timeline_offset += shot.end - shot.start
if found_any: if found_any:
stream_url = timeline.generate_stream() stream_url = timeline.generate_stream()
@@ -263,6 +299,7 @@ Process an event recording into a streamable recap with multiple sections:
```python ```python
import videodb import videodb
from videodb import SearchType from videodb import SearchType
from videodb.exceptions import InvalidRequestError
from videodb.timeline import Timeline from videodb.timeline import Timeline
from videodb.asset import VideoAsset, AudioAsset, ImageAsset, TextAsset, TextStyle from videodb.asset import VideoAsset, AudioAsset, ImageAsset, TextAsset, TextStyle
@@ -287,33 +324,63 @@ title_img = coll.generate_image(
# Build the recap timeline # Build the recap timeline
timeline = Timeline(conn) timeline = Timeline(conn)
timeline_offset = 0.0
# Main video segments from search # Main video segments from search
keynote = event.search("keynote announcement", search_type=SearchType.semantic) try:
if keynote.get_shots(): keynote = event.search("keynote announcement", search_type=SearchType.semantic)
for shot in keynote.get_shots()[:5]: keynote_shots = keynote.get_shots()[:5]
except InvalidRequestError as exc:
if "No results found" in str(exc):
keynote_shots = []
else:
raise
if keynote_shots:
keynote_start = timeline_offset
for shot in keynote_shots:
timeline.add_inline( timeline.add_inline(
VideoAsset(asset_id=shot.video_id, start=shot.start, end=shot.end) VideoAsset(asset_id=shot.video_id, start=shot.start, end=shot.end)
) )
timeline_offset += shot.end - shot.start
else:
keynote_start = None
demo = event.search("product demo", search_type=SearchType.semantic) try:
if demo.get_shots(): demo = event.search("product demo", search_type=SearchType.semantic)
for shot in demo.get_shots()[:5]: demo_shots = demo.get_shots()[:5]
except InvalidRequestError as exc:
if "No results found" in str(exc):
demo_shots = []
else:
raise
if demo_shots:
demo_start = timeline_offset
for shot in demo_shots:
timeline.add_inline( timeline.add_inline(
VideoAsset(asset_id=shot.video_id, start=shot.start, end=shot.end) VideoAsset(asset_id=shot.video_id, start=shot.start, end=shot.end)
) )
timeline_offset += shot.end - shot.start
else:
demo_start = None
# Overlay title card image # Overlay title card image
timeline.add_overlay(0, ImageAsset( timeline.add_overlay(0, ImageAsset(
asset_id=title_img.id, width=100, height=100, x=80, y=20, duration=5 asset_id=title_img.id, width=100, height=100, x=80, y=20, duration=5
)) ))
# Overlay section labels # Overlay section labels at the correct timeline offsets
timeline.add_overlay(5, TextAsset( if keynote_start is not None:
text="Keynote Highlights", timeline.add_overlay(max(5, keynote_start), TextAsset(
duration=3, text="Keynote Highlights",
style=TextStyle(fontsize=40, fontcolor="white", boxcolor="#0d1117"), duration=3,
)) style=TextStyle(fontsize=40, fontcolor="white", boxcolor="#0d1117"),
))
if demo_start is not None:
timeline.add_overlay(max(5, demo_start), TextAsset(
text="Demo Highlights",
duration=3,
style=TextStyle(fontsize=36, fontcolor="white", boxcolor="#0d1117"),
))
# Overlay background music # Overlay background music
timeline.add_overlay(0, AudioAsset( timeline.add_overlay(0, AudioAsset(

View File

@@ -30,6 +30,7 @@ import sys
import json import json
import signal import signal
import asyncio import asyncio
import logging
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
@@ -43,10 +44,17 @@ MAX_RETRIES = 10
INITIAL_BACKOFF = 1 # seconds INITIAL_BACKOFF = 1 # seconds
MAX_BACKOFF = 60 # seconds MAX_BACKOFF = 60 # seconds
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] %(message)s",
datefmt="%H:%M:%S",
)
LOGGER = logging.getLogger(__name__)
# Parse arguments # Parse arguments
def parse_args(): def parse_args() -> tuple[bool, Path]:
clear = False clear = False
output_dir = None output_dir: str | None = None
args = sys.argv[1:] args = sys.argv[1:]
for arg in args: for arg in args:
@@ -71,15 +79,15 @@ _first_connection = True
def log(msg: str): def log(msg: str):
"""Log with timestamp.""" """Log with timestamp."""
ts = datetime.now().strftime("%H:%M:%S") LOGGER.info(msg)
print(f"[{ts}] {msg}", flush=True)
def append_event(event: dict): def append_event(event: dict):
"""Append event to JSONL file with timestamps.""" """Append event to JSONL file with timestamps."""
event["ts"] = datetime.now(timezone.utc).isoformat() now = datetime.now(timezone.utc)
event["unix_ts"] = datetime.now(timezone.utc).timestamp() event["ts"] = now.isoformat()
with open(EVENTS_FILE, "a") as f: event["unix_ts"] = now.timestamp()
with EVENTS_FILE.open("a", encoding="utf-8") as f:
f.write(json.dumps(event) + "\n") f.write(json.dumps(event) + "\n")
@@ -93,8 +101,8 @@ def cleanup_pid():
"""Remove PID file on exit.""" """Remove PID file on exit."""
try: try:
PID_FILE.unlink(missing_ok=True) PID_FILE.unlink(missing_ok=True)
except Exception: except OSError as exc:
pass LOGGER.debug("Failed to remove PID file %s: %s", PID_FILE, exc)
async def listen_with_retry(): async def listen_with_retry():