Persistence#

_images/persistence_layer_class_diagram.png

Base classes for channels and connections repositories. The main idea is that a repository relies on some KV storage engine abstraction, and uses it for storing its own configuration, and its correspondant composites repositories to build final objects to return.

To build a persistence layer is needed to provide an implementation to some of above abstractions, and use them to build a custom ChannelRepository

Here is a possible Redis implementation

Suggestions

If your implementation uses objects that are directly persistable by storage engine you are using, cleanest manner of implementing your custom layer is to simply implement a KvStorage and delegate to it all repositories ones. Otherwise you should implement a dump strategy inside repositories themselves. An example is RedisQueues persistence, as Redis client used is not serializable by pickle.

Base repositories#

class KvStorage#

Bases: ABC

Represents a Key Value storage engine. Provides functionalities do load, persist and find by key prefix

abstract fetch_by_prefix(prefix)#
Parameters:

prefix (str)

Return type:

Iterable[Any]

abstract fetch_all()#
Return type:

Iterable[Any]

abstract upsert(key, value)#
Parameters:
  • key (str)

  • value (Any)

abstract fetch_one(key)#
Parameters:

key (str)

Return type:

Any

abstract delete(key)#
Parameters:

key (str)

class InMemoryStorage#

Bases: KvStorage

In memory implementation

__init__(items=None)#
Parameters:

items (dict[str, Any] | None)

fetch_by_prefix(prefix)#
Parameters:

prefix (str)

Return type:

Iterable[Any]

fetch_all()#
Return type:

Iterable[Any]

upsert(key, value)#
Parameters:
  • key (str)

  • value (Any)

fetch_one(key)#
Parameters:

key (str)

Return type:

Any

delete(key)#
Parameters:

key (str)

class AbstractChannelRepository#

Bases: ChannelRepositoryInterface, ABC

Abstract base class for channel repositories.

Builds channels before return them using injected repositories

__init__(storage, connections_repository, connections_factory)#
Parameters:
property connections_factory: ConnectionsFactory#

The connections factory that will be injected into concrete channel instances.

property connections_repository: ConnectionRepositoryInterface#

Repository to be used to persist connections.

load_all()#

Loads all channels

Return type:

Iterable[AbstractChannel]

load_one(channel_id)#

Loads a channel given its it

Parameters:

channel_id (str)

Return type:

AbstractChannel

persist(channel)#

Persists a channel

Parameters:

channel (AbstractChannel)

delete(channel_id)#

Deletes a channel given its it

Parameters:

channel_id (str)

class ConnectionRepository#

Bases: ConnectionRepositoryInterface

Concrete Connection Repository

Relies on KvStorage abstraction for final writes of connections data, and on correspondant repositories for related objects ones.

__init__(storage, listeners_repository, queues_repository)#
Parameters:
property queues_repository: QueueRepositoryInterface#
property listeners_repository: ListenerRepositoryInterface#
load_all(channel_id)#

Loads all connections managed by a given channel

Parameters:

channel_id (str)

Return type:

Iterable[Connection]

load_one(channel_id, connection_id)#

Loads a connection given the connection and channel id it belongs to.

Parameters:
  • channel_id (str)

  • connection_id (str)

Return type:

Connection

persist(channel_id, connection)#

Persists a connection and assign it to a channel.

Parameters:
delete(channel_id, connection_id)#

Deletes a connection given the connection and channel id it belongs to.

Parameters:
  • channel_id (str)

  • connection_id (str)

Interoperability#

class QueueRepositoryInterface#

Bases: ABC

abstract load(connection_id)#

Loads a queue given the connection id it belongs to.

Parameters:

connection_id (str)

Return type:

Queue

abstract persist(connection_id, queue)#

Persists queue and assign to connection.

Parameters:
  • connection_id (str)

  • queue (Queue)

abstract delete(connection_id)#

Deletes a queue given the connection id it belongs to.

Parameters:

connection_id (str)

class ListenerRepositoryInterface#

Bases: ABC

abstract load(connection_id)#

Loads a listener given the connection id it belongs to.

Parameters:

connection_id (str)

Return type:

MessageQueueListener

abstract persist(connection_id, listener)#

Persists listener and assign to connection.

Parameters:
abstract delete(connection_id)#

Deleted a listener given the connection id it belongs to.

Parameters:

connection_id (str)

class ConnectionRepositoryInterface#

Bases: ABC

abstract property queues_repository: QueueRepositoryInterface#
abstract property listeners_repository: ListenerRepositoryInterface#
abstract load_all(channel_id)#

Loads all connections managed by a given channel

Parameters:

channel_id (str)

Return type:

Iterable[Connection]

abstract load_one(channel_id, connection_id)#

Loads a connection given the connection and channel id it belongs to.

Parameters:
  • channel_id (str)

  • connection_id (str)

Return type:

Connection

abstract persist(channel_id, connection)#

Persists a connection and assign it to a channel.

Parameters:
abstract delete(channel_id, connection_id)#

Deletes a connection given the connection and channel id it belongs to.

Parameters:
  • channel_id (str)

  • connection_id (str)

class ChannelRepositoryInterface#

Bases: ABC

abstract property connections_factory: ConnectionsFactory#

The connections factory that will be injected into concrete channel instances.

abstract property connections_repository: ConnectionRepositoryInterface#

Repository to be used to persist connections.

abstract load_all()#

Loads all channels

Return type:

Iterable[AbstractChannel]

abstract load_one(channel_id)#

Loads a channel given its it

Parameters:

channel_id (str)

Return type:

AbstractChannel

abstract persist(channel)#

Persists a channel

Parameters:

channel (AbstractChannel)

abstract delete(channel_id)#

Deletes a channel given its it

Parameters:

channel_id (str)

abstract create(channel_data)#

Creates a new channel and configures it depending on channel_data.

Parameters:

channel_data (dict)

Return type:

AbstractChannel