Source code for bec_lib.tests.utils

from __future__ import annotations

import builtins
import functools
import os
import time
import uuid
from typing import TYPE_CHECKING

import yaml

import bec_lib
from bec_lib import BECClient, messages
from bec_lib.connector import ConnectorBase
from bec_lib.devicemanager import DeviceManagerBase
from bec_lib.endpoints import EndpointInfo, MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.scans import Scans

if TYPE_CHECKING:
    from bec_lib.alarm_handler import Alarms

dir_path = os.path.dirname(bec_lib.__file__)

logger = bec_logger.logger

# pylint: disable=no-member
# pylint: disable=missing-function-docstring
# pylint: disable=redefined-outer-name
# pylint: disable=protected-access


[docs] def queue_is_empty(queue) -> bool: # pragma: no cover if not queue: return True if not queue["primary"].get("info"): return True return False
[docs] def get_queue(bec): # pragma: no cover return bec.queue.connector.get(MessageEndpoints.scan_queue_status())
[docs] def wait_for_empty_queue(bec): # pragma: no cover while not get_queue(bec): time.sleep(1) while not queue_is_empty(get_queue(bec).content["queue"]): time.sleep(1) logger.info(bec.queue) while get_queue(bec).content["queue"]["primary"]["status"] != "RUNNING": time.sleep(1) logger.info(bec.queue)
[docs] class ScansMock(Scans): def _import_scans(self): pass def open_scan_def(self): pass def close_scan_def(self): pass def close_scan_group(self): pass def umv(self): pass
[docs] class ClientMock(BECClient): def _load_scans(self): self.scans = ScansMock(self) builtins.scans = self.scans # def _start_services(self): # pass def _start_metrics_emitter(self): pass def _start_update_service_info(self): pass
[docs] def get_device_info_mock(device_name, device_class) -> messages.DeviceInfoMessage: device_info = { "samx": messages.DeviceInfoMessage( device="samx", info={ "device_info": { "device_base_class": "positioner", "signals": [ { "component_name": "readback", "obj_name": device_name, "kind_int": 5, "kind_str": "Kind.hinted", }, { "component_name": "setpoint", "obj_name": f"{device_name}_setpoint", "kind_int": 1, "kind_str": "Kind.normal", }, { "component_name": "motor_is_moving", "obj_name": f"{device_name}_motor_is_moving", "kind_int": 1, "kind_str": "Kind.normal", }, { "component_name": "readback", "obj_name": device_name, "kind_int": 5, "kind_str": "Kind.hinted", }, { "component_name": "velocity", "obj_name": f"{device_name}_velocity", "kind_int": 2, "kind_str": "Kind.config", }, { "component_name": "acceleration", "obj_name": f"{device_name}_acceleration", "kind_int": 2, "kind_str": "Kind.config", }, { "component_name": "high_limit_travel", "obj_name": f"{device_name}_high_limit_travel", "kind_int": 2, "kind_str": "Kind.config", }, { "component_name": "low_limit_travel", "obj_name": f"{device_name}_low_limit_travel", "kind_int": 2, "kind_str": "Kind.config", }, { "component_name": "unused", "obj_name": f"{device_name}_unused", "kind_int": 0, "kind_str": "Kind.omitted", }, ], "hints": {"fields": ["samx"]}, "describe": { "samx": { "source": "SIM:samx", "dtype": "integer", "shape": [], "precision": 3, }, "samx_setpoint": { "source": "SIM:samx_setpoint", "dtype": "integer", "shape": [], "precision": 3, }, "samx_motor_is_moving": { "source": "SIM:samx_motor_is_moving", "dtype": "integer", "shape": [], "precision": 3, }, }, "describe_configuration": { "samx_velocity": { "source": "SIM:samx_velocity", "dtype": "integer", "shape": [], }, "samx_acceleration": { "source": "SIM:samx_acceleration", "dtype": "integer", "shape": [], }, }, "sub_devices": [], "custom_user_access": { "dummy_controller": { "_func_with_args": {"type": "func", "doc": None}, "_func_with_args_and_kwargs": {"type": "func", "doc": None}, "_func_with_kwargs": {"type": "func", "doc": None}, "_func_without_args_kwargs": {"type": "func", "doc": None}, "controller_show_all": { "type": "func", "doc": ( "dummy controller show all\n\n Raises:\n " " in: _description_\n LimitError:" " _description_\n\n Returns:\n _type_:" " _description_\n " ), }, "some_var": {"type": "int"}, }, "sim_state": {"type": "dict"}, "speed": {"type": "int"}, }, } }, ), "dyn_signals": messages.DeviceInfoMessage( device="dyn_signals", info={ "device_info": { "device_dotted_name": "dyn_signals", "device_attr_name": "dyn_signals", "device_base_class": "device", "signals": [], "hints": {"fields": []}, "describe": { "dyn_signals_messages_message1": { "source": "SIM:dyn_signals_messages_message1", "dtype": "integer", "shape": [], "precision": 3, }, "dyn_signals_messages_message2": { "source": "SIM:dyn_signals_messages_message2", "dtype": "integer", "shape": [], "precision": 3, }, "dyn_signals_messages_message3": { "source": "SIM:dyn_signals_messages_message3", "dtype": "integer", "shape": [], "precision": 3, }, "dyn_signals_messages_message4": { "source": "SIM:dyn_signals_messages_message4", "dtype": "integer", "shape": [], "precision": 3, }, "dyn_signals_messages_message5": { "source": "SIM:dyn_signals_messages_message5", "dtype": "integer", "shape": [], "precision": 3, }, }, "describe_configuration": {}, "sub_devices": [ { "device_name": "dyn_signals_messages", "device_info": { "device_attr_name": "messages", "device_dotted_name": "messages", "device_base_class": "device", "signals": [ { "component_name": "message1", "obj_name": "dyn_signals_messages_message1", "kind_int": 1, "kind_str": "Kind.normal", }, { "component_name": "message2", "obj_name": "dyn_signals_messages_message2", "kind_int": 1, "kind_str": "Kind.normal", }, { "component_name": "message3", "obj_name": "dyn_signals_messages_message3", "kind_int": 1, "kind_str": "Kind.normal", }, { "component_name": "message4", "obj_name": "dyn_signals_messages_message4", "kind_int": 1, "kind_str": "Kind.normal", }, { "component_name": "message5", "obj_name": "dyn_signals_messages_message5", "kind_int": 1, "kind_str": "Kind.normal", }, ], "hints": {"fields": []}, "describe": { "dyn_signals_messages_message1": { "source": "SIM:dyn_signals_messages_message1", "dtype": "integer", "shape": [], "precision": 3, }, "dyn_signals_messages_message2": { "source": "SIM:dyn_signals_messages_message2", "dtype": "integer", "shape": [], "precision": 3, }, "dyn_signals_messages_message3": { "source": "SIM:dyn_signals_messages_message3", "dtype": "integer", "shape": [], "precision": 3, }, "dyn_signals_messages_message4": { "source": "SIM:dyn_signals_messages_message4", "dtype": "integer", "shape": [], "precision": 3, }, "dyn_signals_messages_message5": { "source": "SIM:dyn_signals_messages_message5", "dtype": "integer", "shape": [], "precision": 3, }, }, "describe_configuration": {}, "sub_devices": [], "custom_user_access": {}, }, } ], "custom_user_access": {}, } }, ), } if device_name in device_info: return device_info[device_name] device_base_class = "positioner" if device_class == "SimPositioner" else "signal" if device_base_class == "positioner": signals = [ { "component_name": "readback", "obj_name": device_name, "kind_int": 5, "kind_str": "Kind.hinted", }, { "component_name": "setpoint", "obj_name": f"{device_name}_setpoint", "kind_int": 1, "kind_str": "Kind.normal", }, { "component_name": "motor_is_moving", "obj_name": f"{device_name}_motor_is_moving", "kind_int": 1, "kind_str": "Kind.normal", }, { "component_name": "readback", "obj_name": device_name, "kind_int": 5, "kind_str": "Kind.hinted", }, { "component_name": "velocity", "obj_name": f"{device_name}_velocity", "kind_int": 2, "kind_str": "Kind.config", }, { "component_name": "acceleration", "obj_name": f"{device_name}_acceleration", "kind_int": 2, "kind_str": "Kind.config", }, { "component_name": "high_limit_travel", "obj_name": f"{device_name}_high_limit_travel", "kind_int": 2, "kind_str": "Kind.config", }, { "component_name": "low_limit_travel", "obj_name": f"{device_name}_low_limit_travel", "kind_int": 2, "kind_str": "Kind.config", }, { "component_name": "unused", "obj_name": f"{device_name}_unused", "kind_int": 0, "kind_str": "Kind.omitted", }, ] elif device_base_class == "signal": signals = [ { "component_name": "readback", "obj_name": device_name, "kind_int": 5, "kind_str": "Kind.hinted", }, { "component_name": "velocity", "obj_name": f"{device_name}_velocity", "kind_int": 2, "kind_str": "Kind.config", }, { "component_name": "acceleration", "obj_name": f"{device_name}_acceleration", "kind_int": 2, "kind_str": "Kind.config", }, { "component_name": "high_limit_travel", "obj_name": f"{device_name}_high_limit_travel", "kind_int": 2, "kind_str": "Kind.config", }, { "component_name": "low_limit_travel", "obj_name": f"{device_name}_low_limit_travel", "kind_int": 2, "kind_str": "Kind.config", }, { "component_name": "unused", "obj_name": f"{device_name}_unused", "kind_int": 0, "kind_str": "Kind.omitted", }, ] dev_info = { "device_name": device_name, "device_info": { "device_dotted_name": device_name, "device_attr_name": device_name, "device_base_class": device_base_class, "signals": signals, }, "custom_user_acces": {}, } return messages.DeviceInfoMessage(device=device_name, info=dev_info, metadata={})
[docs] class DMClientMock(DeviceManagerBase): def _get_device_info(self, device_name) -> messages.DeviceInfoMessage: return get_device_info_mock(device_name, self.get_device(device_name)["deviceClass"]) def get_device(self, device_name): for dev in self._session["devices"]: if dev["name"] == device_name: return dev
[docs] class PipelineMock: # pragma: no cover _pipe_buffer = [] _connector = None def __init__(self, connector) -> None: self._connector = connector def execute(self): if not self._connector.store_data: self._pipe_buffer = [] return [] res = [ getattr(self._connector, method)(*args, **kwargs) for method, args, kwargs in self._pipe_buffer ] self._pipe_buffer = [] return res
[docs] class SignalMock: # pragma: no cover def __init__(self) -> None: self.is_set = False def set(self): self.is_set = True
[docs] class ConnectorMock(ConnectorBase): # pragma: no cover def __init__(self, bootstrap_server="localhost:0000", store_data=True): super().__init__(bootstrap_server) self.message_sent = [] self._get_buffer = {} self.store_data = store_data def raise_alarm( self, severity: Alarms, alarm_type: str, source: str, msg: dict, metadata: dict ): pass def log_error(self, *args, **kwargs): pass
[docs] def shutdown(self): pass
[docs] def register(self, *args, **kwargs): pass
[docs] def unregister(self, *args, **kwargs): pass
[docs] def poll_messages(self, *args, **kwargs): pass
[docs] def keys(self, *args, **kwargs): return []
[docs] def set(self, topic, msg, pipe=None, expire: int = None): if pipe: pipe._pipe_buffer.append(("set", (topic.endpoint, msg), {"expire": expire})) return self.message_sent.append({"queue": topic, "msg": msg, "expire": expire})
[docs] def raw_send(self, topic, msg, pipe=None): if pipe: pipe._pipe_buffer.append(("send", (topic.endpoint, msg), {})) return self.message_sent.append({"queue": topic, "msg": msg})
[docs] def send(self, topic, msg, pipe=None): if not isinstance(msg, messages.BECMessage): raise TypeError("Message must be a BECMessage") return self.raw_send(topic, msg, pipe)
[docs] def set_and_publish(self, topic, msg, pipe=None, expire: int = None): if pipe: pipe._pipe_buffer.append(("set_and_publish", (topic.endpoint, msg), {"expire": expire})) return self.message_sent.append({"queue": topic, "msg": msg, "expire": expire})
[docs] def lpush(self, topic, msg, pipe=None, max_size=None): if pipe: pipe._pipe_buffer.append(("lpush", (topic, msg), {})) return
[docs] def rpush(self, topic, msg, pipe=None): if pipe: pipe._pipe_buffer.append(("rpush", (topic, msg), {})) return pass
[docs] def lrange(self, topic, start, stop, pipe=None): if pipe: pipe._pipe_buffer.append(("lrange", (topic, start, stop), {})) return return []
[docs] def get(self, topic, pipe=None): if isinstance(topic, EndpointInfo): topic = topic.endpoint if pipe: pipe._pipe_buffer.append(("get", (topic,), {})) return val = self._get_buffer.get(topic) if isinstance(val, list): return val.pop(0) self._get_buffer.pop(topic, None) return val
[docs] def pipeline(self): return PipelineMock(self)
[docs] def delete(self, topic, pipe=None): if pipe: pipe._pipe_buffer.append(("delete", (topic,), {})) return
[docs] def lset(self, topic: str, index: int, msgs: str, pipe=None) -> None: if pipe: pipe._pipe_buffer.append(("lrange", (topic, index, msgs), {})) return
def producer(self): return self
[docs] def execute_pipeline(self, pipeline): pipeline.execute()
[docs] def xadd(self, topic, msg_dict, max_size=None, pipe=None, expire: int = None): if pipe: pipe._pipe_buffer.append(("xadd", (topic, msg_dict), {"expire": expire})) return pass
[docs] def xread(self, topic, id=None, count=None, block=None, pipe=None, from_start=False): if pipe: pipe._pipe_buffer.append( ("xread", (topic, id, count, block), {"from_start": from_start}) ) return return []
[docs] def xrange(self, topic, min="-", max="+", pipe=None): if pipe: pipe._pipe_buffer.append(("xrange", (topic, min, max), {})) return return []