Source code for bec_lib.observer

"""
This module provides the Observer class and the ObserverManager class. The Observer class is used
to define an observer object, which is used to monitor a device for a specific condition. The
ObserverManager class is used to manage the observer objects.
"""

from __future__ import annotations

import enum
from typing import TYPE_CHECKING

from typeguard import typechecked

from bec_lib.device import Device
from bec_lib.endpoints import MessageEndpoints

if TYPE_CHECKING:
    from bec_lib.devicemanager import DeviceManagerBase


[docs] class ObserverAction(str, enum.Enum): PAUSE = "pause" RESUME = "resume" RESTART = "restart"
[docs] class Observer: def __init__( self, name: str, device: str, on_trigger: ObserverAction = None, on_resume: ObserverAction = None, limits: list = None, low_limit: float = None, high_limit: float = None, target_value=None, ): self.name = name self.device = device self.on_trigger = on_trigger self.on_resume = on_resume self._limits = [None, None] self._check_limits(limits, low_limit, high_limit) self.target_value = target_value self._enabled = False self._check_device() self._check_trigger() self._check_targets() @property def limits(self): return self._limits @limits.setter @typechecked def limits(self, val: list): self._limits = val @property def low_limit(self): return self.limits[0] @low_limit.setter @typechecked def low_limit(self, val: float): self.limits[0] = val @property def high_limit(self): return self.limits[1] @high_limit.setter @typechecked def high_limit(self, val: float): self.limits[1] = val @property def enabled(self) -> bool: return self._enabled @enabled.setter def enabled(self, val: bool): self._enabled = val def _check_limits(self, limits, low_limit, high_limit): if limits is not None and (low_limit is not None or high_limit is not None): raise AttributeError( "Ambiguous condition: Limits are set multiple times. Use either limits or" " low_limit/high_limit." ) if limits is not None: self.limits = limits if low_limit is not None: self.low_limit = low_limit if high_limit is not None: self.high_limit = high_limit def _check_device(self): if isinstance(self.device, Device): self.device = self.device.name def _check_trigger(self): if not isinstance(self.on_trigger, ObserverAction): self.on_trigger = ObserverAction(self.on_trigger) if not isinstance(self.on_resume, ObserverAction): self.on_resume = ObserverAction(self.on_resume) def _check_targets(self): if self.limits is not None and self.target_value is not None: raise AttributeError( "Ambiguous condition: Only one target (limits, target_value) can be set." ) if all(val is None for val in self.limits) and self.target_value is None: raise AttributeError("No condition set.") def to_dict(self) -> dict: return { "name": self.name, "device": self.device, "on_trigger": self.on_trigger, "on_resume": self.on_resume, "limits": self.limits, "target_value": self.target_value, } @classmethod def from_dict(cls, config: dict) -> Observer: return cls(**config)
[docs] class ObserverManager: def __init__(self, device_manager: DeviceManagerBase): self.device_manager = device_manager self._observer = self._get_installed_observer() @typechecked def add_observer(self, observer: Observer, ignore_existing: bool = False): if not hasattr(self.device_manager.devices, observer.device): AttributeError( f"The specified observer uses device {observer.device} which is currently not" " configured." ) if self._is_device_observed(observer.device) and not ignore_existing: raise AttributeError( f"Device {observer.device} is already being observed. If you want to add an" " additional observer for this device, use 'ignore_existing=True'." ) self._observer.append(observer) self.update_observer() def _is_device_observed(self, device: str) -> bool: return any(obs.device == device for obs in self._observer) def _get_installed_observer(self): # get current observer list from Redis observer_msg = self.device_manager.connector.get(MessageEndpoints.observer()) if observer_msg is None: return [] return [Observer.from_dict(obs) for obs in observer_msg.content["observer"]] def update_observer(self): # send the current observer list to MongoDB and Redis pass def list_observer(self): pass