Persistence#

[Figure: persistence layer class diagram]

Base classes for channel and connection repositories. The main idea is that a repository relies on a key-value (KV) storage engine abstraction to store its own configuration, and on its corresponding composite repositories to build the final objects it returns.

To build a persistence layer, provide an implementation of some of the abstractions above and use them to build a custom ChannelRepository.

Here is a possible Redis implementation

Suggestions

If your implementation uses objects that are directly persistable by the storage engine you are using, the cleanest way to implement your custom layer is to provide a KvStorage implementation and use it in all composite repositories. Otherwise, you should implement a dump strategy inside the repositories themselves. An example is RedisQueues persistence, since the Redis client it uses cannot be serialized by pickle.
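The second case (a dump strategy inside the repository) can be illustrated with a small, self-contained sketch. `FakeClientQueue`, `QueueDumpRepository` and `is_picklable` are hypothetical names, not part of the library, and the lock only stands in for a live client object that pickle refuses to serialize. The repository persists just the plain settings needed to rebuild the object later.

```python
import pickle
import threading


class FakeClientQueue:
    """Hypothetical queue holding a resource that pickle cannot serialize."""

    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self._lock = threading.Lock()  # stands in for a live client connection


class QueueDumpRepository:
    """Hypothetical repository that stores plain settings instead of live objects."""

    def __init__(self):
        self._storage: dict[str, bytes] = {}

    def persist(self, connection_id: str, queue: FakeClientQueue) -> None:
        # Dump only what is needed to rebuild the queue later.
        dumped = {"host": queue.host, "port": queue.port}
        self._storage[connection_id] = pickle.dumps(dumped)

    def load(self, connection_id: str) -> FakeClientQueue:
        # Rebuild a fresh object (with a fresh lock) from the dumped settings.
        dumped = pickle.loads(self._storage[connection_id])
        return FakeClientQueue(**dumped)


def is_picklable(obj) -> bool:
    """Return True if pickle can serialize obj as-is."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


queue = FakeClientQueue("localhost", 6379)
repository = QueueDumpRepository()
repository.persist("conn-1", queue)
restored = repository.load("conn-1")
```

The live object itself never reaches the storage engine, so the same idea applies whatever serializer the engine uses.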

Base repositories#

class KvStorage#

Bases: ABC

Represents a key-value storage engine. Provides functionality to load, persist, and find items by key prefix.

abstract fetch_by_prefix(prefix)#

Search by KV prefix

Parameters:

prefix (str)

Return type:

Iterable[Any]

abstract fetch_all()#

Return all items that have been persisted

Return type:

Iterable[Any]

abstract upsert(key, value)#

Updates or inserts a value given its corresponding key

Parameters:
  • key (str)

  • value (Any)

abstract fetch_one(key)#

Returns the value corresponding to the given key

Parameters:

key (str)

Return type:

Any

abstract delete(key)#

Idempotent deletion. Does not raise an error on an invalid key

Parameters:

key (str)
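As an illustration of what a custom storage engine might look like, here is a minimal sketch backed by SQLite from the standard library. The `KvStorage` class below is a local stand-in that only mirrors the abstract methods documented above (in real code you would subclass the library's own class), and `SqliteKvStorage`, the table layout, the use of pickle, and raising `KeyError` on a missing key are all assumptions made for the example.

```python
import pickle
import sqlite3
from abc import ABC, abstractmethod
from typing import Any, Iterable


class KvStorage(ABC):
    """Local stand-in mirroring the abstract methods documented above."""

    @abstractmethod
    def fetch_by_prefix(self, prefix: str) -> Iterable[Any]: ...

    @abstractmethod
    def fetch_all(self) -> Iterable[Any]: ...

    @abstractmethod
    def upsert(self, key: str, value: Any) -> None: ...

    @abstractmethod
    def fetch_one(self, key: str) -> Any: ...

    @abstractmethod
    def delete(self, key: str) -> None: ...


class SqliteKvStorage(KvStorage):
    """Hypothetical engine: one table, values serialized with pickle."""

    def __init__(self, path: str = ":memory:"):
        self._db = sqlite3.connect(path)
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS kv (key TEXT PRIMARY KEY, value BLOB NOT NULL)"
        )

    def fetch_by_prefix(self, prefix: str) -> Iterable[Any]:
        # Comparing a substring avoids escaping LIKE wildcards in the prefix.
        rows = self._db.execute(
            "SELECT value FROM kv WHERE substr(key, 1, ?) = ? ORDER BY key",
            (len(prefix), prefix),
        )
        return [pickle.loads(value) for (value,) in rows]

    def fetch_all(self) -> Iterable[Any]:
        rows = self._db.execute("SELECT value FROM kv ORDER BY key")
        return [pickle.loads(value) for (value,) in rows]

    def upsert(self, key: str, value: Any) -> None:
        self._db.execute(
            "INSERT OR REPLACE INTO kv (key, value) VALUES (?, ?)",
            (key, pickle.dumps(value)),
        )
        self._db.commit()

    def fetch_one(self, key: str) -> Any:
        row = self._db.execute("SELECT value FROM kv WHERE key = ?", (key,)).fetchone()
        if row is None:
            raise KeyError(key)  # assumption: behavior for a missing key is not documented
        return pickle.loads(row[0])

    def delete(self, key: str) -> None:
        # DELETE on a missing key affects no rows, so deletion stays idempotent.
        self._db.execute("DELETE FROM kv WHERE key = ?", (key,))
        self._db.commit()


storage = SqliteKvStorage()
storage.upsert("ch:1", {"n": 1})
storage.upsert("ch:2", {"n": 2})
storage.upsert("other", 3)
storage.upsert("other", 4)   # same key: the value is replaced
storage.delete("missing")    # unknown key: no error
by_prefix = storage.fetch_by_prefix("ch:")
```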

class InMemoryStorage#

Bases: KvStorage

In memory implementation

__init__(items=None)#
Parameters:

items (dict[str, Any] | None)

fetch_by_prefix(prefix)#

Search by KV prefix

Parameters:

prefix (str)

Return type:

Iterable[Any]

fetch_all()#

Return all items that have been persisted

Return type:

Iterable[Any]

upsert(key, value)#

Updates or inserts a value given its corresponding key

Parameters:
  • key (str)

  • value (Any)

fetch_one(key)#

Returns the value corresponding to the given key

Parameters:

key (str)

Return type:

Any

delete(key)#

Idempotent deletion. Does not raise an error on an invalid key

Parameters:

key (str)

class AbstractChannelRepository#

Bases: ChannelRepositoryInterface, ABC

Abstract base class for channel repositories.

Builds channels using the injected repositories before returning them

__init__(storage, connections_repository, connections_factory)#
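Parameters:
  • storage (KvStorage)

  • connections_repository (ConnectionRepositoryInterface)

  • connections_factory (ConnectionsFactory)
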
property connections_factory: ConnectionsFactory#

The connections factory that will be injected into concrete channel instances.

property connections_repository: ConnectionRepositoryInterface#

Repository to be used to persist connections.

abstract static _channel_to_dict(channel)#

Returns a dictionary representation of the channel to be passed to eric_sse.interfaces.ChannelRepositoryInterface.create() calls.

Parameters:

channel (AbstractChannel)

Return type:

dict

load_all()#

Loads all channels

Return type:

Iterable[AbstractChannel]

load_one(channel_id)#

Loads a channel given its id

Parameters:

channel_id (str)

Return type:

AbstractChannel

persist(channel)#

Persists a channel

Parameters:

channel (AbstractChannel)

delete(channel_id)#

Deletes a channel given its id

Parameters:

channel_id (str)
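The relationship between `_channel_to_dict` and `create` can be sketched as a round trip: the dictionary produced when persisting is the same one handed to `create` when loading. The classes below (`DemoChannel`, `DemoChannelRepository`) are hypothetical stand-ins rather than library classes, a plain dict plays the role of the storage engine, and connections are left out to keep the example short.

```python
class DemoChannel:
    """Hypothetical channel with two plain settings."""

    def __init__(self, channel_id: str, stream_delay_seconds: int = 0):
        self.id = channel_id
        self.stream_delay_seconds = stream_delay_seconds


class DemoChannelRepository:
    """Hypothetical repository showing the dict round trip only."""

    def __init__(self, storage: dict):
        self._storage = storage  # a plain dict stands in for a KV storage engine

    @staticmethod
    def _channel_to_dict(channel: DemoChannel) -> dict:
        # Keys match the keyword arguments that create() forwards to DemoChannel.
        return {
            "channel_id": channel.id,
            "stream_delay_seconds": channel.stream_delay_seconds,
        }

    def create(self, channel_data: dict) -> DemoChannel:
        return DemoChannel(**channel_data)

    def persist(self, channel: DemoChannel) -> None:
        self._storage[channel.id] = self._channel_to_dict(channel)

    def load_one(self, channel_id: str) -> DemoChannel:
        return self.create(self._storage[channel_id])

    def load_all(self) -> list:
        return [self.create(data) for data in self._storage.values()]

    def delete(self, channel_id: str) -> None:
        self._storage.pop(channel_id, None)  # no error on an unknown id


repository = DemoChannelRepository(storage={})
repository.persist(DemoChannel("news", stream_delay_seconds=2))
loaded = repository.load_one("news")
all_channels = repository.load_all()
repository.delete("news")
repository.delete("news")  # second call is a no-op
after_delete = repository.load_all()
```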

class ConnectionRepository#

Bases: ConnectionRepositoryInterface

Concrete Connection Repository

Relies on the KvStorage abstraction for the final writes of connection data, and on the corresponding repositories for writes of related objects.

__init__(storage, listeners_repository, queues_repository)#
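Parameters:
  • storage (KvStorage)

  • listeners_repository (ListenerRepositoryInterface)

  • queues_repository (QueueRepositoryInterface)
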
load_all(channel_id)#

Loads all connections managed by a given channel

Parameters:

channel_id (str)

Return type:

Iterable[Connection]

load_one(channel_id, connection_id)#

Loads a connection given the connection and channel id it belongs to.

Parameters:
  • channel_id (str)

  • connection_id (str)

Return type:

Connection

persist(channel_id, connection)#

Persists a connection and assigns it to a channel.

Parameters:
  • channel_id (str)

  • connection (Connection)

delete(channel_id, connection_id)#

Deletes a connection given the connection and channel id it belongs to.

Parameters:
  • channel_id (str)

  • connection_id (str)
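The composite idea described for this class can be sketched as follows: the repository writes a small record of its own and delegates the listener and the queue to the injected repositories. Everything here is a hypothetical stand-in (`DemoConnection`, `DictRepository`, `DemoConnectionRepository`), and the `channel_id:connection_id` key layout is an assumption chosen so that all connections of a channel can be found by prefix; the library's actual layout may differ.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class DemoConnection:
    """Hypothetical connection: an id plus its listener and queue."""
    id: str
    listener: Any
    queue: Any


class DictRepository:
    """Hypothetical repository keyed by connection id (used for listeners and queues)."""

    def __init__(self):
        self._items = {}

    def load(self, connection_id: str) -> Any:
        return self._items[connection_id]

    def persist(self, connection_id: str, item: Any) -> None:
        self._items[connection_id] = item

    def delete(self, connection_id: str) -> None:
        self._items.pop(connection_id, None)


class DemoConnectionRepository:
    def __init__(self, storage: dict, listeners_repository, queues_repository):
        self._storage = storage  # a plain dict stands in for a KV storage engine
        self._listeners = listeners_repository
        self._queues = queues_repository

    @staticmethod
    def _key(channel_id: str, connection_id: str) -> str:
        return f"{channel_id}:{connection_id}"  # assumed key layout

    def persist(self, channel_id: str, connection: DemoConnection) -> None:
        # Own record first, then the related objects through their repositories.
        self._storage[self._key(channel_id, connection.id)] = {"connection_id": connection.id}
        self._listeners.persist(connection.id, connection.listener)
        self._queues.persist(connection.id, connection.queue)

    def load_one(self, channel_id: str, connection_id: str) -> DemoConnection:
        record = self._storage[self._key(channel_id, connection_id)]
        cid = record["connection_id"]
        return DemoConnection(cid, self._listeners.load(cid), self._queues.load(cid))

    def load_all(self, channel_id: str) -> list:
        prefix = f"{channel_id}:"
        ids = [r["connection_id"] for k, r in self._storage.items() if k.startswith(prefix)]
        return [self.load_one(channel_id, cid) for cid in ids]

    def delete(self, channel_id: str, connection_id: str) -> None:
        self._storage.pop(self._key(channel_id, connection_id), None)
        self._listeners.delete(connection_id)
        self._queues.delete(connection_id)


repository = DemoConnectionRepository({}, DictRepository(), DictRepository())
repository.persist("news", DemoConnection("c1", listener="listener-1", queue=["hello"]))
repository.persist("news", DemoConnection("c2", listener="listener-2", queue=[]))
repository.persist("sports", DemoConnection("c3", listener="listener-3", queue=[]))
news_connections = repository.load_all("news")
```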

Interoperability#

class QueueRepositoryInterface#

Bases: ABC

abstract load(connection_id)#

Loads a queue given the connection id it belongs to.

Parameters:

connection_id (str)

Return type:

Queue

abstract persist(connection_id, queue)#

Persists a queue and assigns it to a connection.

Parameters:
  • connection_id (str)

  • queue (Queue)

abstract delete(connection_id)#

Deletes a queue given the connection id it belongs to.

Parameters:

connection_id (str)

class ListenerRepositoryInterface#

Bases: ABC

abstract load(connection_id)#

Loads a listener given the connection id it belongs to.

Parameters:

connection_id (str)

Return type:

MessageQueueListener

abstract persist(connection_id, listener)#

Persists a listener and assigns it to a connection.

Parameters:
  • connection_id (str)

  • listener (MessageQueueListener)

abstract delete(connection_id)#

Deletes a listener given the connection id it belongs to.

Parameters:

connection_id (str)

class ConnectionRepositoryInterface#

Bases: ABC

abstract property queues_repository: QueueRepositoryInterface#

abstract property listeners_repository: ListenerRepositoryInterface#

abstract load_all(channel_id)#

Loads all connections managed by a given channel

Parameters:

channel_id (str)

Return type:

Iterable[Connection]

abstract load_one(channel_id, connection_id)#

Loads a connection given the connection and channel id it belongs to.

Parameters:
  • channel_id (str)

  • connection_id (str)

Return type:

Connection

abstract persist(channel_id, connection)#

Persists a connection and assigns it to a channel.

Parameters:
  • channel_id (str)

  • connection (Connection)

abstract delete(channel_id, connection_id)#

Deletes a connection given the connection and channel id it belongs to.

Parameters:
  • channel_id (str)

  • connection_id (str)

class ChannelRepositoryInterface#

Bases: ABC

abstract property connections_factory: ConnectionsFactory#

The connections factory that will be injected into concrete channel instances.

abstract property connections_repository: ConnectionRepositoryInterface#

Repository to be used to persist connections.

abstract load_all()#

Loads all channels

Return type:

Iterable[AbstractChannel]

abstract load_one(channel_id)#

Loads a channel given its id

Parameters:

channel_id (str)

Return type:

AbstractChannel

abstract persist(channel)#

Persists a channel

Parameters:

channel (AbstractChannel)

abstract delete(channel_id)#

Deletes a channel given its id

Parameters:

channel_id (str)

abstract create(channel_data)#

Creates a new channel and configures it depending on channel_data.

Parameters:

channel_data (dict)

Return type:

AbstractChannel