Overview#

_images/overview.png

Thanks a lot to https://excalidraw.com!

Entities#

class MessageContract#

Bases: ABC

Contract class for messages

A message is a container of information identified by a type. To validate messages, override eric_sse.entities.MessageQueueListener.on_message.

abstract property type: str#

Message type

abstract property payload: dict | list | str | int | float | None#

Message payload

class Message#

Bases: MessageContract

Models a simple message

__init__(msg_type, msg_payload=None)#
Parameters:
  • msg_type (str)

  • msg_payload (dict | list | str | int | float | None)

Return type:

None

property type: str#

Message type

property payload: dict | list | str | int | float | None#

Message payload
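The relationship between the contract and a simple message can be sketched as follows. This is a minimal illustration, not the library's source: MessageContractSketch, MessageSketch and the Payload alias are hypothetical stand-ins that only mirror the properties and constructor signature documented above.

```python
from abc import ABC, abstractmethod
from typing import Union

# Hypothetical alias for the payload types listed in this reference.
Payload = Union[dict, list, str, int, float, None]


class MessageContractSketch(ABC):
    """Stand-in for MessageContract: a type plus a payload."""

    @property
    @abstractmethod
    def type(self) -> str:
        """Message type"""

    @property
    @abstractmethod
    def payload(self) -> Payload:
        """Message payload"""


class MessageSketch(MessageContractSketch):
    """Stand-in for Message: stores the two values it is given."""

    def __init__(self, msg_type: str, msg_payload: Payload = None) -> None:
        self._type = msg_type
        self._payload = msg_payload

    @property
    def type(self) -> str:
        return self._type

    @property
    def payload(self) -> Payload:
        return self._payload


greeting = MessageSketch("greeting", {"text": "hello"})
```

Because both properties are abstract in the contract, a subclass that omits either of them cannot be instantiated.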

class UniqueMessage#

Bases: MessageContract

A message plus a unique identifier

__init__(message_id, message, sender_id=None)#
Parameters:
Return type:

None

property id: str#

Unique message identifier

property type: str#

Message type

property sender_id: str#

Returns the id of the listener that sent the message

property payload: dict | list | str | int | float | None#

Message payload

class SignedMessage#

Bases: Message

Message plus sender id

__init__(sender_id, msg_type, msg_payload=None)#
Parameters:
  • sender_id (str)

  • msg_type (str)

  • msg_payload (dict | list | str | int | float | None)

property sender_id: str#

Returns the id of the listener that sent the message

Channels and listeners#

class MessageQueueListener#

Base class for listeners.

Optionally, override the on_message method to inject code at message delivery time.

__init__()#
async start()#
Return type:

None

start_sync()#
Return type:

None

async is_running()#
Return type:

bool

is_running_sync()#
Return type:

bool

async stop()#
Return type:

None

stop_sync()#
Return type:

None

on_message(msg)#

Event handler. It runs when a message is delivered to the client.

Parameters:

msg (MessageContract)

Return type:

None
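Overriding on_message for validation might look like the sketch below. ListenerSketch is a hypothetical stand-in for MessageQueueListener, and ALLOWED_TYPES and the accepted list are invented for illustration. How the channel reacts to an exception raised inside the handler is not stated in this reference, so treat the ValueError as an assumption.

```python
from types import SimpleNamespace


class ListenerSketch:
    """Hypothetical stand-in for MessageQueueListener."""

    def on_message(self, msg) -> None:
        """Default handler: does nothing."""


class ValidatingListener(ListenerSketch):
    # Hypothetical whitelist of message types this listener accepts.
    ALLOWED_TYPES = {"greeting", "farewell"}

    def __init__(self) -> None:
        self.accepted = []

    def on_message(self, msg) -> None:
        # Reject anything outside the whitelist at delivery time.
        if msg.type not in self.ALLOWED_TYPES:
            raise ValueError(f"unexpected message type: {msg.type!r}")
        self.accepted.append(msg.type)


listener = ValidatingListener()
# SimpleNamespace plays the role of a message exposing type and payload.
listener.on_message(SimpleNamespace(type="greeting", payload="hi"))
```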

class AbstractChannel#

Base class for channels.

Provides functionalities for listeners and message delivery management.

eric_sse.queue.InMemoryMessageQueueFactory is the default implementation used for queues_factory. See eric_sse.prefabs.SSEChannel.

Parameters:
__init__(stream_delay_seconds=0, queues_factory=None)#
Parameters:
add_listener()#

Add the default listener

Return type:

MessageQueueListener

register_listener(l)#

Adds a listener to the channel

Parameters:

l (MessageQueueListener)

remove_listener(l_id)#
Parameters:

l_id (str)

deliver_next(listener_id)#

Returns the next message for the given listener id.

Raises a NoMessagesException if the queue is empty

Parameters:

listener_id (str)

Return type:

MessageContract

dispatch(listener_id, msg)#

Adds a message to the listener’s queue

Parameters:
broadcast(msg)#

Enqueues a message for all listeners

Parameters:

msg (MessageContract)
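The interplay of dispatch, broadcast and deliver_next can be illustrated with one FIFO queue per listener. The sketch below is an assumption-laden model, not the library's implementation: ChannelSketch, its add_listener(listener_id) signature, the drain helper and the local NoMessagesException are all hypothetical stand-ins.

```python
from collections import deque


class NoMessagesException(Exception):
    """Local stand-in for the library exception of the same name."""


class ChannelSketch:
    """Hypothetical channel: one FIFO queue per listener id."""

    def __init__(self) -> None:
        self._queues = {}

    def add_listener(self, listener_id: str) -> None:
        self._queues[listener_id] = deque()

    def dispatch(self, listener_id: str, msg) -> None:
        # Enqueue for a single listener.
        self._queues[listener_id].append(msg)

    def broadcast(self, msg) -> None:
        # Enqueue for every registered listener.
        for queue in self._queues.values():
            queue.append(msg)

    def deliver_next(self, listener_id: str):
        try:
            return self._queues[listener_id].popleft()
        except IndexError:
            raise NoMessagesException() from None


def drain(channel: ChannelSketch, listener_id: str) -> list:
    """Collect messages until the listener's queue reports it is empty."""
    delivered = []
    while True:
        try:
            delivered.append(channel.deliver_next(listener_id))
        except NoMessagesException:
            return delivered
```

A caller that polls deliver_next should expect the empty-queue exception as the normal end-of-data signal rather than as a failure.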

get_listener(listener_id)#
Parameters:

listener_id (str)

Return type:

MessageQueueListener

abstract adapt(msg)#
Parameters:

msg (MessageContract)

Return type:

Any

async message_stream(listener)#

Entry point for message streaming

A message with type = ‘error’ is yielded on an invalid listener or channel

Parameters:

listener (MessageQueueListener)

Return type:

AsyncIterable[Any]

async watch()#
Return type:

AsyncIterable[Any]

notify_end()#

Broadcasts a MESSAGE_TYPE_CLOSED Message
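Consuming an AsyncIterable such as the one returned by message_stream could look like the following sketch. Everything here is illustrative: fake_stream stands in for a real channel stream, the events are assumed to be dicts with an "event" key (the shape depends on the channel's adapt method), and the value assigned to MESSAGE_TYPE_CLOSED is a placeholder, since the real constant's value is not given in this reference.

```python
import asyncio

# Placeholder value; the real MESSAGE_TYPE_CLOSED value is not documented here.
MESSAGE_TYPE_CLOSED = "closed"


async def fake_stream(events, delay_seconds=0):
    """Hypothetical stand-in for a channel stream."""
    for event in events:
        await asyncio.sleep(delay_seconds)
        yield event


async def consume(stream):
    """Collect events until an end-of-stream or error event arrives."""
    received = []
    async for event in stream:
        if event["event"] == MESSAGE_TYPE_CLOSED:
            break  # end of stream announced by the channel
        if event["event"] == "error":
            raise RuntimeError(f"stream error: {event.get('data')}")
        received.append(event)
    return received


events = [
    {"event": "tick", "data": 1},
    {"event": "tick", "data": 2},
    {"event": MESSAGE_TYPE_CLOSED, "data": None},
    {"event": "tick", "data": 3},  # never read: the consumer stopped earlier
]
result = asyncio.run(consume(fake_stream(events)))
```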

Prefab channels and listeners#

class SSEChannel#

Bases: AbstractChannel

SSE streaming channel. See https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format

Currently, the ‘id’ field is not supported.

Parameters:
__init__(stream_delay_seconds=0, retry_timeout_milliseconds=5, queues_factory=None)#
Parameters:
payload_adapter: Callable[[dict | list | str | int | float | None], dict | list | str | int | float | None]#

Message payload adapter; defaults to the identity function (the payload is left as is). It can be used, for example, where the receiver is responsible for deserializing the payload, e.g. sockets.

adapt(msg)#

SSE adapter.

Returns:

{
    "event": "message type",
    "retry": "channel time out",
    "data": "original payload (by default)"
}
Parameters:

msg (MessageContract)

Return type:

dict
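The dictionary shape above, together with the effect of payload_adapter, can be sketched as a plain function. adapt_sketch is a hypothetical stand-in that takes the message type and payload directly; it assumes "retry" carries the channel's retry_timeout_milliseconds value, which is an interpretation of "channel time out" rather than a documented fact.

```python
import json


def adapt_sketch(msg_type, payload, retry_timeout_milliseconds=5,
                 payload_adapter=lambda p: p):
    """Build an SSE-style dict; payload_adapter defaults to identity."""
    return {
        "event": msg_type,
        "retry": retry_timeout_milliseconds,
        "data": payload_adapter(payload),
    }


# Default: the payload is passed through untouched.
as_is = adapt_sketch("greeting", {"text": "hello"})

# With json.dumps as adapter, the receiver gets a string to deserialize.
serialized = adapt_sketch("greeting", {"text": "hello"},
                          payload_adapter=json.dumps)
```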

class DataProcessingChannel#

Bases: AbstractChannel

Channel intended for concurrent processing of data.

Relies on concurrent.futures.ThreadPoolExecutor. Override the adapt method to control the output returned to clients.

MESSAGE_TYPE_CLOSED type is intended as end of stream. It should be considered as a reserved Message type.

__init__(max_workers, stream_delay_seconds=0)#
Parameters:
  • max_workers (int) – Number of workers to use

  • stream_delay_seconds (int) – Can be used to limit response rate of streaming. Only applies to message_stream calls.

async process_queue(l)#

Launches the processing of the given listener’s queue

Parameters:

l (MessageQueueListener)

Return type:

AsyncIterable[dict]

adapt(msg)#
Parameters:

msg (Message)

Return type:

dict
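The general idea of processing queued messages concurrently with concurrent.futures.ThreadPoolExecutor can be sketched as below. This is not the class's actual code: adapt and process_all are hypothetical free functions, messages are modelled as plain dicts, and the doubling in adapt is an arbitrary placeholder for real work.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def adapt(msg: dict) -> dict:
    """Hypothetical per-message work; here it just doubles the payload."""
    return {"event": msg["type"], "data": msg["payload"] * 2}


def process_all(messages, max_workers):
    """Run adapt on each message in a thread pool.

    Results are returned in completion order, which is not
    necessarily the order in which messages were submitted.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(adapt, msg) for msg in messages]
        return [future.result() for future in as_completed(futures)]


messages = [{"type": "number", "payload": n} for n in range(5)]
results = process_all(messages, max_workers=2)
```

Since completion order is not deterministic, consumers that need ordering would have to carry a sequence number in the payload or sort the results afterwards.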

class SimpleDistributedApplicationListener#

Bases: MessageQueueListener

Listener for distributed applications

__init__(channel)#
Parameters:

channel (AbstractChannel)

set_action(name, action)#

Hooks a callable to a string key.

Callables are selected by message type when the listener processes a message.

They should return a list of Messages that make up the response to the requested action.

Reserved actions are ‘start’, ‘stop’ and ‘remove’. Receiving a message with one of these types fires the corresponding action.

Parameters:
dispatch_to(receiver, msg)#
Parameters:
on_message(msg)#

Executes the action corresponding to the message’s type

Parameters:

msg (SignedMessage)

Return type:

None
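Selecting a callable by message type, as set_action and on_message describe, amounts to a lookup table keyed by type. The sketch below is a simplified model: ActionListenerSketch is hypothetical, messages are (type, payload) tuples, and the responses list only records what the action returned, whereas the real listener's handling of responses is not detailed in this reference.

```python
class ActionListenerSketch:
    """Hypothetical listener that maps message types to callables."""

    def __init__(self) -> None:
        self._actions = {}
        self.responses = []

    def set_action(self, name, action) -> None:
        # Hook a callable to a string key.
        self._actions[name] = action

    def on_message(self, msg) -> None:
        msg_type, payload = msg
        try:
            action = self._actions[msg_type]
        except KeyError:
            raise ValueError(f"no action for type {msg_type!r}") from None
        # Each action returns a list of response messages.
        self.responses.extend(action(payload))


def echo(payload):
    """Example action: answer with a single 'echoed' message."""
    return [("echoed", payload)]


listener = ActionListenerSketch()
listener.set_action("echo", echo)
listener.on_message(("echo", "hello"))
```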

remove_sync()#

Stops and unregisters the listener

Prefab servers and clients#

class SSEChannelContainer#

Helper class for use cases that manage multiple SSE channels.

__init__()#
add(queues_factory=None)#
Parameters:

queues_factory (AbstractMessageQueueFactory | None)

Return type:

SSEChannel

get(channel_id)#
Parameters:

channel_id (str)

Return type:

SSEChannel

rm(channel_id)#
Parameters:

channel_id (str)

class SocketServer#

An implementation of a socket server that acts as a controller to interact with the library

Accepted format: a plain JSON with the following keys:

{
    "c": "channel id",
    "v": "verb",
    "t": "message type",
    "p": "message payload",
    "r": "receiver (listener id when verb is 'rl')"
}

Each possible value of verb identifies a supported action:

"d" dispatch
"b" broadcast
"c" create channel
"r" add listener
"l" listen (opens a stream)
"w" watch (opens a stream)
"rl" remove a listener
"rc" remove a channel

See examples
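A command in the format above can be assembled with the standard json module. build_command is a hypothetical helper, not part of the library, and it assumes that keys irrelevant to a verb may simply be left out; the reference does not say whether the server requires every key.

```python
import json


def build_command(verb, channel_id=None, message_type=None,
                  payload=None, receiver=None) -> str:
    """Serialize a command using the single-letter keys listed above."""
    fields = {
        "c": channel_id,
        "v": verb,
        "t": message_type,
        "p": payload,
        "r": receiver,
    }
    # Assumption: unset keys are omitted rather than sent as null.
    return json.dumps({k: v for k, v in fields.items() if v is not None})


# Broadcast a 'greeting' message on a (made-up) channel id.
broadcast_cmd = build_command("b", channel_id="my-channel",
                              message_type="greeting", payload="hello")

# Remove a (made-up) listener from that channel.
remove_cmd = build_command("rl", channel_id="my-channel",
                           receiver="my-listener")
```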

__init__(file_descriptor_path)#
Parameters:

file_descriptor_path (str)

async static connect_callback(reader, writer)#
Parameters:
  • reader (StreamReader)

  • writer (StreamWriter)

static handle_command(raw_command)#
Parameters:

raw_command (str)

Return type:

AsyncIterable[str]

async shutdown()#

Graceful Shutdown

async main()#
static start(file_descriptor_path)#

Shortcut to start a server

Parameters:

file_descriptor_path (str)

class SocketClient#

A small facade for interacting with SocketServer

__init__(file_descriptor_path)#
Parameters:

file_descriptor_path (str)

async send_payload(payload)#

Send an arbitrary payload to a socket

see eric_sse.servers.SocketServer

Parameters:

payload (dict)

async create_channel()#
Return type:

str

async register(channel_id)#
Parameters:

channel_id (str)

async stream(channel_id, listener_id)#
Return type:

AsyncIterable[str]

async broadcast_message(channel_id, message_type, payload)#
Parameters:
  • channel_id (str)

  • message_type (str)

  • payload (str | dict | int | float)

async dispatch(channel_id, receiver_id, message_type, payload)#
Parameters:
  • channel_id (str)

  • receiver_id (str)

  • message_type (str)

  • payload (str | dict | int | float)

async remove_listener(channel_id, listener_id)#
Parameters:
  • channel_id (str)

  • listener_id (str)

async remove_channel(channel_id)#
Parameters:

channel_id (str)

Queues#

class Queue#

Bases: ABC

Abstract base class for queues (FIFO)

abstract pop()#

Returns the next message from the queue.

Raises an eric_sse.exception.NoMessagesException if the queue is empty.

Return type:

MessageContract

abstract push(message)#
Parameters:

message (MessageContract)

Return type:

None

abstract delete()#

Removes all messages from the queue.

Return type:

None

class AbstractMessageQueueFactory#

Bases: ABC

Abstraction for queue creation

see eric_sse.entities.AbstractChannel

abstract create()#
Return type:

Queue

class InMemoryMessageQueueFactory#

Bases: AbstractMessageQueueFactory

Default implementation used by eric_sse.entities.AbstractChannel

create()#
Return type:

Queue
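A custom queue and its factory could be modelled as in the sketch below, which backs the FIFO with collections.deque. QueueSketch, DequeQueue, DequeQueueFactory and the local NoMessagesException are hypothetical stand-ins that mirror the documented method names; a real implementation would subclass the library's Queue and AbstractMessageQueueFactory instead.

```python
from abc import ABC, abstractmethod
from collections import deque


class NoMessagesException(Exception):
    """Local stand-in for eric_sse.exception.NoMessagesException."""


class QueueSketch(ABC):
    """Stand-in mirroring the abstract Queue interface (FIFO)."""

    @abstractmethod
    def pop(self): ...

    @abstractmethod
    def push(self, message) -> None: ...

    @abstractmethod
    def delete(self) -> None: ...


class DequeQueue(QueueSketch):
    def __init__(self) -> None:
        self._items = deque()

    def pop(self):
        if not self._items:
            raise NoMessagesException()
        return self._items.popleft()  # oldest message first

    def push(self, message) -> None:
        self._items.append(message)

    def delete(self) -> None:
        self._items.clear()  # removes all messages


class DequeQueueFactory:
    """Stand-in factory: one fresh queue per create() call."""

    def create(self) -> QueueSketch:
        return DequeQueue()
```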

exception RepositoryError#

Bases: Exception

Raised when an unexpected error occurs while trying to fetch messages from a queue.

Concrete implementations of Queue should wrap any unexpected exception they catch in a RepositoryError before raising it, and should raise an eric_sse.exception.NoMessagesException when pop is called on an empty queue.
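The wrapping convention can be illustrated with a queue that reads from an unreliable backend. All names below are hypothetical: FlakyStore simulates external storage, StoreBackedQueue shows only the pop method, and both exception classes are local stand-ins for the library's own.

```python
class RepositoryError(Exception):
    """Local stand-in for the library's RepositoryError."""


class NoMessagesException(Exception):
    """Local stand-in for eric_sse.exception.NoMessagesException."""


class FlakyStore:
    """Hypothetical backend that may fail when asked for data."""

    def __init__(self, items, fail=False) -> None:
        self._items = list(items)
        self._fail = fail

    def fetch_oldest(self):
        if self._fail:
            raise ConnectionError("backend unreachable")
        return self._items.pop(0) if self._items else None


class StoreBackedQueue:
    def __init__(self, store: FlakyStore) -> None:
        self._store = store

    def pop(self):
        try:
            item = self._store.fetch_oldest()
        except Exception as error:
            # Unexpected failure: wrap it, keeping the original as the cause.
            raise RepositoryError("could not fetch message") from error
        if item is None:
            # Expected condition: the queue is simply empty.
            raise NoMessagesException()
        return item
```

Keeping the two cases apart lets callers treat an empty queue as routine while still surfacing backend failures.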

Exceptions#

exception InvalidChannelException#
exception InvalidListenerException#
exception InvalidMessageFormat#
exception NoMessagesException#

Raised when trying to fetch from an empty queue