Source code for bec_lib.async_data

"""
This module contains the AsyncDataHandler class which is used to receive and store async device data from the BEC.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

import numpy as np

from bec_lib.endpoints import MessageEndpoints

if TYPE_CHECKING:
    from bec_lib import messages
    from bec_lib.connector import ConnectorBase


[docs] class AsyncDataHandler: def __init__(self, connector: ConnectorBase): self.connector = connector
[docs] def get_async_data_for_scan(self, scan_id: str) -> dict[list]: """ Get the async data for a given scan. Args: scan_id(str): the scan id to get the async data for Returns: dict[list]: the async data for the scan sorted by device name """ async_device_keys = self.connector.keys( MessageEndpoints.device_async_readback(scan_id, "*") ) async_data = {} for device_key in async_device_keys: key = device_key.decode() device_name = key.split(MessageEndpoints.device_async_readback(scan_id, "").endpoint)[ -1 ].split(":")[0] data = self.get_async_data_for_device(scan_id, device_name) if not data: continue async_data[device_name] = data return async_data
[docs] def get_async_data_for_device(self, scan_id: str, device_name: str) -> list: """ Get the async data for a given device in a scan. Args: scan_id(str): the scan id to get the async data for device_name(str): the device name to get the async data for Returns: list: the async data for the device """ key = MessageEndpoints.device_async_readback(scan_id, device_name) msgs = self.connector.xrange(key, min="-", max="+") if not msgs: return [] return self.process_async_data(msgs)
[docs] @staticmethod def process_async_data(msgs: list[messages.DeviceMessage]) -> dict | list[dict]: """ Process the async data. Args: device_name(str): the name of the device msgs(list[messages.DeviceMessage]): the async data to process Returns: list: the processed async data """ concat_type = None data = [] async_data = {} for msg in msgs: msg = msg["data"] if not concat_type: concat_type = msg.metadata.get("async_update", "append") data.append(msg.content["signals"]) if len(data) == 1: async_data = data[0] return async_data if concat_type == "extend": # concatenate the dictionaries for signal in data[0].keys(): async_data[signal] = {} for key in data[0][signal].keys(): if hasattr(data[0][signal][key], "__iter__"): async_data[signal][key] = np.concatenate([d[signal][key] for d in data]) else: async_data[signal][key] = [d[signal][key] for d in data] return async_data if concat_type == "append": # concatenate the lists for key in data[0].keys(): async_data[key] = [] for d in data: async_data[key].append(d[key]) return async_data if concat_type == "replace": # replace the dictionaries async_data = data[-1] return async_data raise ValueError(f"Unknown async update type: {concat_type}")