Overview#

Thanks a lot to https://excalidraw.com!
Entities#
- class MessageContract#
Bases:
ABC
Contract class for messages
A message is a container of information identified by a type. To validate messages, you can override
eric_sse.entities.MessageQueueListener.on_message.
- abstract property type: str#
Message type
- abstract property payload: dict | list | str | int | float | None#
Message payload
- class Message#
Bases:
MessageContract
Models a simple message
- __init__(msg_type, msg_payload=None)#
- Parameters:
msg_type (str)
msg_payload (dict | list | str | int | float | None)
- Return type:
None
- property type: str#
Message type
- property payload: dict | list | str | int | float | None#
Message payload
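As a rough illustration of the contract described above, the sketch below defines a local stand-in for MessageContract (so it runs without the library installed) and a custom message that implements the two abstract properties. `TemperatureReading` and its fields are hypothetical names chosen for the example; the stand-in is not the library's source.

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class MessageContract(ABC):
    """Local stand-in mirroring the documented contract (assumption)."""

    @property
    @abstractmethod
    def type(self) -> str:
        """Message type."""

    @property
    @abstractmethod
    def payload(self) -> dict | list | str | int | float | None:
        """Message payload."""


class TemperatureReading(MessageContract):
    """Hypothetical custom message with a fixed type."""

    def __init__(self, celsius: float) -> None:
        self._celsius = celsius

    @property
    def type(self) -> str:
        return "temperature"

    @property
    def payload(self) -> dict:
        return {"celsius": self._celsius}


reading = TemperatureReading(21.5)
print(reading.type, reading.payload)
```

With the library installed, the same subclass would presumably derive from the real MessageContract instead of the stand-in.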
- class UniqueMessage#
Bases:
MessageContract
Message plus a unique identifier
- __init__(message_id, message, sender_id=None)#
- Parameters:
message_id (str)
message (MessageContract)
sender_id (str | None)
- Return type:
None
- property id: str#
Unique message identifier
- property type: str#
Message type
- property sender_id: str#
Returns the id of the listener that sent the message
- property payload: dict | list | str | int | float | None#
Message payload
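One way to picture UniqueMessage is as a thin wrapper that adds an id and an optional sender id while delegating type and payload to the wrapped message. The sketch below is an assumption about that shape, not the library's implementation; `SimpleMessage`, `UniqueMessageSketch`, and the use of `uuid4` for ids are all illustrative choices.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class SimpleMessage:
    """Hypothetical stand-in for a plain message."""
    type: str
    payload: dict | list | str | int | float | None = None


class UniqueMessageSketch:
    """Wraps a message and adds an identifier plus an optional sender id."""

    def __init__(self, message_id: str, message: SimpleMessage,
                 sender_id: str | None = None) -> None:
        self._id = message_id
        self._message = message
        self._sender_id = sender_id

    @property
    def id(self) -> str:
        return self._id

    @property
    def type(self) -> str:
        # Delegated to the wrapped message.
        return self._message.type

    @property
    def payload(self):
        # Delegated to the wrapped message.
        return self._message.payload

    @property
    def sender_id(self) -> str | None:
        return self._sender_id


wrapped = UniqueMessageSketch(
    str(uuid.uuid4()),  # id generation strategy is an assumption
    SimpleMessage("greeting", {"text": "hi"}),
    sender_id="listener-1",
)
print(wrapped.id, wrapped.type, wrapped.payload, wrapped.sender_id)
```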
Channels and listeners#
- class MessageQueueListener#
Base class for listeners.
Optionally, you can override the on_message method if you need to inject code at message delivery time.
- __init__()#
- async start()#
- Return type:
None
- start_sync()#
- Return type:
None
- async is_running()#
- Return type:
bool
- is_running_sync()#
- Return type:
bool
- async stop()#
- Return type:
None
- stop_sync()#
- Return type:
None
- on_message(msg)#
Event handler. Executes when a message is delivered to the client.
- Parameters:
msg (MessageContract)
- Return type:
None
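The on_message hook can be used for validation, as mentioned in the Entities section. The sketch below uses a local stand-in for the listener base class so it runs on its own; `ListenerStandIn`, `ValidatingListener`, and the rule "payload must be a dict" are hypothetical. How the library reacts to an exception raised inside on_message is not covered here, so treat the `ValueError` as an illustrative choice.

```python
from types import SimpleNamespace


class ListenerStandIn:
    """Local stand-in for MessageQueueListener (assumption, not library code)."""

    def on_message(self, msg) -> None:
        # The documented default is an overridable event handler.
        pass


class ValidatingListener(ListenerStandIn):
    """Rejects messages whose payload is not a dict (hypothetical rule)."""

    def __init__(self) -> None:
        self.accepted = []

    def on_message(self, msg) -> None:
        if not isinstance(msg.payload, dict):
            raise ValueError(f"expected a dict payload for {msg.type!r}")
        self.accepted.append(msg)


listener = ValidatingListener()
# SimpleNamespace stands in for any object exposing .type and .payload.
listener.on_message(SimpleNamespace(type="order", payload={"id": 7}))
print(len(listener.accepted))
```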
- class AbstractChannel#
Base class for channels.
Provides listener management and message delivery.
eric_sse.queue.InMemoryMessageQueueFactory
is the default implementation used for queues_factory; see eric_sse.prefabs.SSEChannel
- Parameters:
stream_delay_seconds (int) – Wait time in seconds between message delivery.
queues_factory (eric_sse.queue.AbstractMessageQueueFactory)
- __init__(stream_delay_seconds=0, queues_factory=None)#
- Parameters:
stream_delay_seconds (int)
queues_factory (AbstractMessageQueueFactory | None)
- add_listener()#
Add the default listener
- Return type:
- register_listener(l)#
Adds a listener to channel
- Parameters:
- remove_listener(l_id)#
- Parameters:
l_id (str)
- deliver_next(listener_id)#
Returns the next message for the given listener id.
Raises a NoMessagesException if the queue is empty
- Parameters:
listener_id (str)
- Return type:
- dispatch(listener_id, msg)#
Adds a message to listener’s queue
- Parameters:
listener_id (str)
msg (MessageContract)
- broadcast(msg)#
Enqueue a message to all listeners
- Parameters:
msg (MessageContract)
- get_listener(listener_id)#
- Parameters:
listener_id (str)
- Return type:
- abstract adapt(msg)#
- Parameters:
msg (MessageContract)
- Return type:
Any
- async message_stream(listener)#
Entry point for message streaming
A message with type = ‘error’ is yielded on an invalid listener or channel
- Parameters:
listener (MessageQueueListener)
- Return type:
AsyncIterable[Any]
- async watch()#
- Return type:
AsyncIterable[Any]
- notify_end()#
Broadcasts a MESSAGE_TYPE_CLOSED Message
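The relationship between dispatch, broadcast, and deliver_next can be sketched with one FIFO queue per listener. This is a simplified model under stated assumptions: listeners are represented by plain string ids, messages by strings, and `ChannelSketch` and the local `NoMessagesException` are stand-ins rather than the library's classes.

```python
from collections import deque


class NoMessagesException(Exception):
    """Local stand-in for eric_sse.exception.NoMessagesException."""


class ChannelSketch:
    """One FIFO queue per registered listener id (simplified model)."""

    def __init__(self) -> None:
        self._queues = {}

    def register_listener(self, listener_id: str) -> None:
        self._queues[listener_id] = deque()

    def dispatch(self, listener_id: str, msg) -> None:
        # Adds a message to a single listener's queue.
        self._queues[listener_id].append(msg)

    def broadcast(self, msg) -> None:
        # Enqueues the same message for every registered listener.
        for queue in self._queues.values():
            queue.append(msg)

    def deliver_next(self, listener_id: str):
        # Returns the oldest pending message, or signals an empty queue.
        queue = self._queues[listener_id]
        if not queue:
            raise NoMessagesException(listener_id)
        return queue.popleft()


channel = ChannelSketch()
channel.register_listener("a")
channel.register_listener("b")
channel.broadcast("hello")
channel.dispatch("a", "only for a")
print(channel.deliver_next("a"))
```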
Prefab channels and listeners#
- class SSEChannel#
Bases:
AbstractChannel
SSE streaming channel. See https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
Currently, the ‘id’ field is not supported.
- Parameters:
stream_delay_seconds (int)
retry_timeout_milliseconds (int)
queues_factory (eric_sse.queue.AbstractMessageQueueFactory)
- __init__(stream_delay_seconds=0, retry_timeout_milliseconds=5, queues_factory=None)#
- Parameters:
stream_delay_seconds (int)
retry_timeout_milliseconds (int)
queues_factory (AbstractMessageQueueFactory | None)
- payload_adapter: Callable[[dict | list | str | int | float | None], dict | list | str | int | float | None]#
Message payload adapter; defaults to identity (the payload is left as is). It is useful, for example, when the receiver is responsible for deserializing the payload, e.g. over sockets.
- adapt(msg)#
SSE adapter.
Returns:
{ "event": "message type", "retry": "channel time out", "data": "original payload (by default)" }
- Parameters:
msg (MessageContract)
- Return type:
dict
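The dictionary shape returned by adapt, and the role of payload_adapter, can be illustrated with a small standalone function. `adapt_sketch` is a hypothetical helper that mirrors the documented keys ("event", "retry", "data") and the documented default of 5 for retry_timeout_milliseconds; it is not the library's method.

```python
import json
from typing import Any, Callable


def adapt_sketch(
    msg_type: str,
    payload: Any,
    retry_timeout_milliseconds: int = 5,
    payload_adapter: Callable[[Any], Any] = lambda p: p,  # identity by default
) -> dict:
    """Builds an SSE-style event dict from a message type and payload."""
    return {
        "event": msg_type,
        "retry": retry_timeout_milliseconds,
        "data": payload_adapter(payload),
    }


# Default behaviour: the payload is passed through untouched.
print(adapt_sketch("tick", {"n": 1}))

# With a serializing adapter, the receiver gets a JSON string to decode.
print(adapt_sketch("tick", {"n": 1}, payload_adapter=json.dumps))
```

Passing `json.dumps` as the adapter is one plausible use when the consumer expects text and deserializes it itself.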
- class DataProcessingChannel#
Bases:
AbstractChannel
Channel intended for concurrent processing of data.
Relies on concurrent.futures.ThreadPoolExecutor. Override the adapt method to control the output returned to clients.
The MESSAGE_TYPE_CLOSED type marks the end of the stream. It should be treated as a reserved Message type.
- __init__(max_workers, stream_delay_seconds=0)#
- Parameters:
max_workers (int) – Number of workers to use
stream_delay_seconds (int) – Can be used to limit response rate of streaming. Only applies to message_stream calls.
- async process_queue(l)#
Launches the processing of the given listener’s queue
- Parameters:
- Return type:
AsyncIterable[dict]
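The general idea of processing a queue concurrently with a thread pool and then signalling the end of the stream can be sketched as follows. Everything here is illustrative: `process_queue_sketch`, `adapt`, and the `CLOSED_TYPE` placeholder value are assumptions, and the library's actual MESSAGE_TYPE_CLOSED value and scheduling strategy may differ.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

CLOSED_TYPE = "closed"  # placeholder; the real MESSAGE_TYPE_CLOSED value is not shown here


def adapt(item: int) -> dict:
    """Hypothetical per-message work; a subclass would override adapt."""
    return {"event": "square", "data": item * item}


async def process_queue_sketch(items, max_workers: int):
    """Runs adapt() for each item in a thread pool and yields results as they finish."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [loop.run_in_executor(pool, adapt, item) for item in items]
        for finished in asyncio.as_completed(futures):
            yield await finished
    # End-of-stream marker, emitted once all work is done.
    yield {"event": CLOSED_TYPE, "data": None}


async def collect():
    return [result async for result in process_queue_sketch([1, 2, 3], max_workers=2)]


results = asyncio.run(collect())
print(results)
```

Results arrive in completion order, which is not necessarily the order in which items were queued.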
- class SimpleDistributedApplicationListener#
Bases:
MessageQueueListener
Listener for distributed applications
- __init__(channel)#
- Parameters:
channel (AbstractChannel)
- set_action(name, action)#
Hooks a callable to a string key.
Callables are selected when listener processes the message depending on its type.
They should return a list of Messages corresponding to the response to the requested action.
Reserved actions are ‘start’, ‘stop’, ‘remove’. Receiving a message with one of these types will fire the corresponding action.
- Parameters:
name (str)
action (Callable[[MessageContract], list[MessageContract]])
- dispatch_to(receiver, msg)#
- Parameters:
receiver (MessageQueueListener)
msg (MessageContract)
- on_message(msg)#
Executes the action corresponding to the message’s type
- Parameters:
msg (SignedMessage)
- Return type:
None
- remove_sync()#
Stop and unregister
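The set_action mechanism amounts to a lookup table from message type to callable. The sketch below models that idea with plain dicts as messages; `ActionListenerSketch` and its `outbox` list are hypothetical, and silently ignoring unknown types is an assumption of this example rather than documented behaviour.

```python
class ActionListenerSketch:
    """Maps message types to callables that return response messages."""

    def __init__(self) -> None:
        self._actions = {}
        self.outbox = []  # collected responses (illustrative only)

    def set_action(self, name: str, action) -> None:
        # Hooks a callable to a string key.
        self._actions[name] = action

    def on_message(self, msg: dict) -> None:
        # Selects the callable by message type and stores its responses.
        action = self._actions.get(msg["type"])
        if action is None:
            return  # unknown types are ignored in this sketch
        self.outbox.extend(action(msg))


def ping(msg: dict) -> list:
    """Hypothetical action: answers a ping with a pong carrying the same payload."""
    return [{"type": "pong", "payload": msg["payload"]}]


listener = ActionListenerSketch()
listener.set_action("ping", ping)
listener.on_message({"type": "ping", "payload": 1})
listener.on_message({"type": "unknown", "payload": None})
print(listener.outbox)
```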
Prefab servers and clients#
- class SSEChannelContainer#
Helper class for use cases that manage multiple SSE channels.
- __init__()#
- add(queues_factory=None)#
- Parameters:
queues_factory (AbstractMessageQueueFactory | None)
- Return type:
- get(channel_id)#
- Parameters:
channel_id (str)
- Return type:
- rm(channel_id)#
- Parameters:
channel_id (str)
- class SocketServer#
An implementation of a socket server that acts as a controller to interact with the library
Accepted format: a plain JSON with the following keys:
{
  "c": "channel id",
  "v": "verb",
  "t": "message type",
  "p": "message payload",
  "r": "receiver (listener id when verb is 'rl')"
}
Each possible value of verb identifies a supported action:
"d" dispatch
"b" broadcast
"c" create channel
"r" add listener
"l" listen (opens a stream)
"w" watch (opens a stream)
"rl" remove a listener
"rc" remove a channel
See examples
- __init__(file_descriptor_path)#
- Parameters:
file_descriptor_path (str)
- async static connect_callback(reader, writer)#
- Parameters:
reader (StreamReader)
writer (StreamWriter)
- static handle_command(raw_command)#
- Parameters:
raw_command (str)
- Return type:
AsyncIterable[str]
- async shutdown()#
Graceful Shutdown
- async main()#
- static start(file_descriptor_path)#
Shortcut to start a server
- Parameters:
file_descriptor_path (str)
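The command format accepted by SocketServer can be assembled with the standard json module. `build_command` below is a hypothetical helper, not part of the library; it leaves out keys that are not supplied, which is an assumption, since which keys each verb requires is not spelled out here.

```python
import json


def build_command(verb, channel_id=None, message_type=None,
                  payload=None, receiver=None) -> str:
    """Serializes a command using the documented one-letter keys."""
    command = {"v": verb}
    optional = {"c": channel_id, "t": message_type, "p": payload, "r": receiver}
    # Only include the keys that were actually provided.
    command.update({key: value for key, value in optional.items() if value is not None})
    return json.dumps(command)


# "c" creates a channel; "b" broadcasts a message to a channel.
print(build_command("c"))
print(build_command("b", channel_id="ch-1", message_type="greeting", payload="hello"))
```

The channel id "ch-1" is a placeholder; in practice it would be whatever the server returned when the channel was created.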
- class SocketClient#
A small facade for interacting with SocketServer
- __init__(file_descriptor_path)#
- Parameters:
file_descriptor_path (str)
- async send_payload(payload)#
Sends an arbitrary payload to a socket.
See
eric_sse.servers.SocketServer
- Parameters:
payload (dict)
- async create_channel()#
- Return type:
str
- async register(channel_id)#
- Parameters:
channel_id (str)
- async stream(channel_id, listener_id)#
- Return type:
AsyncIterable[str]
- async broadcast_message(channel_id, message_type, payload)#
- Parameters:
channel_id (str)
message_type (str)
payload (str | dict | int | float)
- async dispatch(channel_id, receiver_id, message_type, payload)#
- Parameters:
channel_id (str)
receiver_id (str)
message_type (str)
payload (str | dict | int | float)
- async remove_listener(channel_id, listener_id)#
- Parameters:
channel_id (str)
listener_id (str)
- async remove_channel(channel_id)#
- Parameters:
channel_id (str)
Queues#
- class Queue#
Bases:
ABC
Abstract base class for queues (FIFO)
- abstract pop()#
Next message from the queue.
Raises an
eric_sse.exception.NoMessagesException
if the queue is empty.
- Return type:
- abstract push(message)#
- Parameters:
message (MessageContract)
- Return type:
None
- abstract delete()#
Removes all messages from the queue.
- Return type:
None
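A concrete queue only needs to implement pop, push, and delete with FIFO semantics. The sketch below uses local stand-ins for the abstract base and the exception so it is runnable on its own; `DequeQueue` is a hypothetical implementation backed by `collections.deque`.

```python
from abc import ABC, abstractmethod
from collections import deque


class NoMessagesException(Exception):
    """Local stand-in for eric_sse.exception.NoMessagesException."""


class Queue(ABC):
    """Local stand-in mirroring the documented abstract methods."""

    @abstractmethod
    def pop(self): ...

    @abstractmethod
    def push(self, message) -> None: ...

    @abstractmethod
    def delete(self) -> None: ...


class DequeQueue(Queue):
    """FIFO queue backed by collections.deque (illustrative)."""

    def __init__(self) -> None:
        self._items = deque()

    def pop(self):
        # Oldest message first; an empty queue is signalled by an exception.
        if not self._items:
            raise NoMessagesException()
        return self._items.popleft()

    def push(self, message) -> None:
        self._items.append(message)

    def delete(self) -> None:
        # Removes all messages from the queue.
        self._items.clear()


queue = DequeQueue()
queue.push("first")
queue.push("second")
print(queue.pop())
```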
- class AbstractMessageQueueFactory#
Bases:
ABC
Abstraction for queue creation
- class InMemoryMessageQueueFactory#
Bases:
AbstractMessageQueueFactory
Default implementation used by
eric_sse.entities.AbstractChannel
- exception RepositoryError#
Bases:
Exception
Raised when an unexpected error occurs while trying to fetch messages from a queue.
Concrete implementations of
Queue
should wrap the unexpected exceptions they catch in this exception before raising, and raise an eric_sse.exception.NoMessagesException
when a pop is requested on an empty queue.
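The error-handling convention for queue implementations can be sketched with a queue that reads from some external backend. Both exception classes are local stand-ins, and `BackendQueueSketch` with its `fetch` callable (returning the next message, or None when nothing is pending) is a hypothetical design used only to show the wrapping pattern.

```python
class NoMessagesException(Exception):
    """Local stand-in for eric_sse.exception.NoMessagesException."""


class RepositoryError(Exception):
    """Local stand-in for the documented RepositoryError."""


class BackendQueueSketch:
    """Pops messages from a backend via a user-supplied fetch callable."""

    def __init__(self, fetch) -> None:
        self._fetch = fetch  # hypothetical: returns a message or None

    def pop(self):
        try:
            message = self._fetch()
        except Exception as error:
            # Unexpected backend failures are wrapped, keeping the cause.
            raise RepositoryError("could not fetch from backend") from error
        if message is None:
            # An empty queue is a normal condition with its own exception.
            raise NoMessagesException()
        return message


def broken_backend():
    raise ConnectionError("backend unreachable")


try:
    BackendQueueSketch(broken_backend).pop()
except RepositoryError as error:
    print(type(error.__cause__).__name__)
```

Chaining with `from error` preserves the original exception for debugging while callers only need to handle the two documented exception types.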
Exceptions#
- exception InvalidChannelException#
- exception InvalidListenerException#
- exception InvalidMessageFormat#
- exception NoMessagesException#
Raised when trying to fetch from an empty queue