bec_lib.redis_connector.RedisConnector#
- class RedisConnector(bootstrap: list, redis_cls=None)[source]#
Bases:
ConnectorBase
Redis connector class. This class is a wrapper around the redis library providing a simple interface to send and receive messages from a redis server.
Initialize the connector
- Parameters:
bootstrap (list) – list of strings in the form “host:port”
redis_cls (redis.client, optional) – redis client class. Defaults to None.
Methods
Return a fake thread object to be compatible with old code
delete topic
Execute a pipeline and return the results
retrieve entry, either via hgetall or get
Get last message from stream.
returns all keys matching a pattern
send an error as log
send a message as log
send a warning
Time complexity: O(1) for each element added, so O(N) to add N elements when the command is called with multiple arguments.
O(S+N) where S is the distance of start offset from HEAD for small lists, from nearest end (HEAD or TAIL) for large lists; and N is the number of elements in the specified range.
Set a value in the list at the given index
Create a new pipeline
Poll for new messages, receive them and execute callbacks
Return itself as a producer, to be compatible with old code
Raise an alarm
Raise a warning
Send a message to a topic.
Register a callback for a topic or a pattern
O(1) for each element added, so O(N) to add N elements when the command is called with multiple arguments.
Send a message to a topic
set redis value
piped combination of self.publish and self.set
Shutdown the connector
Unregister a callback for a topic or pattern
add to stream
read a range from stream
read from stream
- consumer(topics=None, patterns=None, group_id=None, event=None, cb=None, threaded=True, name=None, **kwargs)[source]#
Return a fake thread object to be compatible with old code
In order to keep this fail-safe and simple it uses ‘mock’…
- delete(topic: EndpointInfo, pipe=None)[source]#
delete topic
- execute_pipeline(pipeline) list [source]#
Execute a pipeline and return the results
- Parameters:
pipeline (Pipeline) – redis pipeline
- Returns:
list of results
- Return type:
list
- get(topic: EndpointInfo, pipe=None)[source]#
retrieve entry, either via hgetall or get
- get_last(topic: EndpointInfo, key=None, count=1)[source]#
Get last message from stream. Repeated calls will return the same message until a new message is added to the stream.
- Parameters:
topic (str) – redis topic
key (str, optional) – key to retrieve. Defaults to None. If None, the whole message is returned.
count (int, optional) – number of last elements to retrieve
- keys(pattern: EndpointInfo) list [source]#
returns all keys matching a pattern
- lpush(topic: EndpointInfo, msg: str, pipe=None, max_size: int = None, expire: int = None) None [source]#
Time complexity: O(1) for each element added, so O(N) to add N elements when the command is called with multiple arguments. Insert all the specified values at the head of the list stored at key. If key does not exist, it is created as empty list before performing the push operations. When key holds a value that is not a list, an error is returned.
- lrange(topic: EndpointInfo, start: int, end: int, pipe=None)[source]#
O(S+N) where S is the distance of start offset from HEAD for small lists, from nearest end (HEAD or TAIL) for large lists; and N is the number of elements in the specified range. Returns the specified elements of the list stored at key. The offsets start and stop are zero-based indexes, with 0 being the first element of the list (the head of the list), 1 being the next element and so on.
- lset(topic: EndpointInfo, index: int, msg: str, pipe=None) None [source]#
Set a value in the list at the given index
- poll_messages(timeout=None) None [source]#
Poll for new messages, receive them and execute callbacks
- raise_alarm(severity: Alarms, alarm_type: str, source: str, msg: str, metadata: dict)[source]#
Raise an alarm
- Parameters:
severity (Alarms) – alarm severity
alarm_type (str) – alarm type
source (str) – source
msg (str) – message
metadata (dict) – metadata
- raise_warning(msg)#
Raise a warning
- raw_send(topic: str, msg: bytes, pipe=None)[source]#
Send a message to a topic. This is the raw version of send, it does not check the message type. Use this method if you want to send a message that is not a BECMessage.
- Parameters:
topic (str) – topic
msg (bytes) – message
pipe (Pipeline, optional) – redis pipe. Defaults to None.
- register(topics: str | list[str] | EndpointInfo | list[EndpointInfo] | None = None, patterns: str | list[str] | None = None, cb: callable | None = None, start_thread: bool = True, from_start: bool = False, newest_only: bool = False, **kwargs)[source]#
Register a callback for a topic or a pattern
- Parameters:
topics (str, list, EndpointInfo, list[EndpointInfo], optional) – topic or list of topics. Defaults to None. The topic should be a valid message endpoint in BEC and can be a string or an EndpointInfo object.
patterns (str, list, optional) – pattern or list of patterns. Defaults to None. In contrast to topics, patterns may contain “*” wildcards. The evaluated patterns should be a valid pub/sub message endpoint in BEC
cb (callable, optional) – callback. Defaults to None.
start_thread (bool, optional) – start the dispatcher thread. Defaults to True.
from_start (bool, optional) – for streams only: return data from start on first reading. Defaults to False.
newest_only (bool, optional) – for streams only: return newest data only. Defaults to False.
**kwargs – additional keyword arguments to be transmitted to the callback
Examples
>>> def my_callback(msg, **kwargs): ... print(msg) ... >>> connector.register("test", my_callback) >>> connector.register(topics="test", cb=my_callback) >>> connector.register(patterns="test:*", cb=my_callback) >>> connector.register(patterns="test:*", cb=my_callback, start_thread=False) >>> connector.register(patterns="test:*", cb=my_callback, start_thread=False, my_arg="test")
- rpush(topic: EndpointInfo, msg: str, pipe=None) int [source]#
O(1) for each element added, so O(N) to add N elements when the command is called with multiple arguments. Insert all the specified values at the tail of the list stored at key. If key does not exist, it is created as empty list before performing the push operation. When key holds a value that is not a list, an error is returned.
- send(topic: EndpointInfo, msg: BECMessage, pipe=None) None [source]#
Send a message to a topic
- Parameters:
topic (str) – topic
msg (BECMessage) – message
pipe (Pipeline, optional) – redis pipe. Defaults to None.
- set(topic: EndpointInfo, msg, pipe=None, expire: int = None) None [source]#
set redis value
- set_and_publish(topic: EndpointInfo, msg, pipe=None, expire: int = None) None [source]#
piped combination of self.publish and self.set
- unregister(topics=None, patterns=None, cb=None)[source]#
Unregister a callback for a topic or pattern
- xadd(topic: EndpointInfo, msg_dict: dict, max_size=None, pipe=None, expire: int = None)[source]#
add to stream
- Parameters:
topic (str) – redis topic
msg_dict (dict) – message to add
max_size (int, optional) – max size of stream. Defaults to None.
pipe (Pipeline, optional) – redis pipe. Defaults to None.
expire (int, optional) – expire time. Defaults to None.
Examples
>>> redis.xadd("test", {"test": "test"}) >>> redis.xadd("test", {"test": "test"}, max_size=10)
- xrange(topic: EndpointInfo, min: str, max: str, count: int = None)[source]#
read a range from stream
- Parameters:
topic (str) – redis topic
min (str) – min id. Use “-” to read from start
max (str) – max id. Use “+” to read to end
count (int, optional) – number of messages to read. Defaults to None.
- Returns:
list of messages or None
- Return type:
[list]
- xread(topic: EndpointInfo, id: str = None, count: int = None, block: int = None, from_start=False) list [source]#
read from stream
- Parameters:
topic (str) – redis topic
id (str, optional) – id to read from. Defaults to None.
count (int, optional) – number of messages to read. Defaults to None, which means all.
block (int, optional) – block for x milliseconds. Defaults to None.
from_start (bool, optional) – read from start. Defaults to False.
- Returns:
list of messages
- Return type:
[list]
Examples
>>> redis.xread("test", "0-0") >>> redis.xread("test", "0-0", count=1)
# read one message at a time >>> key = 0 >>> msg = redis.xread(“test”, key, count=1) >>> key = msg[0][1][0][0] >>> next_msg = redis.xread(“test”, key, count=1)