Source code for bec_lib.alarm_handler
"""
This module provides the alarm handler class and its related functionality.
"""
from __future__ import annotations
import enum
import threading
from collections import deque
from typing import TYPE_CHECKING
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils import threadlocked
if TYPE_CHECKING:
from bec_lib.redis_connector import RedisConnector
logger = bec_logger.logger
[docs]
class Alarms(int, enum.Enum):
WARNING = 0
MINOR = 1
MAJOR = 2
[docs]
class AlarmException(Exception):
pass
[docs]
class AlarmBase(Exception):
def __init__(
self, alarm: messages.AlarmMessage, alarm_type: str, severity: Alarms, handled=False
) -> None:
self.alarm = alarm
self.severity = severity
self.handled = handled
self.alarm_type = alarm_type
super().__init__(self.alarm.content)
def __str__(self) -> str:
return (
f"An alarm has occured. Severity: {self.severity.name}. Source:"
f" {self.alarm.content['source']}.\n{self.alarm_type}.\n\t"
f" {self.alarm.content['msg']}"
)
[docs]
class AlarmHandler:
def __init__(self, connector: RedisConnector) -> None:
self.connector = connector
self.alarms_stack = deque(maxlen=100)
self._raised_alarms = deque(maxlen=100)
self._lock = threading.RLock()
[docs]
def start(self):
"""start the alarm handler and its subscriptions"""
self.connector.register(
topics=MessageEndpoints.alarm(),
name="AlarmHandler",
cb=self._alarm_register_callback,
parent=self,
)
@staticmethod
def _alarm_register_callback(msg, *, parent, **_kwargs):
parent.add_alarm(msg.value)
[docs]
@threadlocked
def add_alarm(self, msg: messages.AlarmMessage):
"""Add a new alarm message to the stack.
Args:
msg (messages.AlarmMessage): Alarm message that should be added
"""
severity = Alarms(msg.content["severity"])
alarm = AlarmBase(
alarm=msg, alarm_type=msg.content["alarm_type"], severity=severity, handled=False
)
if severity > Alarms.MINOR:
self.alarms_stack.appendleft(alarm)
logger.debug(alarm)
else:
logger.warning(alarm)
[docs]
@threadlocked
def get_unhandled_alarms(self, severity=Alarms.WARNING) -> list:
"""Get all unhandled alarms equal or above a minimum severity.
Args:
severity (Alarms, optional): Minimum severity. Defaults to Alarms.WARNING.
Returns:
list: List of unhandled alarms
"""
return [
alarm for alarm in self.alarms_stack if not alarm.handled and alarm.severity >= severity
]
[docs]
@threadlocked
def get_alarm(self, severity=Alarms.WARNING):
"""Get the next alarm
Args:
severity (Alarm, optional): Minimum severity. Defaults to Alarms.WARNING.
Yields:
AlarmBase: Alarm
"""
alarms = self.get_unhandled_alarms(severity=severity)
for alarm in alarms:
yield alarm
[docs]
def raise_alarms(self, severity=Alarms.MAJOR):
"""Raise unhandled alarms with specified severity.
Args:
severity (Alarm, optional): Minimum severity. Defaults to Alarms.MAJOR.
Raises:
alarms: Alarm exception.
"""
alarms = self.get_unhandled_alarms(severity=severity)
if len(alarms) > 0:
alarm = alarms.pop(0)
self._raised_alarms.append(alarm)
raise alarm
[docs]
@threadlocked
def clear(self):
"""clear all alarms from stack"""
self.alarms_stack.clear()
[docs]
def shutdown(self):
"""shutdown the alarm handler"""
self.connector.shutdown()