Source code for bec_lib.queue_items
"""
This module contains the QueueItem and QueueStorage classes.
"""
from __future__ import annotations
import functools
import threading
from collections import deque
from typing import TYPE_CHECKING
from rich.console import Console
from rich.table import Table
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.utils import threadlocked
if TYPE_CHECKING:
from bec_lib.request_items import RequestItem
from bec_lib.scan_items import ScanItem
from bec_lib.scan_manager import ScanManager
[docs]
def update_queue(fcn):
    """Decorator that refreshes the queue item before running ``fcn``.

    The decorated callable must be a method of an object that provides
    ``_update_with_buffer()``; that method is invoked first, then ``fcn`` is
    called with the original arguments and its result is returned unchanged.
    """

    @functools.wraps(fcn)
    def _refresh_then_call(self, *args, **kwargs):
        # Bring the instance in sync with the cached queue data first so that
        # ``fcn`` operates on up-to-date attributes.
        self._update_with_buffer()
        result = fcn(self, *args, **kwargs)
        return result

    return _refresh_then_call
[docs]
class QueueItem:
    """Client-side representation of one entry of the scan queue.

    The properties decorated with ``update_queue`` call
    ``_update_with_buffer`` before returning, so they reflect the queue data
    currently cached on ``scan_manager.queue_storage``.
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        scan_manager: ScanManager,
        queue_id: str,
        request_blocks: list,
        status: str,
        active_request_block: dict,
        scan_id: list[str],
        **_kwargs,
    ) -> None:
        """
        Args:
            scan_manager: manager giving access to the queue, scan and request storages.
            queue_id: identifier of this queue item.
            request_blocks: request blocks belonging to this queue item.
            status: status of the queue item.
            active_request_block: the request block that is currently active.
            scan_id: scan ids assigned to this queue item (stored as ``scan_ids``).
            **_kwargs: additional fields of the queue info dict; ignored.
        """
        self.scan_manager = scan_manager
        self.queue_id = queue_id
        self.request_blocks = request_blocks
        self._status = status
        self.active_request_block = active_request_block
        self.scan_ids = scan_id

    @property
    @update_queue
    def scans(self) -> list[ScanItem]:
        """get the scans items assigned to the current queue item"""
        return [
            self.scan_manager.scan_storage.find_scan_by_ID(scan_id) for scan_id in self.scan_ids
        ]

    @property
    @update_queue
    def requestIDs(self):
        """get the request IDs (``"RID"`` entries) of all request blocks"""
        return [request_block["RID"] for request_block in self.request_blocks]

    @property
    @update_queue
    def requests(self) -> list[RequestItem]:
        """get the request items assigned to the current queue item"""
        return [
            self.scan_manager.request_storage.find_request_by_ID(requestID)
            for requestID in self.requestIDs
        ]

    @property
    @update_queue
    def status(self):
        """get the (refreshed) status of the queue item"""
        return self._status

    def _update_with_buffer(self):
        """Refresh this item from the queue data cached on the queue storage.

        The ``"primary"`` group of the current scan queue is searched first;
        if the item is not found there, the cached queue history is searched.
        The item is left untouched when no matching entry exists or when the
        storage has not received any queue data yet.
        """
        queue_storage = self.scan_manager.queue_storage

        # current_scan_queue and queue_history start out as None on the
        # storage, so every level is treated as optional here.
        current_queue = queue_storage.current_scan_queue or {}
        primary_queue = current_queue.get("primary") or {}
        for queue_item in primary_queue.get("info") or []:
            if queue_item["queue_id"] == self.queue_id:
                self.update_queue_item(queue_item)
                return

        for queue_item in queue_storage.queue_history or []:
            if queue_item.content["queue_id"] == self.queue_id:
                self.update_queue_item(queue_item.content["info"])
                return

    def update_queue_item(self, queue_item):
        """Update the queue item from a queue info dict.

        Args:
            queue_item: mapping providing ``request_blocks``, ``status``,
                ``active_request_block`` and ``scan_id``; missing keys result
                in the corresponding attribute being set to ``None``.
        """
        self.request_blocks = queue_item.get("request_blocks")
        self._status = queue_item.get("status")
        self.active_request_block = queue_item.get("active_request_block")
        self.scan_ids = queue_item.get("scan_id")

    @property
    def queue_position(self) -> int | None:
        """Get the current queue position.

        Returns:
            The index of this item within the ``"info"`` list of the queue
            group that contains it, or ``None`` if the item is not part of the
            current scan queue (or no queue data is available yet).
        """
        current_queue = self.scan_manager.queue_storage.current_scan_queue
        if not current_queue:
            return None
        for queue_group in current_queue.values():
            # Entries that are not queue descriptions are skipped.
            if not isinstance(queue_group, dict):
                continue
            for queue_position, queue in enumerate(queue_group.get("info") or []):
                if self.queue_id == queue["queue_id"]:
                    return queue_position
        return None
[docs]
class QueueStorage:
    """Stores queue items and caches the current scan queue and its history.

    NOTE(review): the ``threadlocked`` decorator comes from ``bec_lib.utils``;
    presumably it serialises the decorated methods on ``self._lock`` - confirm.
    """

    def __init__(self, scan_manager: ScanManager, maxlen=50) -> None:
        """
        Args:
            scan_manager: manager providing the connector used to fetch queue data.
            maxlen: maximum number of queue items kept in the storage; older
                items are discarded by the underlying deque.
        """
        self.storage: deque[QueueItem] = deque(maxlen=maxlen)
        self._lock = threading.RLock()
        self.scan_manager = scan_manager
        # Both caches stay None until the first update.
        self.current_scan_queue = None
        self.queue_history = None

    def _update_queue_history(self):
        """get the queue history from redis"""
        self.queue_history = self.scan_manager.connector.lrange(
            MessageEndpoints.scan_queue_history(), 0, 5
        )

    def _update_current_scan_queue(self):
        """get the current scan queue from redis

        The cached queue is only replaced when a message was received.
        """
        msg = self.scan_manager.connector.get(MessageEndpoints.scan_queue_status())
        if msg:
            self.current_scan_queue = msg.content["queue"]

    def _update_queue(self):
        """refresh both the current scan queue and the queue history"""
        self._update_current_scan_queue()
        self._update_queue_history()

    def describe_queue(self):
        """Create a rich.table description of the current scan queue.

        Returns:
            A list with one rendered table (as captured console output) per
            queue; empty if no queue information is available.
        """
        queue_tables = []
        self._update_queue()
        # No status message has been received yet: nothing to describe.
        if not self.current_scan_queue:
            return queue_tables

        console = Console()
        for queue_name, scan_queue in self.current_scan_queue.items():
            # Same filter as QueueItem.queue_position: only dict entries
            # describe a queue.
            if not isinstance(scan_queue, dict):
                continue
            table = Table(title=f"{queue_name} queue / {scan_queue.get('status')}")
            table.add_column("queue_id", justify="center")
            table.add_column("scan_id", justify="center")
            table.add_column("is_scan", justify="center")
            table.add_column("type", justify="center")
            table.add_column("scan_number", justify="center")
            table.add_column("IQ status", justify="center")
            for instruction_queue in scan_queue.get("info") or []:
                scan_msgs = [
                    msg.get("content") for msg in instruction_queue.get("request_blocks", [])
                ]
                table.add_row(
                    instruction_queue.get("queue_id"),
                    ", ".join(str(s) for s in instruction_queue.get("scan_id") or []),
                    ", ".join(str(s) for s in instruction_queue.get("is_scan") or []),
                    ", ".join(msg["scan_type"] for msg in scan_msgs),
                    ", ".join(str(s) for s in instruction_queue.get("scan_number") or []),
                    instruction_queue.get("status"),
                )
            # Render the table into a string instead of printing it directly.
            with console.capture() as capture:
                console.print(table)
            queue_tables.append(capture.get())
        return queue_tables

    @threadlocked
    def update_with_status(self, queue_msg: messages.ScanQueueStatusMessage) -> None:
        """Update the storage with a new ScanQueueStatusMessage / queue message.

        Items of the ``"primary"`` queue that are already stored are updated
        in place; unknown items are appended to the storage.
        """
        self.current_scan_queue = queue_msg.content["queue"]
        self._update_queue_history()
        queue_info = queue_msg.content["queue"]["primary"].get("info") or []
        for queue_item in queue_info:
            queue = self.find_queue_item_by_ID(queue_id=queue_item["queue_id"])
            if queue:
                queue.update_queue_item(queue_item)
                continue
            self.storage.append(QueueItem(scan_manager=self.scan_manager, **queue_item))

    @threadlocked
    def find_queue_item_by_ID(self, queue_id: str) -> QueueItem | None:
        """find a queue item based on its queue_id"""
        for queue_item in self.storage:
            if queue_item.queue_id == queue_id:
                return queue_item
        return None

    @threadlocked
    def find_queue_item_by_requestID(self, requestID: str) -> QueueItem | None:
        """find a queue item based on its requestID"""
        for queue_item in self.storage:
            if requestID in queue_item.requestIDs:
                return queue_item
        return None

    @threadlocked
    def find_queue_item_by_scan_id(self, scan_id: str) -> QueueItem | None:
        """find a queue item based on its scan_id"""
        for queue_item in self.storage:
            # Compare against the stored scan ids: ``queue_item.scans`` holds
            # scan item objects, so a membership test with a scan id string
            # would not match on the id. Refresh first, as the ``scans``
            # property would have done.
            queue_item._update_with_buffer()  # pylint: disable=protected-access
            if scan_id in (queue_item.scan_ids or ()):
                return queue_item
        return None