"""
BECClient class. This class is the main entry point for the BEC client and all
derived classes. It is used to initialize the client and start the client.
"""
from __future__ import annotations, print_function
import builtins
import getpass
import importlib
import inspect
from typing import TYPE_CHECKING
from rich.console import Console
from rich.table import Table
from bec_lib import messages
from bec_lib.alarm_handler import AlarmHandler, Alarms
from bec_lib.bec_service import BECService
from bec_lib.bl_checks import BeamlineChecks
from bec_lib.callback_handler import CallbackHandler, EventType
from bec_lib.dap_plugins import DAPPlugins
from bec_lib.devicemanager import DeviceManagerBase
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.redis_connector import RedisConnector
from bec_lib.scan_manager import ScanManager
from bec_lib.scans import Scans
from bec_lib.service_config import ServiceConfig
from bec_lib.user_scripts_mixin import UserScriptsMixin
if TYPE_CHECKING:
from bec_lib.connector import ConnectorBase
logger = bec_logger.logger
[docs]
class BECClient(BECService, UserScriptsMixin):
"""
The BECClient class is the main entry point for the BEC client and all derived classes.
"""
_client = None
_initialized = False
started = False
def __init__(
self,
config: ServiceConfig = None,
connector_cls: ConnectorBase = None,
wait_for_server=False,
forced=False,
parent=None,
) -> None:
"""
Initialize the BECClient
Args:
config (ServiceConfig, optional): The configuration for the client. Defaults to None.
connector_cls (ConnectorBase, optional): The connector class to use. Defaults to None.
wait_for_server (bool, optional): Whether to wait for the server to be available before starting. Defaults to False.
forced (bool, optional): Whether to force the initialization of a new client. Defaults to False.
"""
if self._initialized:
return
self.__init_params = {
"config": config if config is not None else ServiceConfig(),
"connector_cls": connector_cls if connector_cls is not None else RedisConnector,
"wait_for_server": wait_for_server,
}
self.device_manager = None
self.queue = None
self.alarm_handler = None
self.config = None
self.history = None
self.live_updates = None
self.dap = None
self.bl_checks = None
self._hli_funcs = {}
self.metadata = {}
self.file_writer_data = {}
self.callbacks = CallbackHandler()
self._parent = parent if parent is not None else self
self._initialized = True
def __new__(cls, *args, forced=False, **kwargs):
if forced or cls._client is None:
cls._client = super(BECClient, cls).__new__(cls)
cls._initialized = False
return cls._client
def __str__(self) -> str:
return "BECClient\n\nTo get a list of available commands, type `bec.show_all_commands()`"
@classmethod
def _reset_singleton(cls):
cls._client = None
cls._initialized = False
cls.started = False
@property
def username(self) -> str:
"""get the current username"""
return self._username
@property
def active_account(self) -> str:
"""get the currently active target (e)account"""
msg = self.connector.get(MessageEndpoints.account())
if msg:
return msg.value
return ""
[docs]
def start(self):
"""start the client"""
if self.started:
return
self.started = True
config = self.__init_params["config"]
connector_cls = self.__init_params["connector_cls"]
wait_for_server = self.__init_params["wait_for_server"]
super().__init__(config, connector_cls, wait_for_server=wait_for_server)
builtins.bec = self._parent
self._start_services()
logger.info("Starting new client")
def _start_services(self):
self._configure_logger()
self._load_scans()
# self.logbook = LogbookConnector(self.connector)
self._update_username()
self._start_device_manager()
self._start_scan_queue()
self._start_alarm_handler()
self.load_all_user_scripts()
self.config = self.device_manager.config_helper
self.history = self.queue.queue_storage.storage
self.dap = DAPPlugins(self)
self.bl_checks = BeamlineChecks(self)
self.bl_checks.start()
[docs]
def alarms(self, severity=Alarms.WARNING):
"""get the next alarm with at least the specified severity"""
if self.alarm_handler is None:
yield []
yield from self.alarm_handler.get_alarm(severity=severity)
[docs]
def show_all_alarms(self, severity=Alarms.WARNING):
"""print all unhandled alarms"""
alarms = self.alarm_handler.get_unhandled_alarms(severity=severity)
for alarm in alarms:
print(alarm)
[docs]
def clear_all_alarms(self):
"""remove all alarms from stack"""
self.alarm_handler.clear()
@property
def pre_scan_macros(self):
"""currently stored pre-scan macros"""
return self.connector.lrange(MessageEndpoints.pre_scan_macros(), 0, -1)
@pre_scan_macros.setter
def pre_scan_macros(self, hooks: list[str]):
self.connector.delete(MessageEndpoints.pre_scan_macros())
for hook in hooks:
msg = messages.VariableMessage(value=hook)
self.connector.lpush(MessageEndpoints.pre_scan_macros(), msg)
def _load_scans(self):
self.scans = Scans(self._parent)
builtins.__dict__["scans"] = self.scans
[docs]
def load_high_level_interface(self, module_name: str) -> None:
"""Load a high level interface module.
Runs a callback of type `EventType.NAMESPACE_UPDATE`
to inform clients about added objects in the namesapce.
Args:
module_name (str): The name of the module to load
"""
mod = importlib.import_module(f"bec_ipython_client.high_level_interfaces.{module_name}")
members = inspect.getmembers(mod)
funcs = {name: func for name, func in members if not name.startswith("__")}
self._hli_funcs = funcs
builtins.__dict__.update(funcs)
self.callbacks.run(EventType.NAMESPACE_UPDATE, action="add", ns_objects=funcs)
def _update_username(self):
self._username = getpass.getuser()
def _start_scan_queue(self):
self.queue = ScanManager(self.connector)
def _configure_logger(self):
bec_logger.logger.remove()
bec_logger.add_file_log(bec_logger.LOGLEVEL.DEBUG)
bec_logger.add_sys_stderr(bec_logger.LOGLEVEL.SUCCESS)
bec_logger.add_console_log()
def _start_device_manager(self):
logger.info("Starting device manager")
self.device_manager = DeviceManagerBase(self)
self.device_manager.initialize(self.bootstrap_server)
builtins.dev = self.device_manager.devices
def _start_alarm_handler(self):
logger.info("Starting alarm listener")
self.alarm_handler = AlarmHandler(self.connector)
self.alarm_handler.start()
[docs]
def shutdown(self):
"""shutdown the client and all its components"""
if self.started:
super().shutdown()
if self.device_manager:
self.device_manager.shutdown()
if self.queue:
self.queue.shutdown()
if self.alarm_handler:
self.alarm_handler.shutdown()
if self.bl_checks:
self.bl_checks.stop()
bec_logger.logger.remove()
self.started = False
def _print_available_commands(self, title: str, data: tuple) -> None:
console = Console()
table = Table(title=title)
table.add_column("Name", justify="center")
table.add_column("Description", justify="center")
for name, descr in data:
table.add_row(name, descr)
console.print(table)
def _print_user_script_commands(self) -> None:
data = self._get_user_script_commands()
self._print_available_commands("User scripts", data)
def _get_user_script_commands(self) -> list:
avail_commands = []
for name, val in self._scripts.items():
descr = self._get_description_from_doc_string(val["cls"].__doc__)
avail_commands.append((name, descr))
return avail_commands
def _get_scan_commands(self) -> list:
avail_commands = []
for name, scan in self.scans._available_scans.items():
descr = self._get_description_from_doc_string(scan.scan_info["doc"])
avail_commands.append((name, descr))
return avail_commands
def _print_scan_commands(self) -> None:
data = self._get_scan_commands()
self._print_available_commands("Scans", data)
def show_all_commands(self):
self._print_user_script_commands()
self._print_scan_commands()
@staticmethod
def _get_description_from_doc_string(doc_string: str) -> str:
if not doc_string:
return ""
return doc_string.strip().split("\n")[0]