Source code for bec_lib.messages

import enum
import time
import warnings
from copy import deepcopy
from typing import Any, ClassVar, Literal

import numpy as np
from pydantic import BaseModel, Field, field_validator, model_validator


[docs] class BECStatus(enum.Enum): """BEC status enum""" RUNNING = 2 BUSY = 1 IDLE = 0 ERROR = -1
[docs] class BECMessage(BaseModel): """Base Model class for BEC Messages Args: msg_type (str): ClassVar for the message type, subclasses should override this. metadata (dict, optional): Optional dictionary with metadata for the BECMessage """ msg_type: ClassVar[str] metadata: dict | None = Field(default_factory=dict)
[docs] @field_validator("metadata") @classmethod def check_metadata(cls, v): """Validate the metadata, return empty dict if None Args: v (dict, None): Metadata dictionary """ if v is None: return {} return v
@property def content(self): """Return the content of the message""" content = self.__dict__.copy() content.pop("metadata", None) return content def __eq__(self, other): if not isinstance(other, BECMessage): # don't attempt to compare against unrelated types return False try: np.testing.assert_equal(self.model_dump(), other.model_dump()) except AssertionError: return False return self.msg_type == other.msg_type and self.metadata == other.metadata def loads(self): warnings.warn( "BECMessage.loads() is deprecated and should not be used anymore. When calling Connector methods, it can be omitted. When a message needs to be deserialized call the appropriate function from bec_lib.serialization", FutureWarning, ) return self def dumps(self): warnings.warn( "BECMessage.dumps() is deprecated and should not be used anymore. When calling Connector methods, it can be omitted. When a message needs to be serialized call the appropriate function from bec_lib.serialization", FutureWarning, ) return self
[docs] class BundleMessage(BECMessage): """Message type to send a bundle of BECMessages. Used to bundle together various messages, i.e. used to emit data in the scan bundler. Args: messages (list): List of BECMessage objects that are bundled together metadata (dict, optional): Additional metadata to describe the scan Examples: >>> BundleMessage(messages=[ScanQueueMessage(...), ScanStatusMessage(...)], metadata = {...}) """ msg_type: ClassVar[str] = "bundle_message" messages: list = Field(default_factory=list[BECMessage])
[docs] def append(self, msg: BECMessage): """Append a new BECMessage to the bundle""" if not isinstance(msg, BECMessage): raise AttributeError(f"Cannot append message of type {msg.__class__.__name__}") # pylint: disable=no-member self.messages.append(msg)
def __len__(self): return len(self.messages) def __iter__(self): # pylint: disable=not-an-iterable yield from self.messages
[docs] class ScanQueueMessage(BECMessage): """Message type for sending scan requests to the scan queue Sent by the API server / user to the scan_queue topic. It will be consumed by the scan server. Args: scan_type (str): one of the registered scan types; either rpc calls or scan types defined in the scan server parameter (dict): required parameters for the given scan_stype queue (str): either "primary" or "interception" metadata (dict, optional): additional metadata to describe the scan Examples: >>> ScanQueueMessage(scan_type="dscan", parameter={"motor1": "samx", "from_m1:": -5, "to_m1": 5, "steps_m1": 10, "motor2": "samy", "from_m2": -5, "to_m2": 5, "steps_m2": 10, "exp_time": 0.1}) """ msg_type: ClassVar[str] = "scan_queue_message" scan_type: str parameter: dict queue: str = Field(default="primary")
[docs] class ScanQueueHistoryMessage(BECMessage): """Sent after removal from the active queue. Contains information about the scan. Called by the ScanWorker after processing the QueueInstructionItem. It can be checked by any service. Args: status (str): Current scan status queue_id (str): Unique queue ID info (dict): Dictionary containing additional information about the scan queue (str): Defaults to "primary" queue. Information about the queue the scan was in. metadata (dict, optional): Additional metadata to describe the scan Examples: >>> ScanQueueHistoryMessage(status="open", queue_id="1234", info={"positions": {"samx": 0.5, "samy": 0.5}}) """ msg_type: ClassVar[str] = "queue_history" status: str queue_id: str info: dict queue: str = Field(default="primary")
[docs] class ScanStatusMessage(BECMessage): """Message type for sending scan status updates. Args: scan_id (str): Unique scan ID status (Literal["open", "paused", "aborted", "halted", "closed"]) : Current scan status info (dict): Dictionary containing additional information about the scan timestamp (float, optional): Timestamp of the scan status update. If None, the current time is used. metadata (dict, optional): Additional metadata to describe and identify the scan. Examples: >>> ScanStatusMessage(scan_id="1234", status="open", info={"positions": {"samx": 0.5, "samy": 0.5}}) """ msg_type: ClassVar[str] = "scan_status" scan_id: str | None status: Literal["open", "paused", "aborted", "halted", "closed"] info: dict timestamp: float = Field(default_factory=time.time) def __str__(self): content = deepcopy(self.__dict__) if content["info"].get("positions"): content["info"]["positions"] = "..." return f"{self.__class__.__name__}({content, self.metadata}))"
[docs] class ScanQueueModificationMessage(BECMessage): """Message type for sending scan queue modifications Args: scan_id (str): Unique scan ID action (str): One of the actions defined in ACTIONS: ("pause", "deferred_pause", "continue", "abort", "clear", "restart", "halt", "resume") parameter (dict): Additional parameters for the action queue (str): Defaults to "primary" queue. The name of the queue that receives the modification. metadata (dict, optional): Additional metadata to describe and identify the scan. Examples: >>> ScanQueueModificationMessage(scan_id=scan_id, action="abort", parameter={}) """ msg_type: ClassVar[str] = "scan_queue_modification" scan_id: str | list[str] | None action: Literal[ "pause", "deferred_pause", "continue", "abort", "clear", "restart", "halt", "resume" ] parameter: dict queue: str = Field(default="primary")
[docs] class ScanQueueStatusMessage(BECMessage): """Message type for sending scan queue status updates Args: queue (dict): Dictionary containing the current queue status. Must contain a "primary" key. metadata (dict, optional): Additional metadata to describe and identify the ScanQueueStatus. Examples: >>> ScanQueueStatusMessage(queue={"primary": {}}, metadata={"RID": "1234"}) """ msg_type: ClassVar[str] = "scan_queue_status" queue: dict
[docs] @field_validator("queue") @classmethod def check_queue(cls, v): """Validate the queue""" if not isinstance(v, dict): raise ValueError(f"Invalid queue {v}. Must be a dictionary") if "primary" not in v: raise ValueError(f"Invalid queue {v}. Must contain a 'primary' key") return v
[docs] class ClientInfoMessage(BECMessage): """Message type for sending information to the client Args: message (str): message to the client show_asap (bool, optional): True if the message should be shown immediately. Defaults to True # Note: The option show_asap = True/False is temporary disabled until a decision is made on how to handle it. TODO #286 RID (str, optional): Request ID forwarded from the service, if available will be used to filter on the client site. Defaults to None. source (str, Literal[ "bec_ipython_client", "scan_server", "device_server", "scan_bundler", "file_writer", "scihub", "dap", None] : Source of the message. Defaults to None. scope (str, optional): Scope of the message; Defaults to None. One can follow a pattern to filter afterwards for specific client info; e.g. "scan", "rotation" severity (int, optional): severity level of the message (0: INFO, 1: WARNING, 2: ERROR); Defaults to 0 """ msg_type: ClassVar[str] = "client_info" message: str show_asap: bool = Field(default=True) RID: str | None = Field(default=None) source: Literal[ "bec_ipython_client", "scan_server", "device_server", "scan_bundler", "file_writer", "scihub", "dap", None, ] = Field(default=None) scope: str | None = Field(default=None) severity: int = Field( default=0 ) # TODO add enum for severity levels INFO = 0, WARNING = 1, ERROR = 2
[docs] class RequestResponseMessage(BECMessage): """Message type for sending back decisions on the acceptance of requests Args: accepted (bool): True if the request was accepted message (str, dict, optional): String or dictionary describing the decision, e.g. "Invalid request" metadata (dict, optional): Additional metadata, defaults to None. Examples: >>> RequestResponseMessage(accepted=True, message="Request accepted") """ msg_type: ClassVar[str] = "request_response" accepted: bool message: str | dict | None = Field(default=None)
[docs] class DeviceInstructionMessage(BECMessage): """Message type for sending device instructions to the device server Args: device (str, list[str], None): Device name, list of device names or None action (Literal[ "rpc", "set", "read", "kickoff", "complete", "trigger", "stage", "unstage", "pre_scan", "wait", "scan_report_instruction", "open_scan", "baseline_reading", "close_scan", "open_scan_def", "close_scan_def", "publish_data_as_read", "close_scan_group", ]) : Device action, note rpc calls can run any method of the device. The function name needs to be specified in parameters['func'] parameter (dict): Parameters required for the device action metadata (dict, optional): Metadata to describe the conditions of the device instruction Examples: >>> DeviceInstructionMessage(device="samx", action="stage", parameter={}) """ msg_type: ClassVar[str] = "device_instruction" device: str | list[str] | None action: Literal[ "rpc", "set", "read", "kickoff", "complete", "trigger", "stage", "unstage", "pre_scan", "wait", "scan_report_instruction", "open_scan", "baseline_reading", "close_scan", "open_scan_def", "close_scan_def", "publish_data_as_read", "close_scan_group", ] parameter: dict
[docs] class DeviceMessage(BECMessage): """Message type for sending device readings from the device server Args: signals (dict): Dictionary containing the device signals and their values metadata (dict, optional): Metadata to describe the conditions of the device reading Examples: >>> BECMessage.DeviceMessage(signals={'samx': {'value': 14.999033949016491, 'timestamp': 1686385306.0265112}, 'samx_setpoint': {'value': 15.0, 'timestamp': 1686385306.016806}, 'samx_motor_is_moving': {'value': 0, 'timestamp': 1686385306.026888}}}, metadata={'stream': 'primary', 'DIID': 353, 'RID': 'd3471acc-309d-43b7-8ff8-f986c3fdecf1', 'point_id': 49, 'scan_id': '8e234698-358e-402d-a272-73e168a72f66', 'queue_id': '7a232746-6c90-44f5-81f5-74ab0ea22d4a'}) """ msg_type: ClassVar[str] = "device_message" signals: dict[str, dict[Literal["value", "timestamp"], Any]]
[docs] class DeviceRPCMessage(BECMessage): """Message type for sending device RPC return values from the device server Args: device (str): Device name. return_val (Any): Return value of the RPC call. out (str or dict): Output of the RPC call. success (bool, optional): True if the RPC call was successful. Defaults to True. metadata (dict, optional): Additional metadata. """ msg_type: ClassVar[str] = "device_rpc_message" device: str return_val: Any out: str | dict success: bool = Field(default=True)
[docs] class DeviceStatusMessage(BECMessage): """Message type for sending device status updates from the device server Args: device (str): Device name. status (int): Device status. metadata (dict, optional): Additional metadata. """ msg_type: ClassVar[str] = "device_status_message" device: str status: int
[docs] class DeviceReqStatusMessage(BECMessage): """Message type for sending device request status updates from the device server Args: device (str): Device name. success (bool): True if the request was successful. metadata (dict, optional): Additional metadata. """ msg_type: ClassVar[str] = "device_req_status_message" device: str success: bool
[docs] class DeviceInfoMessage(BECMessage): """Message type for sending device info updates from the device server Args: device (str): Device name. info (dict): Device info as a dictionary. metadata (dict, optional): Additional metadata. """ msg_type: ClassVar[str] = "device_info_message" device: str info: dict
[docs] class DeviceMonitorMessage(BECMessage): """Message type for sending device monitor updates from the device server. The message is send from the device_server to monitor data coming from larger detector. Args: device (str): Device name. data (np.ndarray): Numpy array data from the monitor metadata (dict, optional): Additional metadata. """ msg_type: ClassVar[str] = "device_monitor_message" device: str data: np.ndarray class Config: arbitrary_types_allowed = True
[docs] class ScanMessage(BECMessage): """Message type for sending scan segment data from the scan bundler Args: point_id (int): Point ID from the scan segment. scan_id (str): Scan ID. data (dict): Scan segment data. metadata (dict, optional): Additional metadata. """ msg_type: ClassVar[str] = "scan_message" point_id: int scan_id: str data: dict
[docs] class ScanBaselineMessage(BECMessage): """Message type for sending scan baseline data from the scan bundler Args: scan_id (str): Scan ID. data (dict): Scan baseline data. metadata (dict, optional): Additional metadata. """ msg_type: ClassVar[str] = "scan_baseline_message" scan_id: str data: dict
[docs] class DeviceConfigMessage(BECMessage): """Message type for sending device config updates Args: action (Literal['add', 'set', 'update', 'reload', or 'remove'] : Update of the device config. config (dict, or None): Device config (add, set, update) or None (reload). metadata (dict, optional): Additional metadata. """ msg_type: ClassVar[str] = "device_config_message" action: Literal["add", "set", "update", "reload", "remove"] = Field( default=None, validate_default=True ) config: dict | None = Field(default=None)
[docs] @model_validator(mode="after") @classmethod def check_config(cls, values): """Validate the config""" if values.action in ["add", "set", "update"] and not values.config: raise ValueError(f"Invalid config {values.config}. Must be a dictionary") return values
[docs] class LogMessage(BECMessage): """Log message Args: log_type (Literal["trace", "debug", "info", "success", "warning", "error", "critical", "console_log"]) : Log type. log_msg (dict or str): Log message. metadata (dict, optional): Additional metadata. """ msg_type: ClassVar[str] = "log_message" log_type: Literal[ "trace", "debug", "info", "success", "warning", "error", "critical", "console_log" ] log_msg: dict | str
[docs] class AlarmMessage(BECMessage): """Alarm message Args: severity (Alarms, Literal[0,1,2]): Severity level (0-2). ALARMS.WARNING = 0, ALARMS.MINOR = 1, ALARMS.MAJOR = 2 alarm_type (str): Type of alarm. source (dict): Source of the problem. msg (str): Problem description. metadata (dict, optional): Additional metadata. """ msg_type: ClassVar[str] = "alarm_message" severity: int # TODO change once enums moved to a separate class alarm_type: str source: dict msg: str
[docs] class StatusMessage(BECMessage): """Status message Args: name (str): Name of the status. status (BECStatus): Value of the BECStatus enum (RUNNING = 2, BUSY = 1, IDLE = 0, ERROR = -1). info (dict): Status info. metadata (dict, optional): Additional metadata. """ msg_type: ClassVar[str] = "status_message" name: str status: BECStatus info: dict
[docs] class FileMessage(BECMessage): """File message to inform about the status of a file writing operation Args: file_path (str): Path to the file. done (bool, optional): True if the file writing operation is done. Defaults to True. successful (bool, optional): True if the file writing operation was successful. Defaults to True. metadata (dict, optional): Additional metadata. Defaults to None. """ msg_type: ClassVar[str] = "file_message" file_path: str done: bool = Field(default=True) successful: bool = Field(default=True)
[docs] class FileContentMessage(BECMessage): """File content message to inform about the content of a file Args: file_path (str): Path to the file. data (str): Content of the file. metadata (dict, optional): Status metadata. Defaults to None. """ msg_type: ClassVar[str] = "file_content_message" file_path: str data: dict
[docs] class VariableMessage(BECMessage): """Message to inform about a global variable Args: value (Any): Variable value, can be of any type. metadata (dict, optional): Additional metadata. """ msg_type: ClassVar[str] = "var_message" value: Any
[docs] class ObserverMessage(BECMessage): """Message for observer updates Args: observer (list[dict]): List of observer descriptions (dictionaries). metadata (dict, optional): Additional metadata. """ msg_type: ClassVar[str] = "observer_message" observer: list[dict]
[docs] class ServiceMetricMessage(BECMessage): """Message for service metrics Args: name (str): Name of the service. metrics (dict): Dictionary with service metrics. metadata (dict, optional): Additional metadata. """ msg_type: ClassVar[str] = "service_metric_message" name: str metrics: dict
[docs] class ProcessedDataMessage(BECMessage): """Message for processed data Args: data (dict, list[dict]): Dictionary with processed data or list of dictionaries with processed data. metadata (dict, optional): Metadata. Defaults to None. """ msg_type: ClassVar[str] = "processed_data_message" data: dict | list[dict]
[docs] class DAPConfigMessage(BECMessage): """Message for DAP configuration Args: config (dict): DAP configuration dictionary metadata (dict, optional): Metadata. Defaults to None. """ msg_type: ClassVar[str] = "dap_config_message" config: dict
[docs] class DAPRequestMessage(BECMessage): """Message for DAP requests Args: dap_cls (str): DAP class name dap_type (Literal["continuous", "on_demand"]) : Different types of DAP modes config (dict): DAP configuration metadata (dict, optional): Metadata. Defaults to None. """ msg_type: ClassVar[str] = "dap_request_message" dap_cls: str dap_type: Literal["continuous", "on_demand"] config: dict
[docs] class DAPResponseMessage(BECMessage): """Message for DAP responses Args: success (bool): True if the request was successful data (tuple, optional): DAP data (tuple of data (dict) and metadata). Defaults to ({} , None). error (str, optional): DAP error. Defaults to None. dap_request (BECMessage, None): DAP request. Defaults to None. metadata (dict, optional): Metadata. Defaults to None. """ msg_type: ClassVar[str] = "dap_response_message" success: bool data: tuple | None = Field(default_factory=lambda: ({}, None)) error: str | None = None dap_request: BECMessage | None = Field(default=None)
[docs] class AvailableResourceMessage(BECMessage): """Message for available resources such as scans, data processing plugins etc Args: resource (dict, list[dict]): Resource description metadata (dict, optional): Metadata. Defaults to None. """ msg_type: ClassVar[str] = "available_resource_message" resource: dict | list[dict]
[docs] class ProgressMessage(BECMessage): """Message for communicating the progress of a long running task Args: value (float): Current progress value max_value (float): Maximum progress value done (bool): True if the task is done metadata (dict, optional): Metadata. Defaults to None. """ msg_type: ClassVar[str] = "progress_message" value: float max_value: float done: bool
[docs] class GUIConfigMessage(BECMessage): """Message for GUI configuration Args: config (dict): GUI configuration, check widgets for more details metadata (dict, optional): Metadata. Defaults to None. """ msg_type: ClassVar[str] = "gui_config_message" config: dict
[docs] class GUIDataMessage(BECMessage): """Message for GUI data, i.e. update for DAP processes or scans Args: data (dict): GUI data metadata (dict, optional): Metadata. Defaults to None. """ msg_type: ClassVar[str] = "gui_data_message" data: dict
[docs] class GUIInstructionMessage(BECMessage): """Message for GUI instructions Args: action (str): Instruction to be executed by the GUI metadata (dict, optional): Metadata. Defaults to None. """ msg_type: ClassVar[str] = "gui_instruction_message" action: str parameter: dict
[docs] class ServiceResponseMessage(BECMessage): """Message for service responses Args: response (dict): Service response metadata (dict, optional): Metadata. Defaults to None. """ msg_type: ClassVar[str] = "service_response_message" response: dict
[docs] class CredentialsMessage(BECMessage): """Message for credentials Args: credentials (dict): Credentials metadata (dict, optional): Metadata. Defaults to None. """ msg_type: ClassVar[str] = "credentials_message" credentials: dict