# Source code for bec_lib.bl_checks

"""
This module provides the BeamlineChecks class, which is used to perform beamline checks. It also provides the bl_check
decorator. 
"""

import builtins
import datetime
import functools
import threading
import time
from collections import deque
from uuid import uuid4

from typeguard import typechecked

from bec_lib.bl_conditions import BeamlineCondition
from bec_lib.logger import bec_logger

logger = bec_logger.logger


class BeamlineCheckError(Exception):
    """Error signalling that the beam is not okay.

    Within this module it is raised by ``_run_with_bl_checks`` when a nested
    call finds that the beam is not okay, and it is caught by
    ``_run_on_failure``, which then waits for the beamline checks to pass
    before the function is retried.
    """
class BeamlineCheckRepeat(Exception):
    """Request to run the guarded function again.

    ``_run_with_bl_checks`` catches this exception inside its retry loop and
    simply starts the next attempt. Nothing in this module raises it.
    NOTE(review): presumably raised by the guarded code itself - confirm
    against callers.
    """
def bl_check(fcn):
    """Decorator that runs ``fcn`` under the client's beamline checks.

    On every call the wrapper looks up the client stored under the name
    ``bec`` in ``builtins`` and delegates to ``_run_with_bl_checks`` using the
    client's ``bl_checks`` object.

    Args:
        fcn (Callable): The function to guard with beamline checks.

    Returns:
        Callable: The wrapped function. Its return value is the value returned
        by ``fcn`` on the successful attempt.

    Raises:
        AttributeError: If no ``bec`` client is available in ``builtins`` at
            call time.
    """

    @functools.wraps(fcn)
    def bl_check_wrapper(*args, **kwargs):
        client = builtins.__dict__.get("bec")
        if client is None:
            # Same exception type as the implicit ``None.bl_checks`` failure,
            # but with a message that explains what is missing.
            raise AttributeError(
                "No 'bec' client found in builtins; cannot run beamline checks."
            )
        # Propagate the result; previously it was silently dropped.
        return _run_with_bl_checks(client.bl_checks, fcn, *args, **kwargs)

    return bl_check_wrapper
def _run_with_bl_checks(bl_checks, fcn, *args, **kwargs):
    """Run ``fcn`` and repeat it until it completes while the beam is okay.

    Every invocation pushes an entry onto ``bl_checks._levels`` so that nested
    calls (a guarded function calling another guarded function) can be told
    apart from the outermost call. The entry is popped again in all cases.

    * Outermost call: the beam status is reset and the beamline checks are
      awaited before ``fcn`` is run.
    * Nested call: if the beam is already flagged as not okay, a
      ``BeamlineCheckError`` is raised immediately.

    ``fcn`` is then executed in a loop. An attempt counts as failed if ``fcn``
    raised ``BeamlineCheckError``, if the beam is flagged as not okay after
    the attempt, or if ``BeamlineCheckRepeat`` was raised.

    Args:
        bl_checks: Object providing ``_levels``, ``_is_paused``,
            ``_beamline_checks``, ``beam_is_okay``, ``reset()`` and
            ``wait_for_beamline_checks()``.
        fcn (Callable): The function to run.
        *args: Positional arguments forwarded to ``fcn``.
        **kwargs: Keyword arguments forwarded to ``fcn``.

    Returns:
        The return value of the successful ``fcn`` call.

    Raises:
        BeamlineCheckError: For a nested call when the beam is not okay.
    """
    # pylint: disable=protected-access
    chk = {"id": str(uuid4()), "fcn": fcn, "args": args, "kwargs": kwargs}
    bl_checks._levels.append(chk)
    nested_call = len(bl_checks._levels) > 1

    if bl_checks._is_paused and bl_checks._beamline_checks:
        logger.warning(
            "Beamline checks are currently paused. Use `bec.bl_checks.resume()` to reactivate them."
        )

    try:
        if nested_call:
            # check if the beam was okay so far
            if not bl_checks.beam_is_okay:
                raise BeamlineCheckError("Beam is not okay.")
        else:
            bl_checks.reset()
            bl_checks.wait_for_beamline_checks()

        successful = False
        while not successful:
            try:
                successful, res = _run_on_failure(bl_checks, fcn, *args, **kwargs)
                if not bl_checks.beam_is_okay:
                    # The beam was flagged as not okay during the attempt:
                    # discard the result, wait for the beam and try again.
                    successful = False
                    bl_checks.wait_for_beamline_checks()
            except BeamlineCheckRepeat:
                successful = False
        return res
    finally:
        bl_checks._levels.pop()


