Entities#

Messages#

class MessageContract#

Bases: ABC

Contract class for messages

A message is just a container of information identified by a type. For validation purposes you can override the on_message() method of MessageQueueListener.

abstract property type: str#

Message type

abstract property payload: dict | list | str | int | float | None#

Message payload
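As a rough illustration of the contract, the sketch below defines a local stand-in for MessageContract (it does not import the library class) and a hypothetical PingMessage that satisfies it. Only the two abstract properties, type and payload, come from the reference above; every other name is an assumption made for the example.

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class MessageContract(ABC):
    """Local stand-in mirroring the documented contract (not the library class)."""

    @property
    @abstractmethod
    def type(self) -> str:
        """Message type."""

    @property
    @abstractmethod
    def payload(self) -> dict | list | str | int | float | None:
        """Message payload."""


class PingMessage(MessageContract):
    """Hypothetical message with a fixed type and no payload."""

    @property
    def type(self) -> str:
        return "ping"

    @property
    def payload(self) -> None:
        return None


ping = PingMessage()
print(ping.type, ping.payload)
```

Because both properties are abstract, a subclass that omits either one cannot be instantiated, which is what makes the contract enforceable.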

class Message#

Bases: MessageContract

Models a simple message

__init__(msg_type, msg_payload=None)#
Parameters:
  • msg_type (str)

  • msg_payload (dict | list | str | int | float | None)

Return type:

None

property type: str#

Message type

property payload: dict | list | str | int | float | None#

Message payload

class UniqueMessage#

Bases: MessageContract

Message plus a unique identifier

__init__(message, sender_id=None, message_id=None)#
Parameters:
  • message

  • sender_id

  • message_id

Return type:

None

property id: str#

Unique message identifier

property type: str#

Message type

property sender_id: str#

Returns the id of the listener that sent the message

property payload: dict | list | str | int | float | None#

Message payload
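The wrapping idea behind UniqueMessage can be sketched as follows. This is an illustration under assumptions, not the library's implementation: SimpleMessage and UniqueMessageSketch are hypothetical local classes, and generating a UUID when no message_id is supplied is a guess about sensible default behaviour.

```python
from __future__ import annotations

from uuid import uuid4


class SimpleMessage:
    """Hypothetical minimal message exposing type and payload."""

    def __init__(self, msg_type: str, msg_payload=None):
        self.type = msg_type
        self.payload = msg_payload


class UniqueMessageSketch:
    """Illustrative wrapper: delegates type and payload, adds id and sender_id."""

    def __init__(self, message, sender_id: str | None = None,
                 message_id: str | None = None):
        self._message = message
        self._sender_id = sender_id
        # Assumption: an identifier is generated when none is supplied.
        self._id = message_id if message_id is not None else str(uuid4())

    @property
    def id(self) -> str:
        return self._id

    @property
    def sender_id(self) -> str | None:
        return self._sender_id

    @property
    def type(self) -> str:
        return self._message.type

    @property
    def payload(self):
        return self._message.payload


wrapped = UniqueMessageSketch(SimpleMessage("greeting", {"text": "hi"}),
                              sender_id="listener-1")
other = UniqueMessageSketch(SimpleMessage("greeting"))
print(wrapped.type, wrapped.sender_id)
```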

class SignedMessage#

Bases: Message

Message plus sender id

__init__(sender_id, msg_type, msg_payload=None)#
Parameters:
  • sender_id (str)

  • msg_type (str)

  • msg_payload (dict | list | str | int | float | None)

property sender_id: str#

Returns the id of the listener that sent the message

Channels and connections#

class AbstractChannel#

Bases: ABC

Base class for channels.

Provides functionalities for listeners and message delivery management.

InMemoryConnectionsFactory is the default implementation used for the connections_factory parameter.

See SSEChannel.

Parameters:
  • stream_delay_seconds (int) – Wait time in seconds between message deliveries.

  • channel_id (str) – Optionally sets the channel id.

  • connections_factory (ConnectionsFactory) – Factory used to create connection instances on channel subscriptions.

__init__(stream_delay_seconds=0, channel_id=None, connections_factory=None)#
Parameters:
  • stream_delay_seconds (int)

  • channel_id (str | None)

  • connections_factory (ConnectionsFactory | None)

property id: str#

Unique identifier for this channel. It can be set with the channel_id constructor parameter.

abstract adapt(msg)#

Models the output of channel streams

Parameters:

msg (MessageContract)

Return type:

Any

async message_stream(listener)#

Entry point for message streaming

A message with type 'error' is yielded if the listener is invalid.

Parameters:

listener (MessageQueueListener)

Return type:

AsyncIterable[Any]
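The streaming behaviour described for message_stream() might look roughly like the async generator below. It is a self-contained sketch under assumptions, not the library's code: stream_sketch, collect, the dict-based messages and the adapt() output format are all hypothetical, and the polling loop is only one plausible way to combine a delivery delay with a running check.

```python
import asyncio
from collections import deque


def adapt(msg: dict) -> str:
    # Hypothetical output format; a concrete channel decides this in adapt().
    return f"{msg['type']}: {msg['payload']}"


async def stream_sketch(listener_id, queues, is_running, delay_seconds=0.0):
    """Yield adapted messages for one listener while is_running() is true."""
    if listener_id not in queues:
        # Invalid listener: emit a single 'error' message and stop.
        yield adapt({"type": "error", "payload": "invalid listener"})
        return
    pending = queues[listener_id]
    while is_running():
        if pending:
            yield adapt(pending.popleft())
        # Wait between deliveries (stream_delay_seconds in the reference above).
        await asyncio.sleep(delay_seconds)


async def collect(listener_id, queues, limit):
    """Consume the stream until `limit` events have been received."""
    received = []
    async for event in stream_sketch(listener_id, queues,
                                     lambda: len(received) < limit):
        received.append(event)
    return received


queues = {"l1": deque([{"type": "greeting", "payload": "hi"},
                       {"type": "greeting", "payload": "bye"}])}
delivered = asyncio.run(collect("l1", queues, limit=2))
rejected = asyncio.run(collect("unknown", queues, limit=1))
print(delivered, rejected)
```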

add_listener()#

Shortcut that creates a connection and returns the corresponding listener

Return type:

MessageQueueListener

register_listener(listener)#

Registers an existing listener

Parameters:

listener (MessageQueueListener)

register_connection(connection)#

Registers an existing connection.

Warning: The listener and queue should be instances of the same classes returned by the connections factory, to avoid compatibility issues with the persistence layer.

Parameters:

connection (Connection)

deliver_next(listener_id)#

Returns next message for given listener id.

Raises a NoMessagesException if queue is empty

Parameters:

listener_id (str)

Return type:

MessageContract

dispatch(listener_id, msg)#

Adds a message to listener’s queue

Parameters:
  • listener_id (str)

  • msg (MessageContract)

broadcast(msg)#

Enqueue a message to all listeners

Parameters:

msg (MessageContract)
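The difference between dispatch(), broadcast() and deliver_next() can be illustrated with a toy channel that keeps one FIFO queue per listener id. ChannelSketch is a hypothetical stand-in written for this example; it uses plain strings as messages and LookupError in place of NoMessagesException.

```python
from collections import deque


class ChannelSketch:
    """Illustrative only: one FIFO queue per listener id."""

    def __init__(self):
        self._queues = {}

    def add_listener(self, listener_id: str) -> None:
        self._queues[listener_id] = deque()

    def dispatch(self, listener_id: str, msg) -> None:
        # Enqueue for a single listener.
        self._queues[listener_id].append(msg)

    def broadcast(self, msg) -> None:
        # Enqueue the same message for every registered listener.
        for listener_id in self._queues:
            self.dispatch(listener_id, msg)

    def deliver_next(self, listener_id: str):
        queue = self._queues[listener_id]
        if not queue:
            # Stand-in for the NoMessagesException named above.
            raise LookupError("no messages")
        return queue.popleft()


channel = ChannelSketch()
channel.add_listener("a")
channel.add_listener("b")
channel.dispatch("a", "only-a")
channel.broadcast("everyone")

first_a = channel.deliver_next("a")
second_a = channel.deliver_next("a")
first_b = channel.deliver_next("b")
print(first_a, second_a, first_b)
```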

class Connection#

Bases: object

A connection is just a listener and its related message queue

__init__(listener, queue, connection_id=None)#
Parameters:
  • listener

  • queue

  • connection_id

class ConnectionsFactory#

Bases: ABC

abstract create(listener=None)#

Creates a connection

Parameters:

listener (MessageQueueListener) – If provided, assigns a concrete listener

Return type:

Connection

class InMemoryConnectionsFactory#

Bases: ConnectionsFactory

Creates connections with in-memory queues (no persistence support)

create(listener=None)#

Creates a connection

Parameters:

listener (MessageQueueListener) – If provided, assigns a concrete listener

Return type:

Connection
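A factory of this shape pairs a listener with a fresh queue. The sketch below shows the idea with hypothetical local classes (ConnectionSketch, FactorySketch); the placeholder default listener and the generated connection id are assumptions, not documented behaviour.

```python
from collections import deque
from dataclasses import dataclass, field
from uuid import uuid4


@dataclass
class ConnectionSketch:
    """Illustrative pairing of a listener with its own queue."""
    listener: object
    queue: deque = field(default_factory=deque)
    id: str = field(default_factory=lambda: str(uuid4()))


class FactorySketch:
    """Hypothetical factory following the documented create(listener=None) shape."""

    def create(self, listener=None) -> ConnectionSketch:
        if listener is None:
            # Assumption: a default listener is created when none is given.
            listener = object()
        return ConnectionSketch(listener=listener)


factory = FactorySketch()
my_listener = object()
explicit = factory.create(my_listener)
implicit = factory.create()
print(explicit.listener is my_listener, implicit.listener is not None)
```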

Queues#

class Queue#

Bases: ABC

Abstract base class for queues (FIFO).

abstract pop()#

Returns the next message from the queue.

Raises a NoMessagesException if the queue is empty.

Return type:

MessageContract

abstract push(message)#
Parameters:

message (MessageContract)

Return type:

None
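A custom queue only has to provide pop() and push(). The sketch below implements both on top of collections.deque. Queue and NoMessagesException are redefined locally as stand-ins so the example runs on its own; DequeQueue is a hypothetical name.

```python
from abc import ABC, abstractmethod
from collections import deque


class NoMessagesException(Exception):
    """Local stand-in for the exception named above."""


class Queue(ABC):
    """Local stand-in for the abstract FIFO queue documented above."""

    @abstractmethod
    def pop(self):
        """Return the next message or raise NoMessagesException."""

    @abstractmethod
    def push(self, message) -> None:
        """Append a message to the queue."""


class DequeQueue(Queue):
    """Hypothetical in-memory FIFO queue backed by collections.deque."""

    def __init__(self):
        self._items = deque()

    def pop(self):
        try:
            return self._items.popleft()
        except IndexError:
            # Translate the deque error into the documented exception.
            raise NoMessagesException from None

    def push(self, message) -> None:
        self._items.append(message)


q = DequeQueue()
q.push("first")
q.push("second")
popped = [q.pop(), q.pop()]
print(popped)
```

A persistent implementation would keep the same two methods and replace the deque with its own storage.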

Listeners#

class MessageQueueListener#

Bases: object

Base class for listeners.

Optionally, you can override the on_message method if you need to inject code at message delivery time.

__init__(listener_id=None)#
Parameters:

listener_id (str | None)

on_message(msg)#

Event handler. It executes when a message is delivered to the client.

Parameters:

msg (MessageContract)

Return type:

None

start()#
Return type:

None

stop()#
Return type:

None

is_running()#
Return type:

bool
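Overriding on_message() is the hook for validation or other code that should run at delivery time. The sketch below uses a local stand-in listener (ListenerSketch) with the methods listed above; ValidatingListener, its ALLOWED_TYPES set and the use of SimpleNamespace as a message are assumptions made for the example.

```python
from types import SimpleNamespace


class ListenerSketch:
    """Local stand-in exposing the documented listener methods."""

    def __init__(self, listener_id=None):
        self.id = listener_id
        self._running = False

    def on_message(self, msg) -> None:
        """Default handler does nothing."""

    def start(self) -> None:
        self._running = True

    def stop(self) -> None:
        self._running = False

    def is_running(self) -> bool:
        return self._running


class ValidatingListener(ListenerSketch):
    """Hypothetical listener that rejects unexpected message types."""

    ALLOWED_TYPES = {"greeting", "farewell"}

    def __init__(self, listener_id=None):
        super().__init__(listener_id)
        self.seen = []

    def on_message(self, msg) -> None:
        if msg.type not in self.ALLOWED_TYPES:
            raise ValueError(f"unexpected message type: {msg.type}")
        self.seen.append(msg.type)


listener = ValidatingListener("l1")
listener.start()
listener.on_message(SimpleNamespace(type="greeting", payload="hi"))
running_before_stop = listener.is_running()
listener.stop()
print(listener.seen, running_before_stop, listener.is_running())
```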