from fal.distributed import DistributedRunner, DistributedWorker
Classes
DistributedRunner
class fal.distributed.DistributedRunner
A class to launch and manage distributed workers.
Parameters:
- worker_cls (type[DistributedWorker], default fal.distributed.worker.DistributedWorker)
- world_size (int, default 1)
- master_addr (str, default '127.0.0.1')
- master_port (int, default 29500)
- worker_addr (str, default '127.0.0.1')
- worker_port (int, default 54923)
- timeout (int, default 86400)
- keepalive_payload (dict[str, Any], default {})
- keepalive_interval (int | float | None, default None)
- cwd (str | Path | None, default None)
- set_device (Optional[bool], default None)
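The `master_addr`, `master_port`, and `world_size` parameters mirror the settings that PyTorch's `env://` rendezvous reads from each process's environment (`MASTER_ADDR`, `MASTER_PORT`, `WORLD_SIZE`, plus a per-process `RANK`). The sketch below shows how per-rank settings could be derived from these values. It assumes the runner uses this rendezvous, which this page does not state, and `rendezvous_env` is a hypothetical helper, not part of `fal.distributed`.

```python
def rendezvous_env(
    rank: int,
    world_size: int = 1,
    master_addr: str = "127.0.0.1",
    master_port: int = 29500,
) -> dict[str, str]:
    """Hypothetical helper: env:// rendezvous variables for one rank.

    Assumption: the defaults copy DistributedRunner's master_addr/master_port.
    """
    if not 0 <= rank < world_size:
        raise ValueError(f"rank {rank} is outside [0, {world_size})")
    # Every process shares the address, port, and world size; only RANK differs.
    return {
        "MASTER_ADDR": master_addr,
        "MASTER_PORT": str(master_port),
        "WORLD_SIZE": str(world_size),
        "RANK": str(rank),
    }


# One environment per worker process for a two-process run.
envs = [rendezvous_env(rank, world_size=2) for rank in range(2)]
for env in envs:
    print(env)
```

Only `RANK` differs between the two environments; the address, port, and world size are shared by every process.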
Attributes:
- zmq_socket (Optional[Socket[Any]])
- context (Optional[mp.ProcessContext])
- keepalive_timer (Optional[KeepAliveTimer])
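The `keepalive_interval` and `keepalive_payload` parameters, the `keepalive_timer` attribute, and the `maybe_start_keepalive`, `maybe_reset_keepalive`, and `maybe_cancel_keepalive` methods suggest a resettable timer that fires while the runner is idle. Below is a minimal sketch of such a timer built on `threading.Timer`. `ResettableTimer` is hypothetical and is not fal's `KeepAliveTimer`, whose behavior this page does not document.

```python
import threading
import time
from typing import Callable, Optional


class ResettableTimer:
    """Hypothetical keepalive-style timer (not fal's KeepAliveTimer).

    Calls `callback` after every `interval` seconds of inactivity; `reset()`
    restarts the countdown and `cancel()` stops the timer for good.
    """

    def __init__(self, interval: float, callback: Callable[[], None]) -> None:
        self.interval = interval
        self.callback = callback
        self._timer: Optional[threading.Timer] = None
        self._cancelled = False
        self._lock = threading.Lock()

    def start(self) -> None:
        with self._lock:
            if self._cancelled:
                return
            if self._timer is not None:
                self._timer.cancel()  # drop any pending countdown
            self._timer = threading.Timer(self.interval, self._fire)
            self._timer.daemon = True
            self._timer.start()

    def reset(self) -> None:
        # Activity (for example a real request) postpones the next ping.
        self.start()

    def cancel(self) -> None:
        with self._lock:
            self._cancelled = True
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None

    def _fire(self) -> None:
        self.callback()
        self.start()  # re-arm so pings continue while idle


# Usage: ping every 40 ms while idle, then stop.
pings: list[float] = []
timer = ResettableTimer(0.04, lambda: pings.append(time.monotonic()))
timer.start()
time.sleep(0.17)
timer.cancel()
print(f"{len(pings)} pings while idle")
```

Under this reading, `maybe_reset_keepalive` would be called after each real request, so keepalive pings go out only during idle periods.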
Methods:

close_zmq_socket
def close_zmq_socket(self) -> None
Closes the ZeroMQ socket.
Returns: None

ensure_alive
def ensure_alive(self) -> None
Ensures that the distributed worker processes are alive. If they are not, it raises an error.
Returns: None

gather_errors
def gather_errors(self) -> list[Exception]
Gathers errors from the distributed worker processes. Call this method to collect any errors that occurred during execution.
Returns: list[Exception]

get_zmq_socket
def get_zmq_socket(self) -> Socket[Any]
Returns a ZeroMQ socket.
Returns: Socket[Any]

invoke
async def invoke(self, payload: dict[str, Any] = {}, timeout: Optional[int] = None) -> Any
Invokes the distributed worker with the given payload.
Parameters:
- payload (dict[str, Any], default {}): The payload to send to the worker.
- timeout (Optional[int], default None): The timeout for the overall operation.
Returns: Any

is_alive
def is_alive(self) -> bool
Checks whether the distributed worker processes are alive.
Returns: bool

keepalive
def keepalive(self, timeout: Optional[Union[int, float]] = 60.0) -> None
Sends the keepalive payload to the worker.
Parameters:
- timeout (int | float | None, default 60.0)
Returns: None

maybe_cancel_keepalive
def maybe_cancel_keepalive(self) -> None
Cancels the keepalive timer if it is set.
Returns: None

maybe_reset_keepalive
def maybe_reset_keepalive(self) -> None
Resets the keepalive timer if it is set.
Returns: None

maybe_start_keepalive
def maybe_start_keepalive(self) -> None
Starts the keepalive timer if it is set.
Returns: None

run
def run(self, **kwargs: Any) -> None
The main function that runs the distributed worker. It is called by each worker process spawned by torch.multiprocessing.spawn and must be synchronous.
Parameters:
- kwargs (Any): The arguments to pass to the worker.
Returns: None

start
async def start(self, timeout: int = 1800, **kwargs: Any) -> None
Starts the distributed worker processes.
Parameters:
- timeout (int, default 1800): The timeout for starting the distributed processes.
- kwargs (Any)
Returns: None

stop
async def stop(self, timeout: int = 10) -> None
Stops the distributed worker processes.
Parameters:
- timeout (int, default 10): The timeout for the distributed processes to stop.
Returns: None

stream
def stream(self, payload: dict[str, Any] = {}, timeout: Optional[int] = None, streaming_timeout: Optional[int] = None, as_text_events: bool = False) -> AsyncIterator[Any]
Streams results from the distributed worker.
Parameters:
- payload (dict[str, Any], default {}): The payload to send to the worker.
- timeout (Optional[int], default None): The timeout for the overall operation.
- streaming_timeout (Optional[int], default None): The timeout between consecutive streamed results.
- as_text_events (bool, default False): Whether to yield results as text events.
Returns: AsyncIterator[Any]

terminate
def terminate(self, timeout: Union[int, float] = 10) -> None
Terminates the distributed worker processes. Call this method to clean up the worker processes.
Parameters:
- timeout (int | float, default 10)
Returns: None
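`stream` takes two separate limits: `timeout` for the overall operation and `streaming_timeout` for the gap between consecutive results. The following sketch shows one way to enforce both around any async iterator with `asyncio.wait_for`. `with_timeouts`, `fake_worker_results`, and `collect` are hypothetical helpers, and the sketch does not claim to reproduce how `DistributedRunner` applies these limits internally.

```python
import asyncio
from typing import Any, AsyncIterator, Optional


async def with_timeouts(
    source: AsyncIterator[Any],
    timeout: Optional[float] = None,
    streaming_timeout: Optional[float] = None,
) -> AsyncIterator[Any]:
    """Hypothetical helper: re-yield `source` under two independent limits.

    `timeout` bounds the whole stream and `streaming_timeout` bounds each wait
    for the next item; None disables a limit. Raises asyncio.TimeoutError.
    """
    loop = asyncio.get_running_loop()
    deadline = None if timeout is None else loop.time() + timeout
    while True:
        # Wait for the next item no longer than the tighter of the two limits.
        wait = streaming_timeout
        if deadline is not None:
            remaining = deadline - loop.time()
            wait = remaining if wait is None else min(wait, remaining)
        try:
            item = await asyncio.wait_for(source.__anext__(), wait)
        except StopAsyncIteration:
            return
        yield item


async def fake_worker_results(delays: list[float]) -> AsyncIterator[int]:
    # Hypothetical producer standing in for results streamed back by a worker.
    for index, delay in enumerate(delays):
        await asyncio.sleep(delay)
        yield index


async def collect(delays: list[float], **limits: Optional[float]) -> list[int]:
    stream = with_timeouts(fake_worker_results(delays), **limits)
    return [item async for item in stream]


print(asyncio.run(collect([0.01, 0.01, 0.01], streaming_timeout=0.5)))  # [0, 1, 2]
```

Keeping the two limits independent means a slow but steady stream can still hit `timeout`, while a stream that stalls once hits `streaming_timeout` even when the overall budget remains.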
DistributedWorker
class fal.distributed.DistributedWorker
A base class for distributed workers.
Parameters:
- rank (int, default 0)
- world_size (int, default 1)
Attributes:
- queue (queue.Queue[bytes])
- loop (asyncio.AbstractEventLoop)
- thread (threading.Thread)
Properties:
- device: The device for the current worker.
- running (bool): Whether the event loop is running.
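The `loop`, `thread`, and `running` members, together with `submit`, `run_in_worker`, and `shutdown`, describe an asyncio event loop that the worker drives from a background thread. A minimal standard-library sketch of that pattern follows. `LoopThread` is hypothetical, and running `run_in_worker` functions on the loop's thread is an assumption about what "in the worker" means.

```python
import asyncio
import threading
from concurrent.futures import Future
from typing import Any, Callable, Coroutine, Optional


class LoopThread:
    """Hypothetical sketch of an event loop driven by a background thread.

    Mirrors the shape of DistributedWorker's loop/thread attributes and its
    running, submit, run_in_worker, and shutdown members; not fal's code.
    """

    def __init__(self) -> None:
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self._run_loop, daemon=True)
        self.thread.start()

    def _run_loop(self) -> None:
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    @property
    def running(self) -> bool:
        return self.loop.is_running()

    def submit(self, coro: Coroutine[Any, Any, Any]) -> "Future[Any]":
        # Thread-safe hand-off; the caller receives a concurrent.futures.Future.
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def run_in_worker(
        self, func: Callable[..., Any], *args: Any, **kwargs: Any
    ) -> "Future[Any]":
        # Assumption: "in the worker" means on the event loop's thread.
        async def call() -> Any:
            return func(*args, **kwargs)

        return self.submit(call())

    def shutdown(self, timeout: Optional[float] = None) -> None:
        # Ask the loop to stop from its own thread, then wait for the thread.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join(timeout)
        if not self.thread.is_alive():
            self.loop.close()


async def double_later(x: int) -> int:
    await asyncio.sleep(0.01)
    return 2 * x


worker_loop = LoopThread()
print(worker_loop.submit(double_later(21)).result(timeout=1))  # 42
print(worker_loop.run_in_worker(sum, [1, 2, 3]).result(timeout=1))  # 6
worker_loop.shutdown(timeout=1)
```

Returning a `concurrent.futures.Future` lets synchronous code, such as a `run` method that must stay synchronous, block on `.result()` while the coroutine executes on the loop's thread.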
Methods:

add_streaming_error
def add_streaming_error(self, error: Exception) -> None
Adds an error to the queue.
Parameters:
- error (Exception): The error to add to the queue.
Returns: None

add_streaming_result
def add_streaming_result(self, result: Any, image_format: str = 'jpeg', as_text_event: bool = False) -> None
Adds a streaming result to the queue.
Parameters:
- result (Any): The result to add to the queue.
- image_format (str, default 'jpeg')
- as_text_event (bool, default False)
Returns: None

initialize
def initialize(self, **kwargs: Any) -> None
Initializes the worker.
Parameters:
- kwargs (Any)
Returns: None

rank_print
def rank_print(self, message: str, debug: bool = False) -> None
Prints a message together with the rank of the current worker.
Parameters:
- message (str): The message to print.
- debug (bool, default False): Whether to print the message as a debug message.
Returns: None

run_in_worker
def run_in_worker(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Future[Any]
Runs a function in the worker.
Parameters:
- func (Callable[..., Any])
- args (Any)
- kwargs (Any)
Returns: Future[Any]

setup
def setup(self, **kwargs: Any) -> None
Override this method to set up the worker. It is called once per worker.
Parameters:
- kwargs (Any)
Returns: None

shutdown
def shutdown(self, timeout: Optional[Union[int, float]] = None) -> None
Shuts down the event loop.
Parameters:
- timeout (int | float | None, default None): The timeout for the shutdown.
Returns: None

submit
def submit(self, coro: Coroutine[Any, Any, Any]) -> Future[Any]
Submits a coroutine to the event loop.
Parameters:
- coro (Coroutine[Any, Any, Any]): The coroutine to submit to the event loop.
Returns: Future[Any]

teardown
def teardown(self) -> None
Override this method to tear down the worker. It is called once per worker.
Returns: None
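`add_streaming_result` and `add_streaming_error` both write to the worker's `queue`, typed `queue.Queue[bytes]`, so results and errors have to be serialized before they are enqueued. The sketch below illustrates that with `pickle` and a one-byte tag. The framing, the `DONE` end-of-stream marker, and the `StreamingQueue` class are hypothetical and do not describe fal's wire format; the sketch also ignores `image_format` and `as_text_event`.

```python
import pickle
import queue
from typing import Any, Iterator

# Hypothetical framing: a one-byte tag distinguishes results, errors, and end-of-stream.
RESULT, ERROR, DONE = b"R", b"E", b"D"


class StreamingQueue:
    """Hypothetical stand-in for a worker's bytes queue; not fal's wire format."""

    def __init__(self) -> None:
        self.queue: "queue.Queue[bytes]" = queue.Queue()

    def add_streaming_result(self, result: Any) -> None:
        self.queue.put(RESULT + pickle.dumps(result))

    def add_streaming_error(self, error: Exception) -> None:
        self.queue.put(ERROR + pickle.dumps(error))

    def finish(self) -> None:
        self.queue.put(DONE)

    def drain(self, timeout: float = 1.0) -> Iterator[Any]:
        """Consumer side: yield results in order and re-raise the first error."""
        while True:
            message = self.queue.get(timeout=timeout)  # raises queue.Empty if idle
            tag, body = message[:1], message[1:]
            if tag == DONE:
                return
            if tag == ERROR:
                raise pickle.loads(body)
            yield pickle.loads(body)


stream = StreamingQueue()
stream.add_streaming_result({"step": 1})
stream.add_streaming_result({"step": 2})
stream.finish()
print(list(stream.drain()))  # [{'step': 1}, {'step': 2}]
```

Tagging each message lets the consumer stop at the first error instead of mistaking it for a result; `pickle` only keeps the sketch short.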