Source code for bec_lib.utils

"""
Utility functions for the bec_lib package.
"""

from __future__ import annotations

import csv
import datetime
import functools
from collections import defaultdict
from typing import TYPE_CHECKING

from typeguard import typechecked

if TYPE_CHECKING:
    from bec_lib.scan_items import ScanItem
    from bec_lib.scan_report import ScanReport


[docs] class user_access: def __init__(self, meth): """ Decorator to mark <device_class> methods to be accessible from the bec client Args: meth: method to be marked as accessible Examples: >>> @user_access >>> def my_method(self, *args, **kwargs): >>> return fcn(self, *args, **kwargs) """ self.meth = meth def __set_name__(self, owner, name): """Write registered method into the owner class USER_ACCESS list for the bec_ipython_client""" if not hasattr(owner, "USER_ACCESS"): owner.USER_ACCESS = [] if name not in owner.USER_ACCESS: owner.USER_ACCESS.append(name) setattr(owner, name, self.meth)
[docs] def threadlocked(fcn): """Ensure that the thread acquires and releases the lock.""" @functools.wraps(fcn) def wrapper(self, *args, **kwargs): # pylint: disable=protected-access with self._lock: return fcn(self, *args, **kwargs) return wrapper
[docs] @typechecked def scan_to_csv( scan_report: ScanReport | ScanItem | list[ScanReport] | list[ScanItem], output_name: str, delimiter: str = ",", dialect: str | None = None, header: list | None = None, write_metadata: bool = True, ) -> None: """Convert scan data to a csv file. Args: scan_report (ScanReport | ScanItem): ScanReport or ScanItem object(s). Can be a list of ScanReports, ScanItems filename (str): Name of the csv file. delimiter (str, optional): Delimiter for the csv file. Defaults to ",". dialect (str, optional): Argument for csv.Dialect. Defaults to csv.writer default, e.g. 'excel'. Other options 'excel-tab' or 'unix', still takes argument from delimiter, choose delimier='' to omit header (list, optional): Create custom header for the csv file. If None, header is created automatically. Defaults to None. write_metadata (bool, optional): If True, the metadata of the scan will be written to the header of csv file. Defaults to True. Examples: >>> scan_to_csv(scan_report, "./scan.csv") """ if not isinstance(scan_report, list): scan_report = [scan_report] header_out = [] body_out = [] data_output = [] for ii, scan_rep in enumerate(scan_report): if hasattr(scan_rep, "scan"): scan_rep = scan_rep.scan header_out.append(["#ScanNumber", f"{scan_rep.scan_number}"]) header_tmp, body_tmp = _extract_scan_data( scan_item=scan_rep, header=header, write_metadata=write_metadata ) header_out.extend(header_tmp[:-1]) body_out.extend(body_tmp) header_out.append(header_tmp[-1]) # To only append the header keys once data_output.extend(header_out) data_output.extend(body_out) _write_csv(output_name=output_name, delimiter=delimiter, dialect=dialect, output=data_output)
def _write_csv(output_name: str, delimiter: str, output: list, dialect: str = None) -> None: """Write csv file. Args: output_name (str): Name of the csv file. delimiter (str): Delimiter for the csv file. dialect (str): Argument for csv.Dialect. Defaults to None. If no None, delimiter input is ignored. Some input examples 'excel', 'excel-tab' or 'unix' data_dict (dict): Dictionary to be written to csv. Examples: >>> _write_csv("./scan.csv", ",", ["#samx", "bpm4i"], True, scan_dict) """ with open(output_name, "w", encoding="UTF-8") as file: writer = csv.writer(file, delimiter=delimiter, dialect=dialect) writer.writerows(output) def _extract_scan_data( scan_item: ScanItem, header: list = None, write_metadata: bool = True ) -> tuple: """Extract scan data from scan report. Args: scan_item (ScanItem): ScanItem object. header (list, optional): Create custom header for the csv file. If None, header is created automatically. Defaults to None. write_metadata (bool, optional): If True, the metadata of the scan will be written to the header of csv file. Defaults to True. Returns: (tuple): Tuple of header and body of the csv file. """ scan_dict = scan_to_dict(scan_item, flat=True) header_tmp = [] header_tmp.append(["#scan_id", f"{scan_item.scan_id}"]) header_tmp.append(["#ScanStatus", f"{scan_item.status}"]) start_time = f"{datetime.datetime.fromtimestamp(scan_item.start_time).strftime('%c')}" header_tmp.append(["#StartTime", start_time]) end_time = f"{datetime.datetime.fromtimestamp(scan_item.start_time).strftime('%c')}" header_tmp.append(["#EndTime", end_time]) elapsed_time = ( f"\tElapsed time: {(scan_item.end_time-scan_item.start_time):.1f} s\n" if scan_item.end_time and scan_item.start_time else "" ) header_tmp.append(["#ElapsedTime", elapsed_time]) scan_metadata = scan_item.status_message.info if write_metadata: header_tmp.append(["#ScanMetadata"]) for key, value in scan_metadata.items(): header_tmp.append(["".join(["#", key]), value]) if header: header_keys = header else: header_keys = ["scan_number", "dataset_number"] # pylint: disable=expression-not-assigned [ header_keys.extend([f"{value}_value", f"{time}_timestamp"]) for value, time in zip(scan_dict["value"].keys(), scan_dict["timestamp"].keys()) ] header_keys[0] = "".join(["#", header_keys[0]]) header_tmp.append(header_keys) body_tmp = [] num_entries = len(list(scan_dict["value"].values())[0]) for ii in range(num_entries): sub_list = [] sub_list.extend([scan_metadata["scan_number"], scan_metadata["dataset_number"]]) for key in scan_dict["value"]: sub_list.extend([scan_dict["value"][key][ii], scan_dict["timestamp"][key][ii]]) body_tmp.append(sub_list) return header_tmp, body_tmp
[docs] def scan_to_dict(scan_item: ScanItem, flat: bool = True) -> dict: """Convert scan data to a dictionary. Args: scan_item (ScanItem): ScanItem object. flat (bool, optional): If True, the dictionary will be flat. Defaults to True. Returns: (dict): Dictionary of scan data. Examples: >>> scan_to_dict(scan_report) with scan_report = scans.line_scan(...) """ if flat: scan_dict = {"timestamp": defaultdict(lambda: []), "value": defaultdict(lambda: [])} else: scan_dict = { "timestamp": defaultdict(lambda: defaultdict(lambda: [])), "value": defaultdict(lambda: defaultdict(lambda: [])), } for dev, dev_data in scan_item.data.items(): for signal, signal_data in dev_data.items(): if flat: scan_dict["timestamp"][signal] = signal_data["timestamp"] scan_dict["value"][signal] = signal_data["value"] else: scan_dict["timestamp"][dev][signal] = signal_data["timestamp"] scan_dict["value"][dev][signal] = signal_data["value"] return scan_dict