Persistence#

Base classes for channels and connections repositories. The main idea is that a repository relies on some KV storage engine abstraction, and uses it for storing its own configuration, and its correspondant composites repositories to build final objects to return.
To build a persistence layer is needed to provide an implementation to some of above abstractions, and use them to build a custom ChannelRepository
Here is a possible Redis implementation
Suggestions
If your implementation uses objects that are directly persistable by storage engine you are using, cleanest manner of implementing your custom layer is to simply
implement a KvStorage
and delegate to it all repositories ones.
Otherwise you should implement a dump strategy inside repositories themselves. An example is RedisQueues persistence, as Redis client used is not serializable by pickle.
Base repositories#
- class KvStorage#
Bases:
ABC
Represents a Key Value storage engine. Provides functionalities do load, persist and find by key prefix
- abstract fetch_by_prefix(prefix)#
- Parameters:
prefix (str)
- Return type:
Iterable[Any]
- abstract fetch_all()#
- Return type:
Iterable[Any]
- abstract upsert(key, value)#
- Parameters:
key (str)
value (Any)
- abstract fetch_one(key)#
- Parameters:
key (str)
- Return type:
Any
- abstract delete(key)#
- Parameters:
key (str)
- class InMemoryStorage#
Bases:
KvStorage
In memory implementation
- __init__(items=None)#
- Parameters:
items (dict[str, Any] | None)
- fetch_by_prefix(prefix)#
- Parameters:
prefix (str)
- Return type:
Iterable[Any]
- fetch_all()#
- Return type:
Iterable[Any]
- upsert(key, value)#
- Parameters:
key (str)
value (Any)
- fetch_one(key)#
- Parameters:
key (str)
- Return type:
Any
- delete(key)#
- Parameters:
key (str)
- class AbstractChannelRepository#
Bases:
ChannelRepositoryInterface
,ABC
Abstract base class for channel repositories.
Builds channels before return them using injected repositories
- __init__(storage, connections_repository, connections_factory)#
- Parameters:
storage (KvStorage)
connections_repository (ConnectionRepositoryInterface)
connections_factory (ConnectionsFactory)
- property connections_factory: ConnectionsFactory#
The connections factory that will be injected into concrete channel instances.
- property connections_repository: ConnectionRepositoryInterface#
Repository to be used to persist connections.
- load_all()#
Loads all channels
- Return type:
Iterable[AbstractChannel]
- load_one(channel_id)#
Loads a channel given its it
- Parameters:
channel_id (str)
- Return type:
- persist(channel)#
Persists a channel
- Parameters:
channel (AbstractChannel)
- delete(channel_id)#
Deletes a channel given its it
- Parameters:
channel_id (str)
- class ConnectionRepository#
Bases:
ConnectionRepositoryInterface
Concrete Connection Repository
Relies on
KvStorage
abstraction for final writes of connections data, and on correspondant repositories for related objects ones.- __init__(storage, listeners_repository, queues_repository)#
- Parameters:
storage (KvStorage)
listeners_repository (ListenerRepositoryInterface)
queues_repository (QueueRepositoryInterface)
- property queues_repository: QueueRepositoryInterface#
- property listeners_repository: ListenerRepositoryInterface#
- load_all(channel_id)#
Loads all connections managed by a given channel
- Parameters:
channel_id (str)
- Return type:
Iterable[Connection]
- load_one(channel_id, connection_id)#
Loads a connection given the connection and channel id it belongs to.
- Parameters:
channel_id (str)
connection_id (str)
- Return type:
- persist(channel_id, connection)#
Persists a connection and assign it to a channel.
- Parameters:
channel_id (str)
connection (Connection)
- delete(channel_id, connection_id)#
Deletes a connection given the connection and channel id it belongs to.
- Parameters:
channel_id (str)
connection_id (str)
Interoperability#
- class QueueRepositoryInterface#
Bases:
ABC
- abstract load(connection_id)#
Loads a queue given the connection id it belongs to.
- Parameters:
connection_id (str)
- Return type:
- abstract persist(connection_id, queue)#
Persists queue and assign to connection.
- Parameters:
connection_id (str)
queue (Queue)
- abstract delete(connection_id)#
Deletes a queue given the connection id it belongs to.
- Parameters:
connection_id (str)
- class ListenerRepositoryInterface#
Bases:
ABC
- abstract load(connection_id)#
Loads a listener given the connection id it belongs to.
- Parameters:
connection_id (str)
- Return type:
- abstract persist(connection_id, listener)#
Persists listener and assign to connection.
- Parameters:
connection_id (str)
listener (MessageQueueListener)
- abstract delete(connection_id)#
Deleted a listener given the connection id it belongs to.
- Parameters:
connection_id (str)
- class ConnectionRepositoryInterface#
Bases:
ABC
- abstract property queues_repository: QueueRepositoryInterface#
- abstract property listeners_repository: ListenerRepositoryInterface#
- abstract load_all(channel_id)#
Loads all connections managed by a given channel
- Parameters:
channel_id (str)
- Return type:
Iterable[Connection]
- abstract load_one(channel_id, connection_id)#
Loads a connection given the connection and channel id it belongs to.
- Parameters:
channel_id (str)
connection_id (str)
- Return type:
- abstract persist(channel_id, connection)#
Persists a connection and assign it to a channel.
- Parameters:
channel_id (str)
connection (Connection)
- abstract delete(channel_id, connection_id)#
Deletes a connection given the connection and channel id it belongs to.
- Parameters:
channel_id (str)
connection_id (str)
- class ChannelRepositoryInterface#
Bases:
ABC
- abstract property connections_factory: ConnectionsFactory#
The connections factory that will be injected into concrete channel instances.
- abstract property connections_repository: ConnectionRepositoryInterface#
Repository to be used to persist connections.
- abstract load_all()#
Loads all channels
- Return type:
Iterable[AbstractChannel]
- abstract load_one(channel_id)#
Loads a channel given its it
- Parameters:
channel_id (str)
- Return type:
- abstract persist(channel)#
Persists a channel
- Parameters:
channel (AbstractChannel)
- abstract delete(channel_id)#
Deletes a channel given its it
- Parameters:
channel_id (str)
- abstract create(channel_data)#
Creates a new channel and configures it depending on channel_data.
- Parameters:
channel_data (dict)
- Return type: