# Source code for bec_lib.scan_report

"""
Scan Report class that provides a convenient way to access the status of a scan request. It is typically
the return value of a scan request. 
"""

from __future__ import annotations

import time
from math import inf
from typing import TYPE_CHECKING

from bec_lib import messages
from bec_lib.bec_errors import ScanAbortion
from bec_lib.endpoints import MessageEndpoints

if TYPE_CHECKING:
    from bec_lib.client import BECClient
    from bec_lib.queue_items import QueueItem


class ScanReport:
    """Scan Report class that provides a convenient way to access the status of a scan request.

    Instances are normally created through :meth:`from_request`, which binds the
    report to a client and to the stored request object. The associated queue item
    is looked up lazily the first time it is needed and cached afterwards.
    """

    def __init__(self) -> None:
        # All attributes are populated by `from_request`; a bare instance is inert.
        self._client = None
        self.request = None
        self._queue_item = None

    @classmethod
    def from_request(
        cls, request: messages.ScanQueueMessage, client: BECClient = None
    ) -> ScanReport:
        """
        Create a ScanReport from a request

        The request is first registered in the client's request storage and then
        read back by its request ID (``request.metadata["RID"]``), so the report
        holds the stored request object rather than the raw message.

        Args:
            request (messages.ScanQueueMessage): request to create the report from
            client (BECClient, optional): BECClient instance. Defaults to None.
                NOTE: although the default is None, the client is dereferenced
                unconditionally below, so a real client has to be supplied.

        Returns:
            ScanReport: ScanReport instance
        """
        scan_report = cls()
        scan_report._client = client
        client.queue.request_storage.update_with_request(request)
        scan_report.request = client.queue.request_storage.find_request_by_ID(
            request.metadata["RID"]
        )
        return scan_report

    @property
    def scan(self):
        """get the scan item"""
        return self.request.scan

    @property
    def status(self):
        """
        returns the current status of the request

        For requests of scan type "mv", a queue status of "COMPLETED" is only
        reported once :meth:`_get_mv_status` confirms the move; until then
        "RUNNING" is returned instead. All other cases return the queue item's
        status unchanged.
        """
        scan_type = self.request.request.content["scan_type"]
        # Read the queue status once so the comparison and the returned value
        # always refer to the same snapshot.
        status = self.queue_item.status
        if scan_type == "mv" and status == "COMPLETED":
            return "COMPLETED" if self._get_mv_status() else "RUNNING"
        return status

    @property
    def queue_item(self):
        """get the queue item (looked up on first access with a 10 s timeout, then cached)"""
        if not self._queue_item:
            self._queue_item = self._get_queue_item(timeout=10)
        return self._queue_item

    def _get_queue_item(self, timeout=None) -> QueueItem:
        """
        get the queue item from the queue storage

        The queue storage is polled every 0.1 s until an item matching the
        request ID is found. The item is returned as soon as a lookup succeeds,
        without any further sleeping or timeout check.

        Args:
            timeout (float, optional): timeout in seconds. Defaults to None,
                which means wait indefinitely.

        Raises:
            TimeoutError: if no queue item was found before the timeout elapsed

        Returns:
            QueueItem: queue item belonging to this request
        """
        timeout = timeout if timeout is not None else inf
        sleep_time = 0.1
        # Elapsed time is approximated by summing the sleep intervals.
        elapsed_time = 0
        while True:
            queue_item = self._client.queue.queue_storage.find_queue_item_by_requestID(
                self.request.requestID
            )
            if queue_item:
                return queue_item
            if elapsed_time > timeout:
                raise TimeoutError(
                    f"Timeout reached while waiting for the queue item. Timeout: {timeout} s."
                )
            time.sleep(sleep_time)
            elapsed_time += sleep_time

    def _get_mv_status(self) -> bool:
        """
        get the status of a move request

        Compares the number of entries stored under the request's
        ``device_req_status_container`` endpoint with the number of motors named
        in the request arguments (presumably one status entry per motor - verify
        against the producer of these entries).

        Returns:
            bool: True if the number of status entries equals the number of motors
        """
        motors = list(self.request.request.content["parameter"]["args"].keys())
        request_status = self._client.device_manager.connector.lrange(
            MessageEndpoints.device_req_status_container(self.request.requestID), 0, -1
        )
        return len(request_status) == len(motors)

    def wait(self, timeout: float = None) -> ScanReport:
        """
        wait for the request to complete

        Requests of scan type "mv" are awaited via :meth:`_wait_move`, everything
        else via :meth:`_wait_scan`. A KeyboardInterrupt while waiting requests a
        scan abortion from the client's queue and is re-raised as ScanAbortion.

        Args:
            timeout (float, optional): timeout in seconds. Defaults to None.

        Raises:
            TimeoutError: if the timeout is reached
            ScanAbortion: if the user interrupts the wait or the scan is stopped

        Returns:
            ScanReport: ScanReport instance
        """
        sleep_time = 0.1
        scan_type = self.request.request.content["scan_type"]
        try:
            if scan_type == "mv":
                self._wait_move(timeout, sleep_time)
            else:
                self._wait_scan(timeout, sleep_time)
        except KeyboardInterrupt as exc:
            self._client.queue.request_scan_abortion()
            raise ScanAbortion("Aborted by user.") from exc

        return self

    def _check_timeout(self, timeout: float = None, elapsed_time: float = 0) -> None:
        """
        check if the timeout is reached

        Args:
            timeout (float, optional): timeout in seconds. Defaults to None,
                in which case no check is performed.
            elapsed_time (float, optional): elapsed time in seconds. Defaults to 0.

        Raises:
            TimeoutError: if elapsed_time exceeds timeout
        """
        if timeout is None:
            return
        if elapsed_time > timeout:
            raise TimeoutError(
                f"Timeout reached while waiting for request to complete. Timeout: {timeout} s."
            )

    def _wait_move(self, timeout: float = None, sleep_time: float = 0.1) -> None:
        """
        wait for a move request to complete

        Between polls, pending alarms are raised through the client's alarm handler.

        Args:
            timeout (float, optional): timeout in seconds. Defaults to None.
            sleep_time (float, optional): sleep time in seconds. Defaults to 0.1.

        Raises:
            TimeoutError: if the timeout is reached
        """
        # Elapsed time is approximated by summing the sleep intervals.
        elapsed_time = 0
        while not self._get_mv_status():
            self._client.alarm_handler.raise_alarms()
            time.sleep(sleep_time)
            elapsed_time += sleep_time
            self._check_timeout(timeout, elapsed_time)

    def _wait_scan(self, timeout: float = None, sleep_time: float = 0.1) -> None:
        """
        wait for a scan request to complete

        Between polls, the client's callbacks are polled.

        Args:
            timeout (float, optional): timeout in seconds. Defaults to None.
            sleep_time (float, optional): sleep time in seconds. Defaults to 0.1.

        Raises:
            ScanAbortion: if the request status becomes "STOPPED"
            TimeoutError: if the timeout is reached
        """
        # Elapsed time is approximated by summing the sleep intervals.
        elapsed_time = 0
        while True:
            # Evaluate the status once per iteration; each access may hit the backend.
            status = self.status
            if status == "COMPLETED":
                break
            if status == "STOPPED":
                raise ScanAbortion
            self._client.callbacks.poll()
            time.sleep(sleep_time)
            elapsed_time += sleep_time
            self._check_timeout(timeout, elapsed_time)

    def __str__(self) -> str:
        separator = "--" * 10
        details = f"\tStatus: {self.status}\n"
        if self.scan:
            details += self.scan.describe()
        return f"ScanReport:\n{separator}\n{details}"