Prefabs#
Prefab channels and listeners#
- class SSEChannel#
Bases:
AbstractChannel
SSE (Server-Sent Events) streaming channel. See the Mozilla documentation on server-sent events.
The ‘id’ field is currently not supported.
- __init__(stream_delay_seconds=0, retry_timeout_milliseconds=5, channel_id=None, connections_factory=None)#
- Parameters:
stream_delay_seconds (int)
retry_timeout_milliseconds (int)
channel_id (str | None)
connections_factory (ConnectionsFactory | None)
- adapt(msg)#
SSE adapter.
Returns:
{ "event": "message type", "retry": "channel time out", "data": "original payload" }
- Parameters:
msg (MessageContract)
- Return type:
dict
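The shape returned by adapt can be illustrated with a minimal, standard-library sketch. The `Message` dataclass and the `to_sse_frame` helper below are hypothetical stand-ins (they are not part of the library), and the mapping of "retry" to retry_timeout_milliseconds is an assumption based on the constructor parameters listed above.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Message:
    """Hypothetical stand-in for MessageContract (not the library's class)."""
    msg_type: str
    msg_payload: Any


def adapt(msg: Message, retry_timeout_milliseconds: int = 5) -> dict:
    # Mirrors the documented shape: event, retry and data keys.
    # Assumption: "retry" carries the channel's retry timeout.
    return {
        "event": msg.msg_type,
        "retry": retry_timeout_milliseconds,
        "data": msg.msg_payload,
    }


def to_sse_frame(event: dict) -> str:
    # Serializes the dict to the SSE text format: one "field: value" line
    # per field, multi-line data split across several "data:" lines,
    # and a blank line terminating the event.
    lines = [f"event: {event['event']}", f"retry: {event['retry']}"]
    for line in str(event["data"]).splitlines() or [""]:
        lines.append(f"data: {line}")
    return "\n".join(lines) + "\n\n"
```

For example, `to_sse_frame(adapt(Message("greeting", "hello")))` produces an event block that a browser EventSource would deliver to a "greeting" listener.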
- class DataProcessingChannel#
Bases:
AbstractChannel
Channel intended for concurrent processing of data.
Relies on concurrent.futures.Executor. Override the adapt method to control the output returned to clients.
The MESSAGE_TYPE_CLOSED type marks the end of the stream and should be treated as a reserved message type.
- __init__(max_workers, stream_delay_seconds=0, executor_class=<class 'concurrent.futures.thread.ThreadPoolExecutor'>)#
- Parameters:
max_workers (int) – Number of workers to use.
stream_delay_seconds (int) – Can be used to limit the response rate of streaming. Only applies to message_stream calls.
executor_class (type) – The constructor of an Executor class. Defaults to ThreadPoolExecutor.
- async process_queue(listener)#
Processes the queue of a given listener. Returns an AsyncIterable of dictionaries, each containing the result of processing one message. See the adapt method.
- Parameters:
listener (MessageQueueListener)
- Return type:
AsyncIterable[dict]
- adapt(msg)#
Returns a dictionary in the following format:
{ "event": "message type", "data": "message payload" }
- Parameters:
msg (MessageContract)
- Return type:
dict
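The general idea behind this channel (adapting queued messages concurrently on an Executor and exposing the results as an async iterable) can be sketched with the standard library alone. This is not the library's implementation: `Message`, `CLOSED` and the tuple-free queue below are hypothetical, and `CLOSED` is only a placeholder for MESSAGE_TYPE_CLOSED, whose actual value is not given here.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, AsyncIterable, Iterable

CLOSED = "closed"  # hypothetical placeholder for MESSAGE_TYPE_CLOSED


@dataclass
class Message:
    """Hypothetical stand-in for MessageContract."""
    msg_type: str
    msg_payload: Any = None


def adapt(msg: Message) -> dict:
    # Same shape as the documented adapt output.
    return {"event": msg.msg_type, "data": msg.msg_payload}


async def process_queue(messages: Iterable[Message], max_workers: int = 2) -> AsyncIterable[dict]:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending = []
        for msg in messages:
            if msg.msg_type == CLOSED:
                break  # end-of-stream marker: stop reading the queue
            pending.append(loop.run_in_executor(executor, adapt, msg))
        # Results are yielded as workers finish, so order is not guaranteed.
        for done in asyncio.as_completed(pending):
            yield await done


async def collect(messages: Iterable[Message]) -> list:
    return [result async for result in process_queue(messages)]
```

Because results arrive in completion order, a consumer that needs the original ordering would have to carry a sequence number in the payload.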
- class SimpleDistributedApplicationListener#
Bases:
MessageQueueListener
Listener for distributed applications
- __init__()#
- set_action(name, action)#
Hooks a callable to a string key.
When the listener processes a message, it selects the callable according to the message type.
Each callable should return a list of MessageContract instances that make up the response to the requested action.
The ‘start’ and ‘stop’ actions are reserved. Receiving a message with one of these types fires the corresponding action.
- Parameters:
name (str)
action (Callable[[MessageContract], list[MessageContract]])
- dispatch_to(receiver, msg)#
- Parameters:
receiver (MessageQueueListener)
msg (MessageContract)
- on_message(msg)#
Executes the action corresponding to the message’s type.
- Parameters:
msg (SignedMessage)
- Return type:
None
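The dispatch-by-type pattern described for set_action and on_message can be sketched as follows. `Message` and `ActionListener` are hypothetical illustrations, not the library's classes. Ignoring unknown message types and collecting responses in a list are assumptions made for this sketch only.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Message:
    """Hypothetical stand-in for MessageContract."""
    msg_type: str
    msg_payload: Any = None


class ActionListener:
    """Hypothetical listener that maps message types to callables."""

    def __init__(self) -> None:
        self._actions: dict[str, Callable[[Message], list[Message]]] = {}
        self.responses: list[Message] = []

    def set_action(self, name: str, action: Callable[[Message], list[Message]]) -> None:
        # Hooks a callable to a string key.
        self._actions[name] = action

    def on_message(self, msg: Message) -> None:
        action = self._actions.get(msg.msg_type)
        if action is None:
            return  # assumption: unknown types are silently ignored
        # Each action returns a list of response messages.
        self.responses.extend(action(msg))


listener = ActionListener()
listener.set_action("echo", lambda m: [Message("echo_reply", m.msg_payload)])
listener.on_message(Message("echo", "hi"))
listener.on_message(Message("unknown"))
```

After these calls, `listener.responses` holds a single `echo_reply` message carrying the original payload.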
- class SimpleDistributedApplicationChannel#
Bases:
SSEChannel
- register_listener(listener)#
Registers an existing listener
- Parameters:
listener (SimpleDistributedApplicationListener)
- class SSEChannelRepository#
Bases:
AbstractChannelRepository
- create(channel_data)#
Creates a new channel and configures it depending on channel_data.
- Parameters:
channel_data (dict)
- Return type:
Prefab servers and clients#
- class ChannelContainer#
Helper class for use cases that involve managing multiple channels.
- __init__()#
- register(channel)#
- Parameters:
channel (AbstractChannel)
- Return type:
None
- register_iterable(channels)#
- Parameters:
channels (Iterable[AbstractChannel])
- Return type:
None
- get(channel_id)#
- Parameters:
channel_id (str)
- Return type:
- rm(channel_id)#
- Parameters:
channel_id (str)
- get_all_ids()#
- Return type:
Iterable[str]
- class SocketServer#
An implementation of a socket server that acts as a controller for interacting with the library.
Accepted format: plain JSON with the following keys:
{ "c": "channel id", "v": "verb", "t": "message type", "p": "message payload", "r": "receiver (listener id when verb is 'rl')" }
The verb value identifies one of the supported actions:
"d" dispatch
"b" broadcast
"c" create channel
"r" add listener
"l" listen (opens a stream)
"w" watch (opens a stream)
"rl" remove a listener
"rc" remove a channel
See examples
- __init__(file_descriptor_path)#
- Parameters:
file_descriptor_path (str) – See start method
- static start(file_descriptor_path)#
Shortcut to start a server given a file descriptor path
- Parameters:
file_descriptor_path (str) – File descriptor path; anything that Path understands is accepted.
- async shutdown()#
Graceful shutdown.
- async main()#
Starts the server
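The JSON format accepted by SocketServer can be sketched with a small payload builder. `VERBS` and `build_payload` are hypothetical helpers written for illustration. Whether the server requires every key to be present, or tolerates null values for unused ones, is not stated here, so sending all five keys is an assumption.

```python
import json
from typing import Any, Optional

# Verb codes as listed in the SocketServer description.
VERBS = {
    "dispatch": "d",
    "broadcast": "b",
    "create_channel": "c",
    "add_listener": "r",
    "listen": "l",
    "watch": "w",
    "remove_listener": "rl",
    "remove_channel": "rc",
}


def build_payload(
    verb: str,
    channel_id: Optional[str] = None,
    message_type: Optional[str] = None,
    payload: Any = None,
    receiver: Optional[str] = None,
) -> str:
    # Reject verb codes that are not in the documented list.
    if verb not in VERBS.values():
        raise ValueError(f"unsupported verb: {verb!r}")
    body = {"c": channel_id, "v": verb, "t": message_type, "p": payload, "r": receiver}
    return json.dumps(body)


# A broadcast of a "greeting" message on a made-up channel id.
broadcast = build_payload(VERBS["broadcast"], channel_id="ch1", message_type="greeting", payload="hello")
```

Keeping the verb codes in one mapping avoids scattering one- and two-letter strings through client code.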
- class SocketClient#
A small facade for interacting with SocketServer.
- __init__(file_descriptor_path)#
- Parameters:
file_descriptor_path (str)
- async send_payload(payload)#
Sends an arbitrary payload to a socket. See SocketServer.
- Parameters:
payload (dict)
- async create_channel()#
- Return type:
str
- async register(channel_id)#
- Parameters:
channel_id (str)
- async stream(channel_id, listener_id)#
- Return type:
AsyncIterable[str]
- async broadcast_message(channel_id, message_type, payload)#
- Parameters:
channel_id (str)
message_type (str)
payload (str | dict | int | float)
- async dispatch(channel_id, receiver_id, message_type, payload)#
- Parameters:
channel_id (str)
receiver_id (str)
message_type (str)
payload (str | dict | int | float)
- async remove_listener(channel_id, listener_id)#
- Parameters:
channel_id (str)
listener_id (str)
- async remove_channel(channel_id)#
- Parameters:
channel_id (str)
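A possible call order for some of the client methods above is sketched below. To stay runnable without a server, it uses `RecordingClient`, a hypothetical in-memory stand-in that only mimics the names and signatures of create_channel, broadcast_message and remove_channel. The channel id it returns is made up, and `announce` is an illustrative helper, not part of the library.

```python
import asyncio


class RecordingClient:
    """Hypothetical stand-in that records calls instead of using a socket."""

    def __init__(self) -> None:
        self.calls: list[tuple] = []

    async def create_channel(self) -> str:
        self.calls.append(("create_channel",))
        return "channel-1"  # made-up id for illustration

    async def broadcast_message(self, channel_id: str, message_type: str, payload) -> None:
        self.calls.append(("broadcast_message", channel_id, message_type, payload))

    async def remove_channel(self, channel_id: str) -> None:
        self.calls.append(("remove_channel", channel_id))


async def announce(client, message_type: str, payload) -> str:
    # Create a channel, broadcast once, and always clean the channel up.
    channel_id = await client.create_channel()
    try:
        await client.broadcast_message(channel_id, message_type, payload)
    finally:
        await client.remove_channel(channel_id)
    return channel_id


client = RecordingClient()
created_id = asyncio.run(announce(client, "greeting", "hello"))
```

In a real workflow, listeners would be registered and streaming before the broadcast, and the channel would stay open until they have consumed the message. The sketch only shows how the coroutine methods compose.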