# Source code for bec_lib.channel_monitor
"""
This module provides a command line interface to monitor a channel.
"""
import argparse
import json
import re
import threading
from bec_lib.endpoints import MessageEndpoints
from bec_lib.redis_connector import RedisConnector
# [docs]
def channel_callback(msg, **_kwargs):
    """
    Print a received channel message as indented JSON.

    Args:
        msg: Object delivered by the connector; its ``value`` attribute must
            expose ``msg_type``, ``content`` and ``metadata``.
        **_kwargs: Accepted and ignored.
    """
    payload = msg.value
    summary = {
        "msg_type": payload.msg_type,
        "content": payload.content,
        "metadata": payload.metadata,
    }

    def _placeholder(_obj):
        # json.dumps calls this for every object it cannot encode natively,
        # so printing never fails on exotic payload values.
        return "<not serializable object>"

    print(json.dumps(summary, indent=4, default=_placeholder))
def _start_register(redis_config, topic, callback, *args, **kwargs):
    """
    Register ``callback`` on ``topic`` and then block the calling thread.

    Args:
        redis_config: Value passed unchanged to ``RedisConnector``.
        topic: Topic(s) forwarded to ``register`` as ``topics``.
        callback: Callable forwarded to ``register`` as ``cb``.
        *args: Accepted but not used.
        **kwargs: Forwarded to ``register`` as additional keyword arguments.
    """
    # Keep the connector bound to a local for as long as this function blocks.
    redis_connector = RedisConnector(redis_config)
    redis_connector.register(topics=topic, cb=callback, **kwargs)

    # Nothing ever sets this event, so wait() parks the thread indefinitely.
    never_set = threading.Event()
    never_set.wait()
# [docs]
def channel_monitor_launch():
    """
    Command line entry point: monitor a single channel.

    Parses ``--redis`` (default ``localhost:6379``) and the mandatory
    ``--channel`` option from the command line, then registers
    ``channel_callback`` on that channel via ``_start_register``.
    """
    cli = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    cli.add_argument(
        "--redis", required=False, default="localhost:6379", help="host:port of redis"
    )
    cli.add_argument(
        "--channel", required=True, help="channel name, e.g. internal/devices/read/samx"
    )
    options = cli.parse_args()

    _start_register(options.redis, options.channel, channel_callback)
# [docs]
def log_callback(msg, log_filter=None):
    """
    Print the text of a log message, optionally restricted by a filter.

    A message passes the filter when either

    * ``log_filter`` occurs in the text as a case-insensitive substring, or
    * ``log_filter``, interpreted as a regular expression, is found in the
      text by ``re.search`` (case-sensitive).

    A filter that is not a valid regular expression (e.g. ``"[error"``) is
    matched as a substring only, instead of raising ``re.error`` on every
    message that does not contain it.

    Args:
        msg: Object delivered by the connector; ``msg.value.log_msg["text"]``
            is the text that gets printed.
        log_filter: Substring or regular expression; ``None`` disables
            filtering.
    """
    print_msg = msg.value.log_msg["text"]

    if log_filter is not None:
        matched = log_filter.lower() in print_msg.lower()
        if not matched:
            try:
                matched = re.search(log_filter, print_msg) is not None
            except re.error:
                # The filter is plain text that does not compile as a regex;
                # the substring test above already failed, so it is no match.
                matched = False
        if not matched:
            return

    print(print_msg)
# [docs]
def log_monitor_launch():
    """
    Command line entry point: monitor log messages.

    Parses ``--redis`` (default ``localhost:6379``) and the optional
    ``--filter`` option from the command line, then registers ``log_callback``
    on the topic returned by ``MessageEndpoints.log()``. The filter value is
    handed to the callback as ``log_filter``.
    """
    cli = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    cli.add_argument(
        "--redis", required=False, default="localhost:6379", help="host:port of redis"
    )
    cli.add_argument("--filter", required=False, default=None, help="filter for log messages")
    options = cli.parse_args()

    _start_register(
        options.redis,
        MessageEndpoints.log(),
        log_callback,
        log_filter=options.filter,
    )
# if __name__ == "__main__":
# import sys
# sys.argv = ["", "--filter", "sim.py"]
# log_monitor_launch()