"""
This module contains the ScanItem and ScanStorage classes. The ScanItem class is used to store
information about a scan. The ScanStorage class is used to store scan items.
"""
from __future__ import annotations
import builtins
import datetime
import sys
import threading
import time
from collections import defaultdict, deque
from typing import TYPE_CHECKING
from bec_lib import messages
from bec_lib.async_data import AsyncDataHandler
from bec_lib.logger import bec_logger
from bec_lib.scan_data import ScanData
from bec_lib.utils import threadlocked
if TYPE_CHECKING:
from bec_lib.scan_manager import ScanManager
logger = bec_logger.logger
try:
import pandas as pd
except ImportError:
logger.info("Unable to import `pandas` optional dependency")
[docs]
class ScanItem:
status: dict
# pylint: disable=too-many-arguments
def __init__(
self,
scan_manager: ScanManager,
queue_id: str,
scan_number: list,
scan_id: list,
status: str,
**_kwargs,
) -> None:
self.scan_manager = scan_manager
self._queue_id = queue_id
self.scan_number = scan_number
self.scan_id = scan_id
self.status = status
self.data = ScanData()
self.async_data = {}
self.baseline = ScanData()
self._async_data_handler = AsyncDataHandler(scan_manager.connector)
self.open_scan_defs = set()
self.open_queue_group = None
self.num_points = None
self.start_time = None
self.end_time = None
self.scan_report_instructions = []
self.callbacks = []
self.bec = builtins.__dict__.get("bec")
@property
def queue(self):
"""get the queue item for the current scan item"""
return self.scan_manager.queue_storage.find_queue_item_by_ID(self._queue_id)
def emit_data(self, scan_msg: messages.ScanMessage) -> None:
self.bec.callbacks.run("scan_segment", scan_msg.content, scan_msg.metadata)
self._run_request_callbacks("scan_segment", scan_msg.content, scan_msg.metadata)
def emit_status(self, scan_status: messages.ScanStatusMessage) -> None:
self.bec.callbacks.run("scan_status", scan_status.content, scan_status.metadata)
self._run_request_callbacks("scan_status", scan_status.content, scan_status.metadata)
def _run_request_callbacks(self, event_type: str, data: dict, metadata: dict):
for rid in self.queue.requestIDs:
req = self.scan_manager.request_storage.find_request_by_ID(rid)
if req is None:
continue
req.callbacks.run(event_type, data, metadata)
def poll_callbacks(self):
for rid in self.queue.requestIDs:
req = self.scan_manager.request_storage.find_request_by_ID(rid)
if req is None:
continue
req.callbacks.poll()
[docs]
def to_pandas(self) -> pd.DataFrame:
"""convert to pandas dataframe"""
if "pandas" not in sys.modules:
raise ImportError("Install `pandas` to use to_pandas() method")
tmp = defaultdict(list)
for scan_msg in self.data.messages.values():
scan_msg_data = scan_msg.content["data"]
for dev, dev_data in scan_msg_data.items():
for signal, signal_data in dev_data.items():
for key, value in signal_data.items():
tmp[(dev, signal, key)].append(value)
return pd.DataFrame(tmp)
def _update_async_data(self):
self.async_data = self._async_data_handler.get_async_data_for_scan(self.scan_id)
# msg = messages.ScanMessage(point_id=0, scan_id=self.scan_id, data=data)
# self.async_data.set(0, msg)
def __eq__(self, other):
return self.scan_id == other.scan_id
[docs]
def describe(self) -> str:
"""describe the scan item"""
start_time = (
f"\tStart time: {datetime.datetime.fromtimestamp(self.start_time).strftime('%c')}\n"
if self.start_time
else ""
)
end_time = (
f"\tEnd time: {datetime.datetime.fromtimestamp(self.end_time).strftime('%c')}\n"
if self.end_time
else ""
)
elapsed_time = (
f"\tElapsed time: {(self.end_time-self.start_time):.1f} s\n"
if self.end_time and self.start_time
else ""
)
scan_id = f"\tScan ID: {self.scan_id}\n" if self.scan_id else ""
scan_number = f"\tScan number: {self.scan_number}\n" if self.scan_number else ""
num_points = f"\tNumber of points: {self.num_points}\n" if self.num_points else ""
details = start_time + end_time + elapsed_time + scan_id + scan_number + num_points
return details
def __str__(self) -> str:
return f"ScanItem:\n {self.describe()}"
[docs]
class ScanStorage:
"""stores scan items"""
def __init__(self, scan_manager: ScanManager, maxlen=50, init_scan_number=0) -> None:
self.scan_manager = scan_manager
self.storage = deque(maxlen=maxlen)
self.last_scan_number = init_scan_number
self._lock = threading.RLock()
@property
def current_scan_info(self) -> dict:
"""get the current scan info from the scan queue"""
scan_queue = self.scan_manager.queue_storage.current_scan_queue
if not scan_queue or not scan_queue["primary"].get("info"):
return None
return scan_queue["primary"].get("info")[0]
@property
def current_scan(self) -> ScanItem | None:
"""get the current scan item"""
if not self.current_scan_id:
return None
return self.find_scan_by_ID(scan_id=self.current_scan_id[0])
@property
def current_scan_id(self) -> str | None:
"""get the current scan_id"""
if self.current_scan_info is None:
return None
return self.current_scan_info.get("scan_id")
[docs]
@threadlocked
def find_scan_by_ID(self, scan_id: str) -> ScanItem | None:
"""find a scan item based on its scan_id"""
for scan in self.storage:
if scan_id == scan.scan_id:
return scan
return None
[docs]
def update_with_scan_status(self, scan_status: messages.ScanStatusMessage) -> None:
"""update scan item in storage with a new ScanStatusMessage"""
scan_id = scan_status.content["scan_id"]
if not scan_id:
return
scan_number = scan_status.content["info"].get("scan_number")
if scan_number:
self.last_scan_number = scan_number
while True:
scan_item = self.find_scan_by_ID(scan_id=scan_status.content["scan_id"])
if scan_item:
break
time.sleep(0.1)
# update timestamps
if scan_status.content.get("status") == "open":
scan_item.start_time = scan_status.content.get("timestamp")
elif scan_status.content.get("status") == "closed":
scan_item.end_time = scan_status.content.get("timestamp")
# update status message
scan_item.status = scan_status.content.get("status")
scan_item.status_message = scan_status
# update total number of points
if scan_status.content["info"].get("num_points"):
scan_item.num_points = scan_status.content["info"].get("num_points")
# update scan number
if scan_item.scan_number is None:
scan_item.scan_number = scan_number
# add scan report info
scan_item.scan_report_instructions = scan_status.content["info"].get(
"scan_report_instructions"
)
# add scan def id
scan_def_id = scan_status.content["info"].get("scan_def_id")
if scan_def_id:
if scan_status.content.get("status") != "open":
scan_item.open_scan_defs.remove(scan_def_id)
else:
scan_item.open_scan_defs.add(scan_def_id)
# add queue group
scan_item.open_queue_group = scan_status.content["info"].get("queue_group")
# update async data
if scan_status.content.get("status") != "open":
# pylint: disable=protected-access
scan_item._update_async_data()
# run status callbacks
scan_item.emit_status(scan_status)
[docs]
def add_scan_segment(self, scan_msg: messages.ScanMessage) -> None:
"""update a scan item with a new scan segment"""
logger.info(
f"Received scan segment {scan_msg.content['point_id']} for scan"
f" {scan_msg.metadata['scan_id']}: "
)
while True:
with self._lock:
for scan_item in self.storage:
if scan_item.scan_id == scan_msg.metadata["scan_id"]:
scan_item.data.set(scan_msg.content["point_id"], scan_msg)
scan_item.emit_data(scan_msg)
return
time.sleep(0.01)
[docs]
def add_scan_baseline(self, scan_msg: messages.ScanBaselineMessage) -> None:
"""update a scan item with a new scan baseline"""
logger.info(f"Received scan baseline for scan {scan_msg.metadata['scan_id']}: ")
while True:
with self._lock:
for scan_item in self.storage:
if scan_item.scan_id == scan_msg.metadata["scan_id"]:
point = len(scan_item.baseline)
scan_item.baseline.set(point, scan_msg)
return
time.sleep(0.01)
[docs]
@threadlocked
def add_scan_item(self, queue_id: str, scan_number: list, scan_id: list, status: str):
"""append new scan item to scan storage"""
self.storage.append(ScanItem(self.scan_manager, queue_id, scan_number, scan_id, status))
[docs]
@threadlocked
def update_with_queue_status(self, queue_msg: messages.ScanQueueStatusMessage):
"""create new scan items based on their existence in the queue info"""
queue_info = queue_msg.content["queue"]["primary"].get("info")
for queue_item in queue_info:
# append = True
# for scan_obj in self.storage:
# if len(set(scan_obj.scan_id) & set(queue_item["scan_id"])) > 0:
# append = False
if not any(queue_item["is_scan"]):
continue
for ii, scan in enumerate(queue_item["scan_id"]):
if self.find_scan_by_ID(scan):
continue
logger.debug(f"Appending new scan: {queue_item}")
self.add_scan_item(
queue_id=queue_item["queue_id"],
scan_number=queue_item["scan_number"][ii],
scan_id=queue_item["scan_id"][ii],
status=queue_item["status"],
)