"""
This module contains the BECLogger class, which is a wrapper around the loguru logger. It is used to
configure and manage the logging of the BEC.
"""
from __future__ import annotations
import enum
import json
import os
import sys
import traceback
from itertools import takewhile
from typing import TYPE_CHECKING
from loguru import logger as loguru_logger
# TODO: Importing bec_lib, instead of `from bec_lib.messages import LogMessage`, avoids potential
# logger <-> messages circular import. But there could be a better solution.
import bec_lib
from bec_lib.bec_errors import ServiceConfigError
from bec_lib.endpoints import MessageEndpoints
from bec_lib.file_utils import LogWriter
if TYPE_CHECKING:
from bec_lib.connector import ConnectorBase
[docs]
class LogLevel(int, enum.Enum):
"""Mapping of Loguru log levels to BEC log levels."""
TRACE = 5
DEBUG = 10
INFO = 20
SUCCESS = 25
WARNING = 30
ERROR = 40
CRITICAL = 50
CONSOLE_LOG = 21
[docs]
class BECLogger:
"""Logger for BEC."""
SERVICE_ABBREVIATION = {
"ScanServer": "SCS",
"SciHub": "SIH",
"DeviceServer": "DVS",
"DAPServer": "DAP",
"FileWriterManager": "FWM",
"ScanBundler": "SCB",
"BECClient": "CLI",
}
LOG_FORMAT = (
"<green>{service_name} | {{time:YYYY-MM-DD HH:mm:ss}}</green> | <level>[{{level}}]</level> |"
" <level>{{message}}</level>\n"
)
DEBUG_FORMAT = (
"<green>{service_name} | {{time:YYYY-MM-DD HH:mm:ss.SSS}}</green> | <level>{{level}}</level> |"
" <level>{{thread.name}} ({{thread.id}})</level> | <cyan>{{name}}</cyan>:<cyan>{{function}}</cyan>:<cyan>{{line}}</cyan> -"
" <level>{{message}}</level>\n"
)
TRACE_FORMAT = (
"<green>{service_name} | {{time:YYYY-MM-DD HH:mm:ss.SSS}}</green> | <level>{{level}}</level> |"
" <level>{{thread.name}} ({{thread.id}})</level> | <cyan>{{extra[stack]}}</cyan> - <level>{{message}}</level>\n"
)
LOGLEVEL = LogLevel
_logger = None
def __init__(self) -> None:
if hasattr(self, "_configured"):
return
self.bootstrap_server = None
self.connector = None
self.service_name = None
self.writer_mixin = None
self._base_path = None
self.logger = loguru_logger
self._log_level = LogLevel.INFO
self.level = self._log_level
self._configured = False
# self.logger.level("CONSOLE_LOG", no=21, color="<yellow>", icon="📣")
self._update_logger_level()
def __new__(cls):
if not hasattr(cls, "_logger") or cls._logger is None:
cls._logger = super(BECLogger, cls).__new__(cls)
return cls._logger
@classmethod
def _reset_singleton(cls):
if cls._logger is not None:
cls._logger.logger.remove()
cls._logger = None
def _update_logger_level(self):
try:
self.logger.level("CONSOLE_LOG", no=21, color="<yellow>", icon="📣")
except TypeError:
print("CONSOLE_LOG was already configured")
def _update_base_path(self, service_config: dict = None):
"""
Compile the log base path.
"""
# pylint: disable=import-outside-toplevel
if service_config:
service_cfg = service_config.get("log_writer", None)
if not service_cfg:
raise ServiceConfigError(
f"ServiceConfig {service_config} must at least contain key with 'log_writer'"
)
else:
service_cfg = {"base_path": "./"}
self.writer_mixin = LogWriter(service_cfg)
self._base_path = self.writer_mixin.directory
self.writer_mixin.create_directory(self._base_path)
def _logger_callback(self, msg):
if not self._configured:
return
msg = json.loads(msg)
msg["service_name"] = self.service_name
try:
self.connector.send(
topic=MessageEndpoints.log(),
msg=bec_lib.messages.LogMessage(
log_type=msg["record"]["level"]["name"].lower(), log_msg=msg
),
)
except Exception:
# connector disconnected?
# just ignore the error here...
# Exception is not explicitely specified,
# because it depends on the connector
pass
def _update_sinks(self):
self.logger.remove()
self.add_console_log()
self.add_redis_log(self._log_level)
self.add_sys_stderr(self._log_level)
self.add_file_log(self._log_level)
[docs]
def add_sys_stderr(self, level: LogLevel):
"""
Add a sink to stderr.
Args:
level (LogLevel): Log level.
"""
self.logger.add(sys.stderr, level=level, format=self.formatting, enqueue=True)
[docs]
def add_file_log(self, level: LogLevel):
"""
Add a sink to the service log file.
Args:
level (LogLevel): Log level.
"""
if not self.service_name:
return
filename = os.path.join(self._base_path, f"{self.service_name}.log")
self.logger.add(filename, level=level, format=self.formatting, enqueue=True)
[docs]
def add_console_log(self):
"""
Add a sink to the console log.
"""
if not self.service_name or self.service_name in [
"ScanServer",
"SciHub",
"DAPServer",
"DeviceServer",
"ScanBundler",
"FileWriterManager",
]:
return
filename = os.path.join(self._base_path, f"{self.service_name}_CONSOLE.log")
self.logger.add(
filename,
level=LogLevel.CONSOLE_LOG,
format=self.get_format(LogLevel.CONSOLE_LOG),
filter=lambda record: record["level"].no == LogLevel.CONSOLE_LOG,
enqueue=True,
)
[docs]
def add_redis_log(self, level: LogLevel):
"""
Add a sink to the redis log.
Args:
level (LogLevel): Log level.
"""
self.logger.add(self._logger_callback, serialize=True, level=level, format=self.formatting)
@property
def level(self):
"""
Get the current log level.
"""
return self._log_level
@level.setter
def level(self, val: LogLevel):
self._log_level = val
self._update_sinks()
bec_logger = BECLogger()