# RTStream Reference

Code-level details for RTStream operations. For the workflow guide, see rtstream.md.

Based on docs.videodb.io.
## Collection RTStream Methods

Methods on `Collection` for managing RTStreams:

| Method | Returns | Description |
|---|---|---|
| `coll.connect_rtstream(url, name, ...)` | `RTStream` | Create a new RTStream from an RTSP/RTMP URL |
| `coll.get_rtstream(id)` | `RTStream` | Get an existing RTStream by ID |
| `coll.list_rtstreams(limit, offset, status, name, ordering)` | `List[RTStream]` | List all RTStreams in the collection |
| `coll.search(query, namespace="rtstream")` | `RTStreamSearchResult` | Search across all RTStreams |
### Connect RTStream

```python
import videodb

conn = videodb.connect()
coll = conn.get_collection()

rtstream = coll.connect_rtstream(
    url="rtmp://your-stream-server/live/stream-key",
    name="My Live Stream",
    media_types=["video"],    # or ["audio", "video"]
    sample_rate=30,           # optional
    store=True,               # enable recording storage for export
    enable_transcript=True,   # optional
    ws_connection_id=ws_id,   # optional, for real-time events
)
```
### Get Existing RTStream

```python
rtstream = coll.get_rtstream("rts-xxx")
```
### List RTStreams

```python
rtstreams = coll.list_rtstreams(
    limit=10,
    offset=0,
    status="connected",      # optional filter
    name="meeting",          # optional filter
    ordering="-created_at",
)
for rts in rtstreams:
    print(f"{rts.id}: {rts.name} - {rts.status}")
```
### From Capture Session

After a capture session is active, retrieve RTStream objects:

```python
session = conn.get_capture_session(session_id)
mics = session.get_rtstream("mic")
displays = session.get_rtstream("screen")
system_audios = session.get_rtstream("system_audio")
```

Or use the `rtstreams` data from the `capture_session.active` WebSocket event:

```python
for rts in rtstreams:
    rtstream = coll.get_rtstream(rts["rtstream_id"])
```
## RTStream Methods

| Method | Returns | Description |
|---|---|---|
| `rtstream.start()` | `None` | Begin ingestion |
| `rtstream.stop()` | `None` | Stop ingestion |
| `rtstream.generate_stream(start, end)` | `str` | Stream a recorded segment (Unix timestamps) |
| `rtstream.export(name=None)` | `RTStreamExportResult` | Export to a permanent video |
| `rtstream.index_visuals(prompt, ...)` | `RTStreamSceneIndex` | Create a visual index with AI analysis |
| `rtstream.index_audio(prompt, ...)` | `RTStreamSceneIndex` | Create an audio index with LLM summarization |
| `rtstream.list_scene_indexes()` | `List[RTStreamSceneIndex]` | List all scene indexes on the stream |
| `rtstream.get_scene_index(index_id)` | `RTStreamSceneIndex` | Get a specific scene index |
| `rtstream.search(query, ...)` | `RTStreamSearchResult` | Search indexed content |
| `rtstream.start_transcript(ws_connection_id, engine)` | `dict` | Start live transcription |
| `rtstream.get_transcript(page, page_size, start, end, since)` | `dict` | Get transcript pages |
| `rtstream.stop_transcript(engine)` | `dict` | Stop transcription |
### Starting and Stopping

```python
# Begin ingestion
rtstream.start()

# ... stream is being recorded ...

# Stop ingestion
rtstream.stop()
```
### Generating Streams

Use Unix timestamps (not second offsets) to generate a playback stream from recorded content:

```python
import time

start_ts = time.time()
rtstream.start()

# Let it record for a while...
time.sleep(60)

end_ts = time.time()
rtstream.stop()

# Generate a stream URL for the recorded segment
stream_url = rtstream.generate_stream(start=start_ts, end=end_ts)
print(f"Recorded stream: {stream_url}")
```
### Exporting to Video

Export the recorded stream to a permanent video in the collection:

```python
export_result = rtstream.export(name="Meeting Recording 2024-01-15")
print(f"Video ID: {export_result.video_id}")
print(f"Stream URL: {export_result.stream_url}")
print(f"Player URL: {export_result.player_url}")
print(f"Duration: {export_result.duration}s")
```
### RTStreamExportResult Properties

| Property | Type | Description |
|---|---|---|
| `video_id` | `str` | ID of the exported video |
| `stream_url` | `str` | HLS stream URL |
| `player_url` | `str` | Web player URL |
| `name` | `str` | Video name |
| `duration` | `float` | Duration in seconds |
## AI Pipelines

AI pipelines process live streams and send results via WebSocket.

### RTStream AI Pipeline Methods

| Method | Returns | Description |
|---|---|---|
| `rtstream.index_audio(prompt, batch_config, ...)` | `RTStreamSceneIndex` | Start audio indexing with LLM summarization |
| `rtstream.index_visuals(prompt, batch_config, ...)` | `RTStreamSceneIndex` | Start visual indexing of screen content |
### Audio Indexing

Generate LLM summaries of audio content at intervals:

```python
audio_index = rtstream.index_audio(
    prompt="Summarize what is being discussed",
    batch_config={"type": "word", "value": 50},
    model_name=None,        # optional
    name="meeting_audio",   # optional
    ws_connection_id=ws_id,
)
```
Audio `batch_config` options:

| Type | Value | Description |
|---|---|---|
| `"word"` | count | Segment every N words |
| `"sentence"` | count | Segment every N sentences |
| `"time"` | seconds | Segment every N seconds |
Examples:

```python
{"type": "word", "value": 50}      # every 50 words
{"type": "sentence", "value": 5}   # every 5 sentences
{"type": "time", "value": 30}      # every 30 seconds
```

Results arrive on the `audio_index` WebSocket channel.
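Incoming WebSocket messages can be routed by channel with a small dispatcher. This is a sketch, not SDK code: it assumes each message is a dict of the `{"channel": ..., "data": {...}}` shape shown for the alert event later in this document, and that `data` carries a `text` field; verify the payload shape for your SDK version.

```python
def handle_message(message):
    """Route a WebSocket message to the right handler by channel.

    Assumes the {"channel": ..., "data": {...}} shape used by the
    alert event in this document; adjust keys to the real payloads.
    """
    channel = message.get("channel")
    data = message.get("data", {})
    if channel == "audio_index":
        # Each audio batch carries the LLM-generated summary text
        return f"[audio] {data.get('text', '')}"
    if channel == "transcript":
        return f"[speech] {data.get('text', '')}"
    return None  # ignore channels not handled here
```

In practice you would call a function like this from whatever `ws_listener` callback your setup uses (see capture-reference.md).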
### Visual Indexing

Generate AI descriptions of visual content:

```python
scene_index = rtstream.index_visuals(
    prompt="Describe what is happening on screen",
    batch_config={"type": "time", "value": 2, "frame_count": 5},
    model_name="basic",
    name="screen_monitor",  # optional
    ws_connection_id=ws_id,
)
```
Parameters:

| Parameter | Type | Description |
|---|---|---|
| `prompt` | `str` | Instructions for the AI model (supports structured JSON output) |
| `batch_config` | `dict` | Controls frame sampling (see below) |
| `model_name` | `str` | Model tier: `"mini"`, `"basic"`, `"pro"`, `"ultra"` |
| `name` | `str` | Name for the index (optional) |
| `ws_connection_id` | `str` | WebSocket connection ID for receiving results |
Visual `batch_config`:

| Key | Type | Description |
|---|---|---|
| `type` | `str` | Only `"time"` is supported for visuals |
| `value` | `int` | Window size in seconds |
| `frame_count` | `int` | Number of frames to extract per window |

Example: `{"type": "time", "value": 2, "frame_count": 5}` samples 5 frames every 2 seconds and sends them to the model.
Structured JSON output:

Use a prompt that requests JSON format for structured responses:

```python
scene_index = rtstream.index_visuals(
    prompt="""Analyze the screen and return a JSON object with:
{
  "app_name": "name of the active application",
  "activity": "what the user is doing",
  "ui_elements": ["list of visible UI elements"],
  "contains_text": true/false,
  "dominant_colors": ["list of main colors"]
}
Return only valid JSON.""",
    batch_config={"type": "time", "value": 3, "frame_count": 3},
    model_name="pro",
    ws_connection_id=ws_id,
)
```

Results arrive on the `scene_index` WebSocket channel.
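Even with a JSON-only prompt, the model's response arrives as a text field and may occasionally be wrapped in markdown code fences, so a defensive parse is worth having. A minimal sketch; `parse_scene_json` is our helper, not part of the SDK:

```python
import json


def parse_scene_json(text):
    """Parse model output that should be JSON, tolerating ``` fences.

    Returns the parsed dict, or None if the model ignored the
    JSON-only instruction.
    """
    cleaned = text.strip()
    if cleaned.startswith("```"):
        # Drop the opening fence line (with optional "json" tag)
        # and everything after the closing fence.
        cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0]
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        return None
```

Apply it to the text of each `scene_index` message before trusting keys like `app_name` or `activity`.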
### Batch Config Summary

| Indexing Type | `type` Options | `value` | Extra Keys |
|---|---|---|---|
| Audio | `"word"`, `"sentence"`, `"time"` | words/sentences/seconds | - |
| Visual | `"time"` only | seconds | `frame_count` |
Examples:

```python
# Audio: every 50 words
{"type": "word", "value": 50}

# Audio: every 30 seconds
{"type": "time", "value": 30}

# Visual: 5 frames every 2 seconds
{"type": "time", "value": 2, "frame_count": 5}
```
## Transcription

Real-time transcription via WebSocket:

```python
# Start live transcription
rtstream.start_transcript(
    ws_connection_id=ws_id,
    engine=None,  # optional, defaults to "assemblyai"
)

# Get transcript pages (with optional filters)
transcript = rtstream.get_transcript(
    page=1,
    page_size=100,
    start=None,   # optional: start timestamp filter
    end=None,     # optional: end timestamp filter
    since=None,   # optional: for polling, get transcripts after this timestamp
    engine=None,
)

# Stop transcription
rtstream.stop_transcript(engine=None)
```

Transcript results arrive on the `transcript` WebSocket channel.
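The `since` parameter enables incremental polling: remember the timestamp of the last entry you saw and pass it on the next call. A sketch of that pattern, written against a generic `fetch(since)` callable so it stays independent of the exact response shape (we assume each entry is a dict with an `end` timestamp; wrap `rtstream.get_transcript(since=...)` to match):

```python
def poll_new_entries(fetch, since=None):
    """Fetch entries newer than `since`; return (entries, new_since).

    `fetch(since)` should return a time-ordered list of dicts, each
    with an `end` timestamp -- e.g. a small wrapper around
    rtstream.get_transcript(since=since).
    """
    entries = fetch(since)
    if entries:
        # Resume point for the next poll
        since = entries[-1]["end"]
    return entries, since
```

In a loop, call this every few seconds and thread the returned `since` back into the next call; already-seen entries are never refetched.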
## RTStreamSceneIndex

When you call `index_audio()` or `index_visuals()`, the method returns an `RTStreamSceneIndex` object. This object represents the running index and provides methods for managing scenes and alerts.

```python
# index_visuals returns an RTStreamSceneIndex
scene_index = rtstream.index_visuals(
    prompt="Describe what is on screen",
    ws_connection_id=ws_id,
)

# index_audio also returns an RTStreamSceneIndex
audio_index = rtstream.index_audio(
    prompt="Summarize the discussion",
    ws_connection_id=ws_id,
)
```
### RTStreamSceneIndex Properties

| Property | Type | Description |
|---|---|---|
| `rtstream_index_id` | `str` | Unique ID of the index |
| `rtstream_id` | `str` | ID of the parent RTStream |
| `extraction_type` | `str` | Type of extraction (`time` or `transcript`) |
| `extraction_config` | `dict` | Extraction configuration |
| `prompt` | `str` | The prompt used for analysis |
| `name` | `str` | Name of the index |
| `status` | `str` | Status (`connected`, `stopped`) |
### RTStreamSceneIndex Methods

| Method | Returns | Description |
|---|---|---|
| `index.get_scenes(start, end, page, page_size)` | `dict` | Get indexed scenes |
| `index.start()` | `None` | Start/resume the index |
| `index.stop()` | `None` | Stop the index |
| `index.create_alert(event_id, callback_url, ws_connection_id)` | `str` | Create an alert for event detection |
| `index.list_alerts()` | `list` | List all alerts on this index |
| `index.enable_alert(alert_id)` | `None` | Enable an alert |
| `index.disable_alert(alert_id)` | `None` | Disable an alert |
### Getting Scenes

Poll indexed scenes from the index:

```python
result = scene_index.get_scenes(
    start=None,   # optional: start timestamp
    end=None,     # optional: end timestamp
    page=1,
    page_size=100,
)
for scene in result["scenes"]:
    print(f"[{scene['start']}-{scene['end']}] {scene['text']}")
if result["next_page"]:
    # fetch next page
    pass
```
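The next-page stub can be turned into a generator that walks every page. A sketch against a generic `get_page(page)` callable, assuming the response dict has the `scenes` and `next_page` keys shown above (wrap `scene_index.get_scenes` accordingly):

```python
def iter_scenes(get_page):
    """Yield every scene across all pages.

    `get_page(page)` returns {"scenes": [...], "next_page": bool},
    e.g. lambda p: scene_index.get_scenes(page=p, page_size=100).
    """
    page = 1
    while True:
        result = get_page(page)
        yield from result["scenes"]
        if not result.get("next_page"):
            break
        page += 1
```

Because it is a generator, you can stop early (e.g. `next(...)` or a `break`) without fetching remaining pages.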
### Managing Scene Indexes

```python
# List all indexes on the stream
indexes = rtstream.list_scene_indexes()

# Get a specific index by ID
scene_index = rtstream.get_scene_index(index_id)

# Stop an index
scene_index.stop()

# Restart an index
scene_index.start()
```
## Events

Events are reusable detection rules. Create them once, attach to any index via alerts.

### Connection Event Methods

| Method | Returns | Description |
|---|---|---|
| `conn.create_event(event_prompt, label)` | `str` (event_id) | Create a detection event |
| `conn.list_events()` | `list` | List all events |
### Creating an Event

```python
event_id = conn.create_event(
    event_prompt="User opened Slack application",
    label="slack_opened",
)
```

### Listing Events

```python
events = conn.list_events()
for event in events:
    print(f"{event['event_id']}: {event['label']}")
```
## Alerts

Alerts wire events to indexes for real-time notifications. When the AI detects content matching the event description, an alert is sent.

### Creating an Alert

```python
# Get the RTStreamSceneIndex from index_visuals
scene_index = rtstream.index_visuals(
    prompt="Describe what application is open on screen",
    ws_connection_id=ws_id,
)

# Create an alert on the index
alert_id = scene_index.create_alert(
    event_id=event_id,
    callback_url="https://your-backend.com/alerts",  # for webhook delivery
    ws_connection_id=ws_id,  # for WebSocket delivery (optional)
)
```

Note: `callback_url` is required. Pass an empty string `""` if only using WebSocket delivery.
### Managing Alerts

```python
# List all alerts on an index
alerts = scene_index.list_alerts()

# Enable/disable alerts
scene_index.disable_alert(alert_id)
scene_index.enable_alert(alert_id)
```
### Alert Delivery

| Method | Latency | Use Case |
|---|---|---|
| WebSocket | Real-time | Dashboards, live UI |
| Webhook | < 1 second | Server-to-server, automation |
### WebSocket Alert Event

```json
{
  "channel": "alert",
  "rtstream_id": "rts-xxx",
  "data": {
    "event_label": "slack_opened",
    "timestamp": 1710000012340,
    "text": "User opened Slack application"
  }
}
```

### Webhook Payload

```json
{
  "event_id": "event-xxx",
  "label": "slack_opened",
  "confidence": 0.95,
  "explanation": "User opened the Slack application",
  "timestamp": "2024-01-15T10:30:45Z",
  "start_time": 1234.5,
  "end_time": 1238.0,
  "stream_url": "https://stream.videodb.io/v3/...",
  "player_url": "https://console.videodb.io/player?url=..."
}
```
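A receiving endpoint typically filters on the payload's `confidence` score before acting. A minimal sketch of that decision logic; the `should_act` helper and the 0.8 threshold are our choices, not part of the payload contract:

```python
def should_act(payload, threshold=0.8):
    """Decide whether a webhook alert warrants action.

    Expects the payload shape shown above: a `label` string plus a
    `confidence` score between 0 and 1.
    """
    return (
        payload.get("label") is not None
        and payload.get("confidence", 0.0) >= threshold
    )
```

Your webhook route would parse the JSON body, call a check like this, and only then trigger downstream automation.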
## WebSocket Integration

All real-time AI results are delivered via WebSocket. Pass `ws_connection_id` to:

- `rtstream.start_transcript()`
- `rtstream.index_audio()`
- `rtstream.index_visuals()`
- `scene_index.create_alert()`
### WebSocket Channels

| Channel | Source | Content |
|---|---|---|
| `transcript` | `start_transcript()` | Real-time speech-to-text |
| `scene_index` | `index_visuals()` | Visual analysis results |
| `audio_index` | `index_audio()` | Audio analysis results |
| `alert` | `create_alert()` | Alert notifications |

For WebSocket event structures and `ws_listener` usage, see capture-reference.md.
## Complete Workflow

```python
import time
import videodb

conn = videodb.connect()
coll = conn.get_collection()

# 1. Connect and start recording
rtstream = coll.connect_rtstream(
    url="rtmp://your-stream-server/live/stream-key",
    name="Weekly Standup",
)
rtstream.start()

# 2. Record for the duration of the meeting
start_ts = time.time()
time.sleep(1800)  # 30 minutes
end_ts = time.time()
rtstream.stop()

# 3. Export to a permanent video
export_result = rtstream.export(name="Weekly Standup Recording")
print(f"Exported video: {export_result.video_id}")

# 4. Index the exported video for search
video = coll.get_video(export_result.video_id)
video.index_spoken_words(force=True)

# 5. Search for action items
results = video.search("action items and next steps")
stream_url = results.compile()
print(f"Action items clip: {stream_url}")
```