WMA is experimental and the public API may change. First-party client libraries for the browser and native platforms are not yet released.
WMA is fal’s interface for running interactive world models over a bidirectional WebRTC stream. A runner on fal accepts a WebRTC session, produces video frames, and ships them back to a browser or native client while the session stays open. fal hosts a bridge at wma.fal.run that the client first talks to, but the bridge is only used to establish the connection. Once signaling is done, media flows peer-to-peer between the runner and the client.

There are two ways to build a WMA app, depending on how much of the transport layer you want to own:
fal.RealtimeApp is the high-level abstraction. You handle frames in Python and fal handles the WebRTC plumbing, session lifecycle, and batching helpers.
fal.App with a /start-session endpoint is the raw path. You pick your own WebRTC library and run the SDP exchange yourself. The WMA bridge POSTs the client’s offer to this endpoint and streams the response back, holding the HTTP connection open for the full lifetime of the session.
fal.RealtimeApp is the fastest way to get a world model running. You define an on_connect handler, attach a track to the session, and let fal manage the rest.
on_connect is called once per incoming session. It gives you two objects:
event_handler registers track and data-channel callbacks, and attaches outbound tracks back to the peer.
session_params is a mutable dict shared with the client for the duration of the session. See Session parameters below.
Register track callbacks on event_handler to react to the media the client publishes (for example, the browser webcam). Call event_handler.add_track(...) to push a track back to the peer.
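Putting those pieces together, a minimal `on_connect` might look like the sketch below. This is illustrative, not the definitive API: `EchoTrack` is a placeholder outbound track, and `register_track_callback` is a hypothetical name for the callback-registration method on `event_handler` — check fal’s reference for the exact names.

```python
class EchoTrack:
    """Placeholder outbound track that forwards frames back unchanged."""

    kind = "video"

    def __init__(self, source):
        self._source = source

    async def recv(self):
        return await self._source.recv()


async def on_connect(event_handler, session_params):
    # session_params only contains keys the client has actually sent,
    # so seed a default for anything your pipeline reads unconditionally.
    session_params.setdefault("prompt", "")

    def handle_inbound(track):
        if track.kind == "video":
            # Attach an outbound track so processed frames flow back to the peer.
            event_handler.add_track(EchoTrack(track))

    # Hypothetical registration call -- see fal's reference for the real
    # method name on event_handler.
    event_handler.register_track_callback(handle_inbound)
```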
BatchedFnTrack is a custom track that buffers frames from a source track, groups them by batch_size, and runs your inference function on each batch. The function receives the batch and returns a numpy array or a Pillow image per frame, which WMA then encodes back into the outbound track.
```python
BatchedFnTrack(
    source_track,      # the inbound track (e.g. the browser webcam)
    batch_size=4,      # number of frames to group before running fn
    fn=run_inference,  # callable over the batched frames
)
```
Use it when your model is cheaper to run in batches, or when you want the inference cadence decoupled from the inbound frame rate.
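As an illustrative example, a batch function matching that contract could look like this — assuming the batch arrives as a list of per-frame numpy arrays, and using grayscale conversion as a stand-in for a real model call:

```python
import numpy as np


def run_inference(frames):
    """Process a batch of decoded frames (H x W x 3 uint8 arrays) and
    return one processed array per input frame."""
    out = []
    for frame in frames:
        # Average the color channels, then broadcast back to 3 channels
        # so the output shape matches the input.
        gray = frame.mean(axis=2, keepdims=True).astype(np.uint8)
        out.append(np.broadcast_to(gray, frame.shape).copy())
    return out
```

Pass it as `fn=run_inference` when constructing `BatchedFnTrack`.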
session_params is a dynamic dict that mutates over the lifetime of a session. When the client sends a payload like {"prompt": "..."} over the data channel, the matching key on session_params updates in place. Your inference function can read the latest value on every batch without wiring up a separate queue.

Type it with TypedDict to document the fields your app consumes:
```python
class SessionParams(TypedDict, total=False):
    prompt: str
    guidance_scale: float
```
Any field that is not sent by the client is simply absent, so treat session_params as partial and provide defaults at the call site.
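A small helper keeps those defaults in one place; the default values below are arbitrary placeholders, not values WMA prescribes:

```python
from typing import TypedDict


class SessionParams(TypedDict, total=False):
    prompt: str
    guidance_scale: float


def resolve(params: SessionParams) -> tuple[str, float]:
    # session_params is partial: any field the client never sent is absent,
    # so fall back to a default per field at the call site.
    return params.get("prompt", ""), params.get("guidance_scale", 7.5)
```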
If you want to use a specific WebRTC library, own your media pipeline, or drop WMA into an existing fal.App, skip fal.RealtimeApp and expose a /start-session endpoint. WMA treats this endpoint as a streaming endpoint: the first SSE event you yield is your SDP answer, and the HTTP response stays open for the entire session. When your peer connection closes or the client drops, the generator exits and everything tears down together.

This ties the session lifetime directly to the HTTP request lifetime. You don’t track sessions in a dict, you don’t manage heartbeats, and any cleanup you put in a finally block runs when the session ends, whether the peer closed cleanly or the client disconnected.
```python
import asyncio
import json

import fal
from pydantic import BaseModel
from fastapi.responses import StreamingResponse


class OfferRequest(BaseModel):
    sdp: str
    type: str
    session_id: str


class GrayscaleTrack:
    """Wraps a video track and converts each frame to grayscale."""

    kind = "video"

    def __init__(self, track):
        self._track = track
        self.id = track.id

    async def recv(self):
        import av
        import numpy as np

        frame = await self._track.recv()
        img = frame.to_ndarray(format="bgr24")
        gray = np.mean(img, axis=2, keepdims=True).astype(np.uint8)
        img_gray = np.broadcast_to(gray, img.shape).copy()
        new_frame = av.VideoFrame.from_ndarray(img_gray, format="bgr24")
        new_frame.pts = frame.pts
        new_frame.time_base = frame.time_base
        return new_frame

    def stop(self):
        self._track.stop()


class GrayscaleDemo(fal.App, name="grayscale-demo"):
    requirements = ["aiortc", "numpy"]
    max_multiplexing = 10
    keepalive = 3600
    machine_type = "M"

    @fal.endpoint("/start-session")
    async def start_session(self, request: OfferRequest) -> StreamingResponse:
        from aiortc import RTCPeerConnection, RTCSessionDescription

        pc = RTCPeerConnection()
        dc = pc.createDataChannel("ping")

        @dc.on("message")
        def on_dc_message(message):
            d = json.loads(message)
            if d.get("type") == "ping":
                dc.send(json.dumps({"type": "pong", "client_ts": d["ts"]}))

        @pc.on("track")
        def on_track(track):
            if track.kind == "video":
                pc.addTrack(GrayscaleTrack(track))

        await pc.setRemoteDescription(
            RTCSessionDescription(sdp=request.sdp, type=request.type)
        )
        answer = await pc.createAnswer()
        await pc.setLocalDescription(answer)

        closed = asyncio.Event()

        @pc.on("connectionstatechange")
        def _on_state_change():
            if pc.connectionState in ("closed", "failed", "disconnected"):
                closed.set()

        async def event_stream():
            try:
                # First event: hand the SDP answer back to the client.
                yield "data: " + json.dumps({
                    "sdp": pc.localDescription.sdp,
                    "type": pc.localDescription.type,
                    "session_id": request.session_id,
                }) + "\n\n"
                # Hold the request open while the peer connection is alive.
                # Emit an SSE comment every 15s so intermediaries don't
                # time the connection out.
                while not closed.is_set():
                    try:
                        await asyncio.wait_for(closed.wait(), timeout=15)
                    except asyncio.TimeoutError:
                        yield ": keepalive\n\n"
            finally:
                await pc.close()

        return StreamingResponse(event_stream(), media_type="text/event-stream")
```
This path is for power users. You get full control over the ICE/SDP handshake, transceivers, codecs, and any custom data-channel protocol. The PeerConnection stays alive in the generator’s local scope, so it is not garbage-collected while the stream is active. Put any teardown logic in the finally block so it runs whether the session ends naturally or the client drops mid-stream.
First-party WMA client libraries for the browser and native platforms are not yet released. Until they ship, you can talk to the bridge directly from any WebRTC-capable client. The bridge lives at wma.fal.run and exposes two endpoints.
Creates a new WebRTC session. Send the SDP offer from your local RTCPeerConnection along with the app_id to route to. The bridge forwards the offer to a runner, collects the SDP answer, and returns it as JSON.

Wait for ICE gathering to finish before sending the offer. The bridge does not support trickle ICE, so the offer must contain all ICE candidates.

Request headers:
Authorization: Key <your-fal-key>
Content-Type: application/json
Request body:
```json
{
  "app_id": "your-username/your-app",
  "sdp": "<offer SDP from RTCPeerConnection.createOffer()>",
  "type": "offer"
}
```
Apply the returned SDP as the remote description on your RTCPeerConnection and let ICE negotiate. Once the connection is established, media flows peer-to-peer between your client and the runner.
Keeps the session alive. The bridge expects a heartbeat at regular intervals. If heartbeats stop arriving, the bridge tears the session down: the runner’s /start-session generator exits and its finally block runs.

Request headers:
Authorization: Key <your-fal-key>
Content-Type: application/json
Request body:
```json
{
  "session_id": "<session_id from /session>"
}
```
Send heartbeats every 5 seconds. To end a session, stop sending heartbeats and close the peer connection.
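Outside the browser, the heartbeat loop can be sketched in Python. The HTTP transport is injected as a plain `post(url, headers, body)` callable (so you can plug in requests, httpx, or anything else); the endpoint path, headers, and 5-second cadence come from this section, while the function names are just this sketch’s own:

```python
import json
import time

WMA_URL = "https://wma.fal.run"


def heartbeat_request(session_id: str, fal_key: str) -> tuple[str, dict, str]:
    """Build the URL, headers, and JSON body for one heartbeat POST."""
    headers = {
        "Authorization": f"Key {fal_key}",
        "Content-Type": "application/json",
    }
    return f"{WMA_URL}/session/heartbeat", headers, json.dumps({"session_id": session_id})


def run_heartbeats(session_id, fal_key, post, should_stop, interval=5.0):
    """Call post(url, headers, body) every `interval` seconds until
    should_stop() returns True (e.g. when the peer connection closes)."""
    url, headers, body = heartbeat_request(session_id, fal_key)
    while not should_stop():
        post(url, headers, body)
        time.sleep(interval)
```

To end the session, make `should_stop()` return True and close the peer connection; the bridge tears the session down once heartbeats stop arriving.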
WMA sessions are not resumable. If the peer connection drops (ICE failure, network change, runner restart), start over: close the old RTCPeerConnection, stop any heartbeat timers, build a new peer connection, generate a fresh offer, and call /session again. Each call to /session spins up a new session on the runner.
The snippet below shows the full client flow in a browser. It connects the camera to a WMA app, receives the processed video back, and keeps the session alive with heartbeats.
```javascript
const FAL_KEY = "your-fal-key";
const APP_ID = "your-username/your-app";
const WMA_URL = "https://wma.fal.run";

// 1. Create a peer connection and add the camera.
const pc = new RTCPeerConnection({
  iceServers: [{ urls: "stun:stun.l.google.com:19302" }],
});
const stream = await navigator.mediaDevices.getUserMedia({ video: true });
stream.getTracks().forEach((track) => pc.addTrack(track, stream));

// Receive the processed video from the server.
pc.ontrack = (ev) => {
  remoteVideo.srcObject = ev.streams[0] ?? new MediaStream([ev.track]);
};

// The server may open a data channel for application-level messages
// (e.g. ping/pong latency probes or session parameter updates).
pc.ondatachannel = (ev) => {
  const dc = ev.channel;
  dc.onmessage = (msg) => {
    const data = JSON.parse(msg.data);
    // handle messages from the server
  };
};

// 2. Create the offer and wait for ICE candidates to be gathered.
const offer = await pc.createOffer();
await pc.setLocalDescription(offer);
await new Promise((resolve) => {
  if (pc.iceGatheringState === "complete") return resolve();
  const timeout = setTimeout(resolve, 5000);
  pc.onicegatheringstatechange = () => {
    if (pc.iceGatheringState === "complete") {
      clearTimeout(timeout);
      resolve();
    }
  };
});

// 3. Send the offer to the WMA bridge and apply the answer.
const resp = await fetch(WMA_URL + "/session", {
  method: "POST",
  headers: {
    "Authorization": "Key " + FAL_KEY,
    "Content-Type": "application/json",
  },
  body: JSON.stringify({
    app_id: APP_ID,
    sdp: pc.localDescription.sdp,
    type: pc.localDescription.type,
  }),
});
const { session_id, sdp, type } = await resp.json();
await pc.setRemoteDescription(new RTCSessionDescription({ sdp, type }));

// 4. Keep the session alive with heartbeats.
const heartbeat = setInterval(
  () =>
    fetch(WMA_URL + "/session/heartbeat", {
      method: "POST",
      headers: {
        "Authorization": "Key " + FAL_KEY,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({ session_id }),
    }),
  5000,
);

// 5. Clean up when the connection ends.
function cleanup() {
  clearInterval(heartbeat);
  pc.close();
  stream.getTracks().forEach((t) => t.stop());
}
pc.onconnectionstatechange = () => {
  if (pc.connectionState === "failed" || pc.connectionState === "closed") {
    cleanup();
  }
};
```