"""
This module defines the interface for a connector
"""
from __future__ import annotations
import abc
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.messages import BECMessage, LogMessage
logger = bec_logger.logger
[docs]
class ConsumerConnectorError(Exception):
"""
ConsumerConnectorError is raised when there is an error with the connector
"""
[docs]
class MessageObject:
"""
MessageObject is a wrapper for a message and its topic
"""
def __init__(self, topic: str, value: BECMessage) -> None:
self.topic = topic
self._value = value
@property
def value(self) -> BECMessage:
"""
Get the message
"""
return self._value
def __eq__(self, ref_val: MessageObject) -> bool:
if not isinstance(ref_val, MessageObject):
return False
return self._value == ref_val.value and self.topic == ref_val.topic
def __str__(self):
return f"MessageObject(topic={self.topic}, value={self._value})"
[docs]
class StoreInterface(abc.ABC):
"""StoreBase defines the interface for storing data"""
def __init__(self, store):
pass
[docs]
@abc.abstractmethod
def pipeline(self):
"""Create a pipeline for batch operations"""
[docs]
@abc.abstractmethod
def execute_pipeline(self, pipeline):
"""Execute a pipeline"""
[docs]
@abc.abstractmethod
def lpush(
self, topic: str, msg: str, pipe=None, max_size: int = None, expire: int = None
) -> None:
"""Push a message to the left of the list"""
[docs]
@abc.abstractmethod
def lset(self, topic: str, index: int, msg: str, pipe=None) -> None:
"""Set a value in the list at the given index"""
[docs]
@abc.abstractmethod
def rpush(self, topic: str, msg: str, pipe=None) -> int:
"""Push a message to the right of the list"""
[docs]
@abc.abstractmethod
def lrange(self, topic: str, start: int, end: int, pipe=None):
"""Get a range of values from the list"""
[docs]
@abc.abstractmethod
def set(self, topic: str, msg, pipe=None, expire: int = None) -> None:
"""Set a value"""
[docs]
@abc.abstractmethod
def keys(self, pattern: str) -> list:
"""Get keys that match the pattern"""
[docs]
@abc.abstractmethod
def delete(self, topic, pipe=None):
"""Delete a key"""
[docs]
@abc.abstractmethod
def get(self, topic: str, pipe=None):
"""Get a value"""
[docs]
@abc.abstractmethod
def xadd(self, topic: str, msg_dict: dict, max_size=None, pipe=None, expire: int = None):
"""Add a message to the stream"""
[docs]
@abc.abstractmethod
def xread(
self,
topic: str,
id: str = None,
count: int = None,
block: int = None,
pipe=None,
from_start=False,
) -> list:
"""Read from the stream"""
[docs]
@abc.abstractmethod
def xrange(self, topic: str, min: str, max: str, count: int = None, pipe=None):
"""Read from the stream"""
[docs]
class PubSubInterface(abc.ABC):
"""PubSubBase defines the interface for a pub/sub connector"""
[docs]
@abc.abstractmethod
def raw_send(self, topic: str, msg: bytes) -> None:
"""Send a raw message without using the BECMessage class"""
[docs]
@abc.abstractmethod
def send(self, topic: str, msg: BECMessage) -> None:
"""Send a message"""
[docs]
@abc.abstractmethod
def register(self, topics=None, patterns=None, cb=None, start_thread=True, **kwargs):
"""Register a callback for a topic or pattern"""
[docs]
@abc.abstractmethod
def unregister(self, topics=None, pattern=None, cb=None):
"""Unregister a callback for a topic or pattern"""
[docs]
@abc.abstractmethod
def poll_messages(self, timeout=None):
"""Poll for new messages, receive them and execute callbacks"""
[docs]
class ConnectorBase(PubSubInterface, StoreInterface):
"""ConnectorBase defines the interface for a connector"""
[docs]
def raise_warning(self, msg):
"""Raise a warning"""
raise NotImplementedError
[docs]
def send_client_info(self, msg):
"""send a msg to the client, will automatically be logged too."""
raise NotImplementedError
[docs]
def set_and_publish(self, topic: str, msg, pipe=None, expire: int = None) -> None:
"""Set a value and publish it"""
raise NotImplementedError
[docs]
def shutdown(self):
"""Shutdown the connector"""