Overview

[Figure: overview diagram (_images/overview.png)]

Thanks a lot to https://excalidraw.com!

Entities

class Message(type: str, payload: dict | list | str | int | float | None = None)

Models a message.

It is just a container of information identified by a type. To validate messages, you can override MessageQueueListener.on_message.
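To illustrate the validation idea, here is a minimal, self-contained sketch. It does not import the library: the Message dataclass below is a stand-in that mirrors the documented constructor, and ValidatingListener and its ALLOWED_TYPES set are hypothetical names. In real code the listener would presumably subclass MessageQueueListener instead.

```python
from dataclasses import dataclass
from typing import List, Union

Payload = Union[dict, list, str, int, float, None]


@dataclass
class Message:
    """Stand-in for the documented Message: a type plus an optional payload."""
    type: str
    payload: Payload = None


class ValidatingListener:
    """Hypothetical listener that rejects messages of unknown types."""

    ALLOWED_TYPES = {"greeting", "farewell"}

    def __init__(self) -> None:
        self.accepted: List[Message] = []

    def on_message(self, msg: Message) -> None:
        # Validation hook: runs when a message is delivered to this listener.
        if msg.type not in self.ALLOWED_TYPES:
            raise ValueError(f"unexpected message type: {msg.type!r}")
        self.accepted.append(msg)


listener = ValidatingListener()
listener.on_message(Message("greeting", {"text": "hello"}))
```

Raising an exception is only one possible reaction to an invalid message; logging and dropping it would fit the same hook.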

class SignedMessage(sender_id: str, msg_type: str, msg_payload: dict | list | str | int | float | None = None)

A wrapper that adds a sender id to a message.

class MessageQueueListener

Base class for listeners.

Optionally, you can override the on_message method if you need to inject code at message delivery time.

async start() -> None
start_sync() -> None
async is_running() -> bool
is_running_sync() -> bool
async stop() -> None
stop_sync() -> None
on_message(msg: Message) -> None

Event handler. It is executed when a message is delivered to the client.

class AbstractChannel(stream_delay_seconds: int = 0)

Base class for channels.

Provides listener and message delivery management. SSEChannel is the default implementation.

add_listener() -> MessageQueueListener

Adds the default listener.

register_listener(l: MessageQueueListener)

Adds a listener to the channel.

remove_listener(l_id: str)
deliver_next(listener_id: str) -> Message

Returns the next message for the given listener id.

Raises a NoMessagesException if the queue is empty.

dispatch(listener_id: str, msg: Message)

Adds a message to the listener’s queue.

broadcast(msg: Message)

Enqueues a message for all listeners.
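The queue semantics described for deliver_next, dispatch and broadcast can be sketched with a small in-memory model. This is not the library's implementation: ToyChannel and its register method are hypothetical, messages are plain dicts, and first-in-first-out ordering per listener is an assumption.

```python
from collections import deque


class NoMessagesException(Exception):
    """Stand-in for the documented exception raised on an empty queue."""


class ToyChannel:
    """Hypothetical in-memory model: one FIFO queue per listener id."""

    def __init__(self) -> None:
        self._queues = {}

    def register(self, listener_id: str) -> None:
        self._queues[listener_id] = deque()

    def dispatch(self, listener_id: str, msg: dict) -> None:
        # Enqueue for a single listener.
        self._queues[listener_id].append(msg)

    def broadcast(self, msg: dict) -> None:
        # Enqueue the same message for every registered listener.
        for queue in self._queues.values():
            queue.append(msg)

    def deliver_next(self, listener_id: str) -> dict:
        queue = self._queues[listener_id]
        if not queue:
            raise NoMessagesException(listener_id)
        return queue.popleft()


channel = ToyChannel()
channel.register("a")
channel.register("b")
channel.dispatch("a", {"type": "direct", "payload": 1})
channel.broadcast({"type": "news", "payload": 2})

first_for_a = channel.deliver_next("a")   # the dispatched message
second_for_a = channel.deliver_next("a")  # the broadcast message
only_for_b = channel.deliver_next("b")    # the broadcast message
```

After these three calls both queues are empty, so a further deliver_next on either id raises NoMessagesException in this model.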

get_listener(listener_id: str) -> MessageQueueListener
abstract adapt(msg: Message) -> Any
async message_stream(listener: MessageQueueListener) -> AsyncIterable[Any]

Entry point for message streaming.

If a failure occurs at channel resolution time, a special message with type=MESSAGE_TYPE_CLOSED is sent and the corresponding listener is stopped.

async watch() -> AsyncIterable[Any]
notify_end()

Broadcasts a MESSAGE_TYPE_CLOSED Message.
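On the consuming side, a MESSAGE_TYPE_CLOSED message can be treated as the end-of-stream marker. The sketch below is self-contained and makes several assumptions: the value of MESSAGE_TYPE_CLOSED is not shown in this reference, so a placeholder string is used, and toy_stream is a hypothetical async generator standing in for a real channel stream.

```python
import asyncio
from typing import AsyncIterator

# Assumption: placeholder value; the real constant's value is not documented here.
MESSAGE_TYPE_CLOSED = "_closed_"


async def toy_stream() -> AsyncIterator[dict]:
    """Hypothetical stream: two data messages, then the end-of-stream marker."""
    yield {"type": "tick", "payload": 1}
    yield {"type": "tick", "payload": 2}
    yield {"type": MESSAGE_TYPE_CLOSED, "payload": None}


async def consume() -> list:
    received = []
    async for msg in toy_stream():
        if msg["type"] == MESSAGE_TYPE_CLOSED:
            break  # end of stream: stop reading
        received.append(msg["payload"])
    return received


payloads = asyncio.run(consume())
```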

Prefab channels and listeners

class SSEChannel(stream_delay_seconds: int = 0, retry_timeout_milliseconds: int = 5)

Bases: AbstractChannel

SSE streaming channel. See https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format

Currently, the ‘id’ field is not supported.
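For orientation, the event stream format linked above is line based: each event is a run of "field: value" lines terminated by a blank line. The sketch below builds such a frame by hand. It is not the library's adapt implementation; format_sse is a hypothetical helper, and JSON-encoding non-string payloads is an assumption.

```python
import json


def format_sse(event: str, data, retry_ms=None) -> str:
    """Serialise one event in the text/event-stream format (no 'id' field)."""
    lines = [f"event: {event}"]
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")
    # Non-string payloads are JSON-encoded (assumption).
    text = data if isinstance(data, str) else json.dumps(data)
    # A multi-line payload needs one "data:" line per line of text.
    for chunk in text.split("\n"):
        lines.append(f"data: {chunk}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"


frame = format_sse("greeting", {"text": "hello"}, retry_ms=5)
```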

class DataProcessingChannel(max_workers: int, stream_delay_seconds: int = 0)

Bases: AbstractChannel

Channel intended for concurrent processing of data.

Relies on concurrent.futures.ThreadPoolExecutor. Override the adapt method to control the output returned to clients.

The MESSAGE_TYPE_CLOSED type marks the end of the stream and should be treated as a reserved Message type.
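The thread-pool approach mentioned above can be sketched with the standard library alone. This shows the general pattern, not the channel's internals: the adapt function here is a hypothetical per-message transformation and process_all is an illustrative helper.

```python
from concurrent.futures import ThreadPoolExecutor


def adapt(payload: int) -> dict:
    """Hypothetical per-message transformation run in a worker thread."""
    return {"input": payload, "square": payload * payload}


def process_all(payloads: list, max_workers: int) -> list:
    # Executor.map runs the calls concurrently but yields results in input order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(adapt, payloads))


results = process_all([1, 2, 3], max_workers=2)
```

Threads suit I/O-bound work best; for CPU-bound transformations the interpreter lock limits the speed-up.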

async process_queue(l: MessageQueueListener) -> AsyncIterable[dict]

Launches the processing of the given listener’s queue.

class SimpleDistributedApplicationListener(channel: AbstractChannel)

Bases: MessageQueueListener

Listener for distributed applications.

set_action(name: str, action: Callable[[Message], list[Message]])

Hooks a callable to a string key.

Callables are selected when listener processes the message depending on its type.

They should return a list of Messages that form the response to the requested action. Use ‘stop’ as the Message type to stop the receiving listener.

on_message(msg: Message) -> None

Executes the action corresponding to the message’s type.
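The type-keyed dispatch described for set_action and on_message can be modelled as a dictionary of callables. The sketch below is self-contained and hypothetical: ToyActionListener is not the library class, messages are plain dicts, responses are collected in an outbox list because this reference does not say how the library routes them, and treating an incoming ‘stop’ message as the stop signal is only one reading of the description.

```python
from typing import Callable


class ToyActionListener:
    """Hypothetical model of type-keyed action dispatch."""

    def __init__(self) -> None:
        self._actions = {}
        self.running = True
        self.outbox = []

    def set_action(self, name: str, action: Callable[[dict], list]) -> None:
        # Hook a callable to a message type.
        self._actions[name] = action

    def on_message(self, msg: dict) -> None:
        if msg["type"] == "stop":
            self.running = False
            return
        # Select the callable by message type and collect its responses.
        self.outbox.extend(self._actions[msg["type"]](msg))


def ping(msg: dict) -> list:
    return [{"type": "pong", "payload": msg["payload"]}]


action_listener = ToyActionListener()
action_listener.set_action("ping", ping)
action_listener.on_message({"type": "ping", "payload": 42})
action_listener.on_message({"type": "stop", "payload": None})
```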

Prefab servers and clients

class SSEChannelContainer

Helper class for use cases that involve managing multiple SSE channels.

class SocketServer(file_descriptor_path: str)

An implementation of a socket server that acts as a controller for interacting with the library.

Accepted format: a flat (non-nested) JSON object with the following keys:

{
    "c": "channel id",
    "v": "verb",
    "t": "message type",
    "p": "message payload",
    "r": "receiver (listener id when verb is 'rl')"
}

Each possible value of the verb identifies a supported action:

"d" dispatch
"b" broadcast
"c" create channel
"r" add listener
"l" listen (opens a stream)
"w" watch (opens a stream)
"rl" remove a listener
"rc" remove a channel

See examples
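As a rough illustration of the request format, the following sketch serialises one request with the standard json module. build_request is a hypothetical helper, not part of the library, and omitting keys that do not apply to a verb is an assumption. How the request is written to the socket is not covered here.

```python
import json

# Verbs listed in the table above.
VERBS = {"d", "b", "c", "r", "l", "w", "rl", "rc"}


def build_request(verb, channel_id=None, msg_type=None, payload=None, receiver=None) -> str:
    """Hypothetical helper: serialise one flat request for the socket server."""
    if verb not in VERBS:
        raise ValueError(f"unsupported verb: {verb!r}")
    fields = {"c": channel_id, "v": verb, "t": msg_type, "p": payload, "r": receiver}
    # Assumption: keys that do not apply to a verb can simply be left out.
    return json.dumps({key: value for key, value in fields.items() if value is not None})


# Broadcast a "news" message with a string payload on a channel.
request = build_request("b", channel_id="channel-1", msg_type="news", payload="hello")
```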

async shutdown()

Shuts the server down gracefully.

static start(file_descriptor_path: str)

Shortcut to start a server.

class SocketClient(file_descriptor_path: str)

A small facade for interacting with SocketServer.

Exceptions

exception InvalidChannelException
exception InvalidListenerException
exception InvalidMessageFormat
exception NoMessagesException

Raised when trying to fetch from an empty queue.