Prefabs#

Prefab channels and listeners#

class SSEChannel#

Bases: AbstractChannel

SSE streaming channel. See Mozilla docs

The ‘id’ field is not currently supported.

__init__(stream_delay_seconds=0, retry_timeout_milliseconds=5, channel_id=None, connections_factory=None)#
Parameters:
  • stream_delay_seconds (int)

  • retry_timeout_milliseconds (int)

  • channel_id (str | None)

  • connections_factory (ConnectionsFactory | None)

adapt(msg)#

SSE adapter.

Returns:

{
    "event": "message type",
    "retry": "channel time out",
    "data": "original payload"
}
Parameters:

msg (MessageContract)

Return type:

dict
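For orientation, a dictionary of this shape maps onto the Server-Sent Events wire format roughly as sketched below. This is a standalone illustration: format_sse is a hypothetical helper, not part of the library, and the actual serialization is normally left to whatever framework sends the HTTP response.

```python
def format_sse(event: dict) -> str:
    """Serialize a dict with "event", "retry" and "data" keys into SSE text."""
    lines = []
    if "event" in event:
        lines.append(f"event: {event['event']}")
    if "retry" in event:
        # The SSE retry field is an integer number of milliseconds.
        lines.append(f"retry: {int(event['retry'])}")
    # A multi-line payload needs one "data:" field per line.
    for chunk in str(event.get("data", "")).split("\n"):
        lines.append(f"data: {chunk}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"


adapted = {"event": "greeting", "retry": 5, "data": "hello\nworld"}
wire = format_sse(adapted)
```

Here wire holds one complete event that a browser EventSource could parse.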

class DataProcessingChannel#

Bases: AbstractChannel

Channel intended for concurrent processing of data.

Relies on concurrent.futures.Executor. Override the adapt method to control the output returned to clients.

The MESSAGE_TYPE_CLOSED type marks the end of the stream and should be treated as a reserved message type.

__init__(max_workers, stream_delay_seconds=0, executor_class=<class 'concurrent.futures.thread.ThreadPoolExecutor'>)#
Parameters:
  • max_workers (int) – Number of workers to use

  • stream_delay_seconds (int) – Can be used to limit response rate of streaming. Only applies to message_stream calls.

  • executor_class (type) – The constructor of some Executor class. Defaults to ThreadPoolExecutor.

async process_queue(listener)#

Processes the queue of the given listener and returns an AsyncIterable of dictionaries, each containing the result of processing one message. See the adapt method.

Parameters:

listener (MessageQueueListener)

Return type:

AsyncIterable[dict]

adapt(msg)#

Returns a dictionary in the following format:

{
    "event": "message type",
    "data": "message payload"
}
Parameters:

msg (MessageContract)

Return type:

dict
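The general pattern behind process_queue and adapt (work submitted to an executor, results surfaced as an async stream of dictionaries) can be sketched with the standard library alone. Everything below is a stand-in: process_all, the "result" and "closed" type names, and the squaring workload are assumptions made for illustration, not the library's implementation.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import AsyncIterator, Callable, Iterable


def adapt(msg_type: str, payload) -> dict:
    # Same shape as the dictionary documented for adapt().
    return {"event": msg_type, "data": payload}


async def process_all(
    items: Iterable[int], work: Callable[[int], int], max_workers: int
) -> AsyncIterator[dict]:
    """Run `work` over `items` in a thread pool, yielding results as they finish."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [loop.run_in_executor(pool, work, item) for item in items]
        for done in asyncio.as_completed(futures):
            yield adapt("result", await done)
    # Hypothetical end-of-stream marker, standing in for MESSAGE_TYPE_CLOSED.
    yield adapt("closed", None)


async def collect() -> list:
    return [event async for event in process_all(range(4), lambda n: n * n, 2)]


events = asyncio.run(collect())
```

Results arrive in completion order, not submission order, so consumers should not rely on ordering.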

class SimpleDistributedApplicationListener#

Bases: MessageQueueListener

Listener for distributed applications

__init__()#
set_action(name, action)#

Hooks a callable to a string key.

When the listener processes a message, the callable is selected according to the message’s type.

Each callable should return a list of MessageContract instances that make up the response to the requested action.

Reserved actions are ‘start’ and ‘stop’. Receiving a message with one of these types fires the corresponding action.

Parameters:
dispatch_to(receiver, msg)#
Parameters:
on_message(msg)#

Executes the action corresponding to the message’s type.

Parameters:

msg (SignedMessage)

Return type:

None
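The type-to-callable dispatch described for set_action and on_message can be illustrated with a small stand-in. Message and ActionListener are hypothetical classes written for this sketch. Unlike the documented on_message, which returns None, the stand-in returns the responses so the effect is visible.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class Message:
    # Hypothetical stand-in for a message with a type and a payload.
    type: str
    payload: Any = None


class ActionListener:
    """Stand-in that maps message types to callables."""

    def __init__(self) -> None:
        self._actions: Dict[str, Callable[[Message], List[Message]]] = {}

    def set_action(self, name: str, action: Callable[[Message], List[Message]]) -> None:
        self._actions[name] = action

    def on_message(self, msg: Message) -> List[Message]:
        # Select the callable by message type; unknown types yield no response here.
        action = self._actions.get(msg.type)
        return action(msg) if action else []


listener = ActionListener()
listener.set_action("ping", lambda msg: [Message("pong", msg.payload)])
responses = listener.on_message(Message("ping", 42))
```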

class SimpleDistributedApplicationChannel#

Bases: SSEChannel

register_listener(listener)#

Registers an existing listener

Parameters:

listener (SimpleDistributedApplicationListener)

class SSEChannelRepository#

Bases: AbstractChannelRepository

create(channel_data)#

Creates a new channel and configures it depending on channel_data.

Parameters:

channel_data (dict)

Return type:

SSEChannel

Prefab servers and clients#

class ChannelContainer#

Helper class for use cases that involve managing multiple channels.

__init__()#
register(channel)#
Parameters:

channel (AbstractChannel)

Return type:

None

register_iterable(channels)#
Parameters:

channels (Iterable[AbstractChannel])

Return type:

None

get(channel_id)#
Parameters:

channel_id (str)

Return type:

AbstractChannel

rm(channel_id)#
Parameters:

channel_id (str)

get_all_ids()#
Return type:

Iterable[str]
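The method set above suggests a registry keyed by channel id. A minimal sketch of that idea follows. FakeChannel and Registry are hypothetical, and both the id attribute and the KeyError raised for an unknown id are assumptions, since the reference does not state how missing ids are handled.

```python
from typing import Dict, Iterable, List


class FakeChannel:
    """Hypothetical stand-in for a channel; only its id matters here."""

    def __init__(self, channel_id: str) -> None:
        self.id = channel_id


class Registry:
    """Stand-in container keyed by channel id."""

    def __init__(self) -> None:
        self._channels: Dict[str, FakeChannel] = {}

    def register(self, channel: FakeChannel) -> None:
        self._channels[channel.id] = channel

    def register_iterable(self, channels: Iterable[FakeChannel]) -> None:
        for channel in channels:
            self.register(channel)

    def get(self, channel_id: str) -> FakeChannel:
        # Assumption: an unknown id raises KeyError.
        return self._channels[channel_id]

    def rm(self, channel_id: str) -> None:
        del self._channels[channel_id]

    def get_all_ids(self) -> List[str]:
        return list(self._channels)


registry = Registry()
registry.register_iterable([FakeChannel("a"), FakeChannel("b")])
registry.rm("a")
remaining = registry.get_all_ids()
```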

class SocketServer#

An implementation of a socket server that acts as a controller for interacting with the library.

Accepted format: a plain JSON with the following keys:

{
    "c": "channel id",
    "v": "verb",
    "t": "message type",
    "p": "message payload",
    "r": "receiver (listener id when verb is 'rl')"
}

Each possible value of verb identifies a supported action:

"d" dispatch
"b" broadcast
"c" create channel
"r" add listener
"l" listen (opens a stream)
"w" watch (opens a stream)
"rl" remove a listener
"rc" remove a channel

See examples
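As a sketch of the request format above, the following helper builds the JSON document for a given verb. build_request is a hypothetical function, not part of the library. It sends all five keys and leaves unused ones as null, which is an assumption because the reference does not say which keys each verb requires.

```python
import json
from typing import Any, Optional

# Verbs listed in the reference above.
VERBS = {"d", "b", "c", "r", "l", "w", "rl", "rc"}


def build_request(
    verb: str,
    channel_id: Optional[str] = None,
    message_type: Optional[str] = None,
    payload: Any = None,
    receiver: Optional[str] = None,
) -> str:
    """Return the JSON text for one request, rejecting unknown verbs."""
    if verb not in VERBS:
        raise ValueError(f"unsupported verb: {verb!r}")
    request = {
        "c": channel_id,
        "v": verb,
        "t": message_type,
        "p": payload,
        "r": receiver,
    }
    return json.dumps(request)


# Broadcast a message of type "news" on a channel with a made-up id.
broadcast = build_request("b", channel_id="chan-1", message_type="news", payload="hi")
```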

__init__(file_descriptor_path)#
Parameters:

file_descriptor_path (str) – See start method

static start(file_descriptor_path)#

Shortcut to start a server given a file descriptor path

Parameters:

file_descriptor_path (str) – file descriptor path, all understood by Path is fine

async shutdown()#

Graceful Shutdown

async main()#

Starts the server

class SocketClient#

A little facade to interact with SocketServer

__init__(file_descriptor_path)#
Parameters:

file_descriptor_path (str)

async send_payload(payload)#

Sends an arbitrary payload to the socket.

See SocketServer for the accepted format.

Parameters:

payload (dict)
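Sending a JSON payload over a Unix domain socket can be sketched with asyncio streams. send_json is a hypothetical helper and not the library's client. The framing (write the document, half-close the connection, then read until the peer closes) is an assumption, since the reference does not describe how messages are delimited.

```python
import asyncio
import json


async def send_json(socket_path: str, payload: dict) -> str:
    """Write one JSON document to a Unix socket and return whatever comes back."""
    reader, writer = await asyncio.open_unix_connection(socket_path)
    writer.write(json.dumps(payload).encode("utf-8"))
    await writer.drain()
    # Assumption: half-closing the connection marks the end of the request.
    writer.write_eof()
    # Read until the peer closes its side.
    response = await reader.read()
    writer.close()
    await writer.wait_closed()
    return response.decode("utf-8")
```

For streaming verbs such as "l" or "w", reading line by line in a loop would be more appropriate than a single read to end of stream.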

async create_channel()#
Return type:

str

async register(channel_id)#
Parameters:

channel_id (str)

async stream(channel_id, listener_id)#
Return type:

AsyncIterable[str]

async broadcast_message(channel_id, message_type, payload)#
Parameters:
  • channel_id (str)

  • message_type (str)

  • payload (str | dict | int | float)

async dispatch(channel_id, receiver_id, message_type, payload)#
Parameters:
  • channel_id (str)

  • receiver_id (str)

  • message_type (str)

  • payload (str | dict | int | float)

async remove_listener(channel_id, listener_id)#
Parameters:
  • channel_id (str)

  • listener_id (str)

async remove_channel(channel_id)#
Parameters:

channel_id (str)