bec_lib.redis_connector.RedisConnector#

class RedisConnector(bootstrap: list, redis_cls=None)[source]#

Bases: ConnectorBase

Redis connector class. This class is a wrapper around the redis library that provides a simple interface for sending messages to and receiving messages from a Redis server.

Initialize the connector

Parameters:
  • bootstrap (list) – list of strings in the form “host:port”

  • redis_cls (redis.client, optional) – redis client class. Defaults to None.
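
As a minimal sketch of the “host:port” form expected in bootstrap, the snippet below splits such strings into host and port. The helper name parse_bootstrap is hypothetical and not part of bec_lib; how RedisConnector parses the list internally is not shown on this page.

```python
def parse_bootstrap(bootstrap: list[str]) -> list[tuple[str, int]]:
    """Split "host:port" strings into (host, port) tuples (hypothetical helper)."""
    servers = []
    for entry in bootstrap:
        # rpartition splits on the last colon, so the port is always the final field
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected 'host:port', got {entry!r}")
        servers.append((host, int(port)))
    return servers


servers = parse_bootstrap(["localhost:6379"])

# With bec_lib installed and a Redis server running, the same list would be
# passed to the constructor:
# connector = RedisConnector(["localhost:6379"])
```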

Methods

consumer

Return a fake thread object to be compatible with old code

delete

Delete a topic

execute_pipeline

Execute a pipeline and return the results

get

Retrieve an entry, either via hgetall or get

get_last

Get last message from stream.

keys

Return all keys matching a pattern

log_error

Send an error as a log entry

log_message

Send a message as a log entry

log_warning

Send a warning

lpush

Insert the specified values at the head of the list stored at key. O(1) for each element added.

lrange

Return the specified elements of the list stored at key. O(S+N), where S is the distance of the start offset from the nearest end and N is the number of elements in the range.

lset

Set a value in the list at the given index

pipeline

Create a new pipeline

poll_messages

Poll for new messages, receive them and execute callbacks

producer

Return itself as a producer, to be compatible with old code

raise_alarm

Raise an alarm

raise_warning

Raise a warning

raw_send

Send a message to a topic.

register

Register a callback for a topic or a pattern

rpush

Insert the specified values at the tail of the list stored at key. O(1) for each element added.

send

Send a message to a topic

set

Set a Redis value

set_and_publish

Piped combination of self.publish and self.set

shutdown

Shutdown the connector

unregister

Unregister a callback for a topic or pattern

xadd

Add to stream

xrange

Read a range from stream

xread

Read from stream

consumer(topics=None, patterns=None, group_id=None, event=None, cb=None, threaded=True, name=None, **kwargs)[source]#

Return a fake thread object to be compatible with old code

In order to keep this fail-safe and simple it uses ‘mock’…

delete(topic: EndpointInfo, pipe=None)[source]#

Delete a topic

execute_pipeline(pipeline) → list[source]#

Execute a pipeline and return the results

Parameters:

pipeline (Pipeline) – redis pipeline

Returns:

list of results

Return type:

list
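
The queue-then-execute idea behind a pipeline can be sketched without a Redis server. FakePipeline below is a hypothetical in-memory stand-in, not the redis Pipeline class; it only illustrates that queued commands have no effect until they are executed, and that the results come back as one list in command order.

```python
class FakePipeline:
    """In-memory stand-in for a Redis pipeline (hypothetical, for illustration only)."""

    def __init__(self, store: dict):
        self._store = store
        self._queued = []  # commands wait here until execute() is called

    def set(self, key, value):
        def command():
            self._store[key] = value
            return True

        self._queued.append(command)
        return self

    def get(self, key):
        self._queued.append(lambda: self._store.get(key))
        return self

    def execute(self) -> list:
        # run the queued commands in order and collect one result per command
        results = [command() for command in self._queued]
        self._queued.clear()
        return results


store = {}
pipe = FakePipeline(store)
pipe.set("a", 1).get("a").get("missing")
before = dict(store)      # snapshot taken while the commands are still queued
results = pipe.execute()  # one result per queued command
```

With the real connector, the pipe arguments of methods such as set and send suggest the same pattern: create a pipeline with pipeline(), pass it as pipe, then call execute_pipeline.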

get(topic: EndpointInfo, pipe=None)[source]#

Retrieve an entry, either via hgetall or get

get_last(topic: EndpointInfo, key=None, count=1)[source]#

Get last message from stream. Repeated calls will return the same message until a new message is added to the stream.

Parameters:
  • topic (str) – redis topic

  • key (str, optional) – key to retrieve. Defaults to None. If None, the whole message is returned.

  • count (int, optional) – number of last elements to retrieve
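
The non-consuming behaviour described above can be illustrated with an in-memory list standing in for the stream. The function get_last_model and its return shape are assumptions made for this sketch; the actual return value of get_last is not documented on this page.

```python
def get_last_model(stream: list, key=None, count=1):
    """Return the newest entry (or entries) of an in-memory stream without removing it."""
    if not stream:
        return None
    # entries are (entry_id, fields) tuples, oldest first
    newest = [fields for _entry_id, fields in stream[-count:]]
    if key is not None:
        # assumption: key selects a single field of each message
        newest = [fields.get(key) for fields in newest]
    return newest[0] if count == 1 else newest


stream = [("1-0", {"data": "a"}), ("2-0", {"data": "b"})]
first = get_last_model(stream)
second = get_last_model(stream)             # same message, nothing was consumed
stream.append(("3-0", {"data": "c"}))
third = get_last_model(stream, key="data")  # a new message has arrived
```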

keys(pattern: EndpointInfo) → list[source]#

Return all keys matching a pattern

log_error(msg)[source]#

Send an error as a log entry

Parameters:

msg (str) – error message

log_message(msg)[source]#

Send a message as a log entry

Parameters:

msg (str) – message

log_warning(msg)[source]#

Send a warning

Parameters:

msg (str) – warning message

lpush(topic: EndpointInfo, msg: str, pipe=None, max_size: int = None, expire: int = None) → None[source]#

Insert all the specified values at the head of the list stored at key. If key does not exist, it is created as an empty list before the push operations are performed. When key holds a value that is not a list, an error is returned. Time complexity: O(1) for each element added, so O(N) to add N elements when the command is called with multiple arguments.
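
Head insertion can be modelled on a plain Python list. The function lpush_model is a hypothetical stand-in, and treating max_size as "keep only the newest max_size elements" is an assumption; the page does not say how the connector applies max_size.

```python
def lpush_model(items: list, *values, max_size=None) -> int:
    """Push values onto the head of a list, one after another."""
    for value in values:
        items.insert(0, value)  # each value becomes the new head
    if max_size is not None:
        del items[max_size:]    # assumption: drop the oldest elements beyond max_size
    return len(items)


queue = []
length = lpush_model(queue, "a", "b", "c")
after_push = list(queue)      # the last value pushed sits at the head
lpush_model(queue, "d", max_size=2)
```

Because each value is inserted at the head in turn, pushing several values reverses their order in the list.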

lrange(topic: EndpointInfo, start: int, end: int, pipe=None)[source]#

Return the specified elements of the list stored at key. The offsets start and end are zero-based indexes, with 0 being the first element of the list (the head of the list), 1 being the next element and so on. Time complexity: O(S+N), where S is the distance of the start offset from HEAD for small lists, or from the nearest end (HEAD or TAIL) for large lists, and N is the number of elements in the specified range.
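
The offsets follow the Redis LRANGE convention, which differs from Python slicing: the end offset is inclusive, negative offsets count from the tail, and out-of-range offsets do not raise an error. The function lrange_model below is a hypothetical helper that reproduces this on a Python list.

```python
def lrange_model(items: list, start: int, end: int) -> list:
    """Return items[start..end] with Redis LRANGE offset semantics."""
    n = len(items)
    # negative offsets count from the tail: -1 is the last element
    if start < 0:
        start = max(n + start, 0)
    if end < 0:
        end = n + end
    if end < 0 or start > end:
        return []
    # unlike a Python slice, the end offset is inclusive
    return items[start:end + 1]


values = ["a", "b", "c"]
whole_list = lrange_model(values, 0, -1)
first_two = lrange_model(values, 0, 1)
last_two = lrange_model(values, -2, -1)
```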

lset(topic: EndpointInfo, index: int, msg: str, pipe=None) → None[source]#

Set a value in the list at the given index

pipeline() → Pipeline[source]#

Create a new pipeline

poll_messages(timeout=None) → None[source]#

Poll for new messages, receive them and execute callbacks

producer()[source]#

Return itself as a producer, to be compatible with old code

raise_alarm(severity: Alarms, alarm_type: str, source: str, msg: str, metadata: dict)[source]#

Raise an alarm

Parameters:
  • severity (Alarms) – alarm severity

  • alarm_type (str) – alarm type

  • source (str) – source

  • msg (str) – message

  • metadata (dict) – metadata

raise_warning(msg)#

Raise a warning

raw_send(topic: str, msg: bytes, pipe=None)[source]#

Send a message to a topic. This is the raw version of send, it does not check the message type. Use this method if you want to send a message that is not a BECMessage.

Parameters:
  • topic (str) – topic

  • msg (bytes) – message

  • pipe (Pipeline, optional) – redis pipe. Defaults to None.

register(topics: str | list[str] | EndpointInfo | list[EndpointInfo] | None = None, patterns: str | list[str] | None = None, cb: callable | None = None, start_thread: bool = True, from_start: bool = False, newest_only: bool = False, **kwargs)[source]#

Register a callback for a topic or a pattern

Parameters:
  • topics (str, list, EndpointInfo, list[EndpointInfo], optional) – topic or list of topics. Defaults to None. The topic should be a valid message endpoint in BEC and can be a string or an EndpointInfo object.

  • patterns (str, list, optional) – pattern or list of patterns. Defaults to None. In contrast to topics, patterns may contain “*” wildcards. The evaluated patterns should be a valid pub/sub message endpoint in BEC

  • cb (callable, optional) – callback. Defaults to None.

  • start_thread (bool, optional) – start the dispatcher thread. Defaults to True.

  • from_start (bool, optional) – for streams only: return data from start on first reading. Defaults to False.

  • newest_only (bool, optional) – for streams only: return newest data only. Defaults to False.

  • **kwargs – additional keyword arguments to be transmitted to the callback

Examples

>>> def my_callback(msg, **kwargs):
...     print(msg)
...
>>> connector.register("test", cb=my_callback)
>>> connector.register(topics="test", cb=my_callback)
>>> connector.register(patterns="test:*", cb=my_callback)
>>> connector.register(patterns="test:*", cb=my_callback, start_thread=False)
>>> connector.register(patterns="test:*", cb=my_callback, start_thread=False, my_arg="test")
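
The difference between topics and patterns, and the forwarding of extra keyword arguments to the callback, can be sketched with a small in-memory dispatcher. MiniDispatcher is hypothetical and uses fnmatch for wildcard matching, which only approximates Redis pattern matching; it is not how RedisConnector is implemented.

```python
from fnmatch import fnmatchcase


def _as_list(value):
    """Accept None, a single string, or a list of strings."""
    if value is None:
        return []
    return [value] if isinstance(value, str) else list(value)


class MiniDispatcher:
    """Hypothetical in-memory model of topic and pattern subscriptions."""

    def __init__(self):
        self._topics = {}    # exact topic -> list of (callback, kwargs)
        self._patterns = {}  # wildcard pattern -> list of (callback, kwargs)

    def register(self, topics=None, patterns=None, cb=None, **kwargs):
        for topic in _as_list(topics):
            self._topics.setdefault(topic, []).append((cb, kwargs))
        for pattern in _as_list(patterns):
            self._patterns.setdefault(pattern, []).append((cb, kwargs))

    def dispatch(self, topic, msg):
        # exact subscriptions first, then every pattern that matches the topic
        for cb, kwargs in self._topics.get(topic, []):
            cb(msg, **kwargs)
        for pattern, entries in self._patterns.items():
            if fnmatchcase(topic, pattern):
                for cb, kwargs in entries:
                    cb(msg, **kwargs)


received = []


def my_callback(msg, **kwargs):
    received.append((msg, kwargs))


dispatcher = MiniDispatcher()
dispatcher.register(topics="test", cb=my_callback)
dispatcher.register(patterns="test:*", cb=my_callback, my_arg="test")
dispatcher.dispatch("test", "m1")    # matches the exact topic only
dispatcher.dispatch("test:a", "m2")  # matches the pattern only
dispatcher.dispatch("other", "m3")   # matches nothing
```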

rpush(topic: EndpointInfo, msg: str, pipe=None) → int[source]#

Insert all the specified values at the tail of the list stored at key. If key does not exist, it is created as an empty list before the push operation is performed. When key holds a value that is not a list, an error is returned. Time complexity: O(1) for each element added, so O(N) to add N elements when the command is called with multiple arguments.

send(topic: EndpointInfo, msg: BECMessage, pipe=None) → None[source]#

Send a message to a topic

Parameters:
  • topic (str) – topic

  • msg (BECMessage) – message

  • pipe (Pipeline, optional) – redis pipe. Defaults to None.

set(topic: EndpointInfo, msg, pipe=None, expire: int = None) → None[source]#

Set a Redis value

set_and_publish(topic: EndpointInfo, msg, pipe=None, expire: int = None) → None[source]#

Piped combination of self.publish and self.set

shutdown()[source]#

Shutdown the connector

unregister(topics=None, patterns=None, cb=None)[source]#

Unregister a callback for a topic or pattern

xadd(topic: EndpointInfo, msg_dict: dict, max_size=None, pipe=None, expire: int = None)[source]#

Add to stream

Parameters:
  • topic (str) – redis topic

  • msg_dict (dict) – message to add

  • max_size (int, optional) – max size of stream. Defaults to None.

  • pipe (Pipeline, optional) – redis pipe. Defaults to None.

  • expire (int, optional) – expire time. Defaults to None.

Examples

>>> redis.xadd("test", {"test": "test"})
>>> redis.xadd("test", {"test": "test"}, max_size=10)
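
The effect of max_size can be pictured as a capped, append-only log. MiniStream is a hypothetical in-memory model with counter-based IDs; real Redis stream IDs are derived from timestamps, and Redis may trim a stream only approximately, so the exact cap shown here is an assumption.

```python
import itertools


class MiniStream:
    """Hypothetical in-memory model of a capped stream."""

    def __init__(self):
        self.entries = []  # (entry_id, fields) tuples, oldest first
        self._counter = itertools.count(1)

    def xadd(self, msg_dict: dict, max_size=None) -> str:
        entry_id = f"{next(self._counter)}-0"  # simplified ID, not timestamp based
        self.entries.append((entry_id, dict(msg_dict)))
        if max_size is not None and len(self.entries) > max_size:
            # drop the oldest entries so that only max_size remain
            del self.entries[: len(self.entries) - max_size]
        return entry_id


stream = MiniStream()
ids = [stream.xadd({"test": "test"}, max_size=10) for _ in range(12)]
oldest_kept = stream.entries[0][0]
```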

xrange(topic: EndpointInfo, min: str, max: str, count: int = None)[source]#

Read a range from stream

Parameters:
  • topic (str) – redis topic

  • min (str) – min id. Use “-” to read from start

  • max (str) – max id. Use “+” to read to end

  • count (int, optional) – number of messages to read. Defaults to None.

Returns:

list of messages or None

Return type:

list
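
Stream IDs have the form “milliseconds-sequence” and are ordered numerically, not as strings, and both ends of the range are inclusive. The sketch below models this on an in-memory list; xrange_model and parse_id are hypothetical helpers that accept only complete IDs plus the special values “-” and “+”.

```python
def parse_id(entry_id: str) -> tuple:
    """Turn "ms-seq" into a tuple of ints so that IDs compare numerically."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


def xrange_model(entries: list, min: str = "-", max: str = "+", count=None) -> list:
    """Return entries whose IDs lie in the inclusive range [min, max]."""
    low = (0, 0) if min == "-" else parse_id(min)
    high = (float("inf"), float("inf")) if max == "+" else parse_id(max)
    selected = [entry for entry in entries if low <= parse_id(entry[0]) <= high]
    return selected if count is None else selected[:count]


entries = [("1-0", {"v": 1}), ("2-0", {"v": 2}), ("10-0", {"v": 3})]
everything = xrange_model(entries, "-", "+")
from_second = xrange_model(entries, "2-0", "+")  # "10-0" sorts after "2-0" numerically
only_first = xrange_model(entries, "-", "+", count=1)
```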

xread(topic: EndpointInfo, id: str = None, count: int = None, block: int = None, from_start=False) → list[source]#

Read from stream

Parameters:
  • topic (str) – redis topic

  • id (str, optional) – id to read from. Defaults to None.

  • count (int, optional) – number of messages to read. Defaults to None, which means all.

  • block (int, optional) – block for x milliseconds. Defaults to None.

  • from_start (bool, optional) – read from start. Defaults to False.

Returns:

list of messages

Return type:

list

Examples

>>> redis.xread("test", "0-0")
>>> redis.xread("test", "0-0", count=1)

# read one message at a time
>>> key = 0
>>> msg = redis.xread("test", key, count=1)
>>> key = msg[0][1][0][0]
>>> next_msg = redis.xread("test", key, count=1)
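
The read-one-message-at-a-time pattern can be extended into a loop that uses the ID of the last message as a cursor. The sketch below runs against a hypothetical in-memory xread_model. Its return shape, a list of [stream_name, [(id, fields), ...]] pairs, is an assumption inferred from the msg[0][1][0][0] indexing in the example, and the connector may return decoded messages instead.

```python
def xread_model(streams: dict, topic: str, id: str = "0-0", count=None) -> list:
    """Return entries of one stream whose ID is strictly greater than id."""

    def as_tuple(entry_id):
        ms, seq = entry_id.split("-")
        return int(ms), int(seq)

    newer = [entry for entry in streams.get(topic, []) if as_tuple(entry[0]) > as_tuple(id)]
    if count is not None:
        newer = newer[:count]
    return [[topic, newer]] if newer else []


streams = {"test": [("1-0", {"n": 1}), ("2-0", {"n": 2}), ("3-0", {"n": 3})]}

key = "0-0"  # start before the first entry
seen = []
while True:
    msg = xread_model(streams, "test", key, count=1)
    if not msg:
        break  # nothing newer than the cursor
    entry_id, fields = msg[0][1][0]
    seen.append(fields["n"])
    key = entry_id  # advance the cursor to the message just read
```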