def _run_on_failure(bl_checks, fcn, *args, **kwargs) -> tuple:
    """Run ``fcn`` once and report whether it completed.

    Args:
        bl_checks: Object providing ``wait_for_beamline_checks()``.
        fcn (Callable): The function to run.
        *args: Positional arguments forwarded to ``fcn``.
        **kwargs: Keyword arguments forwarded to ``fcn``.

    Returns:
        tuple: ``(True, result)`` if ``fcn`` returned normally. ``(False, None)``
        if ``fcn`` raised ``BeamlineCheckError``; in that case the beamline
        checks are awaited before returning.
    """
    try:
        res = fcn(*args, **kwargs)
        return (True, res)
    except BeamlineCheckError:
        bl_checks.wait_for_beamline_checks()
        return (False, None)
class BeamlineChecks:
    """Registry and runner for beamline checks.

    Checks are registered by name, can be enabled or disabled individually and
    are evaluated either by a background thread (see ``start`` / ``stop``) or
    synchronously while waiting for the beam to recover (see
    ``wait_for_beamline_checks``).
    """

    def __init__(self, client, *args, **kwargs):
        """
        Args:
            client: The client object. Its ``logbook`` attribute is used to
                send messages to SciLog.
        """
        super().__init__(*args, **kwargs)
        self.client = client
        # If True, failures and resumptions are reported to SciLog.
        self.send_to_scilog = True
        self._beam_is_okay = True
        # Registered checks, keyed by check name.
        self._beamline_checks = {}
        self._stop_beam_check_event = threading.Event()
        self._beam_check_thread = None
        self._started = False
        self._is_paused = False
        # Failure messages produced by the most recent run of the checks.
        self._check_msgs = []
        # One entry per active (possibly nested) guarded call; maintained by
        # ``_run_with_bl_checks``.
        self._levels = deque()

    @typechecked
    def register(self, check: BeamlineCondition):
        """
        Register a beamline check.

        The check is stored under ``check.name`` and is additionally exposed
        as an attribute of that name on this object.

        Args:
            check (BeamlineCondition): The beamline check to register.
        """
        self._beamline_checks[check.name] = check
        setattr(self, check.name, check)

    def pause(self) -> None:
        """
        Pause beamline checks. This will disable all checks. Use `resume` to reactivate the checks.
        """
        self._is_paused = True

    def resume(self) -> None:
        """
        Resume all paused beamline checks.
        """
        self._is_paused = False

    def available_checks(self) -> None:
        """
        Print all available beamline checks
        """
        for name, check in self._beamline_checks.items():
            enabled = f"ENABLED: {check.enabled}"
            print(f"{name:<20} {enabled}")

    def disable_check(self, name: str) -> None:
        """
        Disable a beamline check.

        Args:
            name (str): The name of the beamline check to disable.

        Raises:
            ValueError: If no check with that name is registered.
        """
        if name not in self._beamline_checks:
            raise ValueError(f"Beamline check {name} not registered.")
        self._beamline_checks[name].enabled = False

    def enable_check(self, name: str) -> None:
        """
        Enable a beamline check.

        Args:
            name (str): The name of the beamline check to enable.

        Raises:
            ValueError: If no check with that name is registered.
        """
        if name not in self._beamline_checks:
            raise ValueError(f"Beamline check {name} not registered.")
        self._beamline_checks[name].enabled = True

    def disable_all_checks(self) -> None:
        """
        Disable all beamline checks.
        """
        for name in self._beamline_checks:
            self.disable_check(name)

    def enable_all_checks(self) -> None:
        """
        Enable all beamline checks.
        """
        for name in self._beamline_checks:
            self.enable_check(name)

    def _run_beamline_checks(self):
        """Run all enabled checks once.

        Marks the beam as not okay if any check fails. The flag is never set
        back to okay here; that is done by ``reset`` and
        ``_beam_status_is_okay``.

        Returns:
            list: The failure messages of all checks that did not pass. Empty
            while the checks are paused.
        """
        if self._is_paused:
            # ``pause`` is documented to disable all checks, so nothing is
            # evaluated and nothing can fail while paused.
            return []
        msgs = []
        for check in self._beamline_checks.values():
            if not check.enabled:
                continue
            if check.run():
                continue
            msgs.append(check.on_failure_msg())
            self._beam_is_okay = False
        return msgs

    def _check_beam(self):
        """Background loop: run the checks once per second until stopped."""
        while not self._stop_beam_check_event.is_set():
            self._check_msgs = self._run_beamline_checks()
            # Wait on the event instead of sleeping so that ``stop`` does not
            # have to wait for the remainder of the polling interval.
            self._stop_beam_check_event.wait(1)

    def start(self):
        """Start the beamline checks."""
        if self._started:
            return
        self._beam_is_okay = True
        # Make sure a stop request from an earlier run is not still pending.
        self._stop_beam_check_event.clear()
        self._beam_check_thread = threading.Thread(target=self._check_beam, daemon=True)
        self._beam_check_thread.start()
        self._started = True

    def stop(self):
        """Stop the beamline checks"""
        if not self._started:
            return
        self._stop_beam_check_event.set()
        self._beam_check_thread.join()
        # Reset the state so that ``start`` can launch a new thread later.
        self._stop_beam_check_event.clear()
        self._started = False

    def reset(self):
        """Mark the beam as okay and discard the stored failure messages."""
        self._beam_is_okay = True
        self._check_msgs = []

    @property
    def beam_is_okay(self):
        """bool: False once any check has failed since the last reset."""
        return self._beam_is_okay

    def _print_beamline_checks(self):
        """Log the stored failure messages as warnings."""
        for msg in self._check_msgs:
            logger.warning(msg)

    def wait_for_beamline_checks(self):
        """Block until all enabled beamline checks pass.

        Logs the current failure messages, reports the failure to SciLog (if
        enabled and the beam is flagged as not okay), then re-runs the checks
        every five seconds until they all pass.
        """
        self._print_beamline_checks()
        if self.send_to_scilog and not self.beam_is_okay:
            self._send_to_scilog(
                f"Beamline checks failed at {datetime.datetime.now()}: {''.join(self._check_msgs)}",
                pen="red",
            )
        self._run_beamline_checks_until_okay()
        # NOTE(review): the resume message is sent even when no failure was
        # reported above - confirm whether that is intended.
        if self.send_to_scilog:
            self._send_to_scilog(f"Operation resumed at {datetime.datetime.now()}.", pen="green")

    def _run_beamline_checks_until_okay(self):
        """Re-run the checks every five seconds until all of them pass."""
        while not self._beam_status_is_okay():
            self._print_beamline_checks()
            time.sleep(5)

    def _beam_status_is_okay(self) -> bool:
        """Run the checks once, starting from a clean state.

        Returns:
            bool: True if every enabled check passed.
        """
        self._beam_is_okay = True
        self._check_msgs = self._run_beamline_checks()
        return self._beam_is_okay

    def _send_to_scilog(self, msg, pen="red"):
        """Send ``msg`` to SciLog; failures are logged and otherwise ignored.

        Args:
            msg (str): The text to send.
            pen (str): Suffix of the ``pen-...`` CSS class used to highlight
                the text.
        """
        try:
            # Use a separate name for the logbook message: rebinding ``msg``
            # would put the message object's repr into the text instead of
            # the actual message.
            logbook_msg = self.client.logbook.LogbookMessage()
            logbook_msg.add_text(
                f"<p><mark class='pen-{pen}'><strong>{msg}</strong></mark></p>"
            ).add_tag(["BEC", "beam_check"])
            self.client.logbook.send_logbook_message(logbook_msg)
        except Exception:  # pylint: disable=broad-except
            # Best effort: logbook problems must not interrupt operation.
            logger.warning("Failed to send update to SciLog.")