Source code for bec_lib.config_helper
"""
This module provides a helper class for updating and saving the BEC device configuration.
"""
from __future__ import annotations
import datetime
import json
import os
import pathlib
import time
import uuid
from typing import TYPE_CHECKING
import yaml
import bec_lib
from bec_lib import messages
from bec_lib.bec_errors import DeviceConfigError, ServiceConfigError
from bec_lib.bec_yaml_loader import yaml_load
from bec_lib.endpoints import MessageEndpoints
from bec_lib.file_utils import DeviceConfigWriter
from bec_lib.logger import bec_logger
from bec_lib.messages import DeviceConfigMessage, RequestResponseMessage
if TYPE_CHECKING:
from bec_lib.redis_connector import RedisConnector
logger = bec_logger.logger
[docs]
class ConfigHelper:
"""Config Helper"""
def __init__(self, connector: RedisConnector, service_name: str = None) -> None:
"""Helper class for updating and saving the BEC device configuration.
Args:
connector (RedisConnector): Redis connector.
service_name (str, optional): Name of the service. Defaults to None.
"""
self.connector = connector
self._service_name = service_name
self.writer_mixin = None
self._base_path_recovery = None
[docs]
def update_session_with_file(self, file_path: str, save_recovery: bool = True) -> None:
"""Update the current session with a yaml file from disk.
Args:
file_path (str): Full path to the yaml file.
save_recovery (bool, optional): Save the current session before updating. Defaults to True.
"""
if save_recovery:
time_stamp = f"{datetime.datetime.now():%Y-%m-%d_%H-%M-%S}"
if not self._base_path_recovery:
self._update_base_path_recovery()
if os.path.exists(self._base_path_recovery) is False:
self.writer_mixin.create_directory(self._base_path_recovery)
fname = os.path.join(self._base_path_recovery, f"recovery_config_{time_stamp}.yaml")
success = self._save_config_to_file(fname, raise_on_error=False)
if success:
print(f"A recovery config was written to {fname}.")
config = self._load_config_from_file(file_path)
self.send_config_request(action="set", config=config)
def _update_base_path_recovery(self):
"""
Compile the filepath for the recovery configs.
"""
# pylint: disable=import-outside-toplevel
from bec_lib.bec_service import SERVICE_CONFIG
service_cfg = SERVICE_CONFIG.config["service_config"].get("log_writer", None)
if not service_cfg:
raise ServiceConfigError(
f"ServiceConfig {service_cfg} must at least contain key with 'log_writer'"
)
self.writer_mixin = DeviceConfigWriter(service_cfg)
self._base_path_recovery = self.writer_mixin.get_recovery_directory()
self.writer_mixin.create_directory(self._base_path_recovery)
def _load_config_from_file(self, file_path: str) -> dict:
data = {}
if pathlib.Path(file_path).suffix not in (".yaml", ".yml"):
raise NotImplementedError
with open(file_path, "r", encoding="utf-8") as stream:
try:
data = yaml_load(stream)
logger.trace(
f"Loaded new config from disk: {json.dumps(data, sort_keys=True, indent=4)}"
)
except yaml.YAMLError as err:
logger.error(f"Error while loading config from disk: {repr(err)}")
return data
[docs]
def save_current_session(self, file_path: str):
"""Save the current session as a yaml file to disk.
Args:
file_path (str): Full path to the yaml file.
"""
self._save_config_to_file(file_path)
print(f"Config was written to {file_path}.")
def _save_config_to_file(self, file_path: str, raise_on_error: bool = True) -> bool:
config = self.connector.get(MessageEndpoints.device_config())
if not config:
if raise_on_error:
raise DeviceConfigError("No config found in the session.")
return False
config = config.content["resource"]
out = {}
for dev in config:
dev.pop("id", None)
dev.pop("createdAt", None)
dev.pop("createdBy", None)
dev.pop("sessionId", None)
name = dev.pop("name")
out[name] = dev
with open(file_path, "w") as file:
file.write(yaml.dump(out))
return True
[docs]
def send_config_request(self, action: str = "update", config=None) -> None:
"""
send request to update config
Returns:
"""
if action in ["update", "add", "set"] and not config:
raise DeviceConfigError(f"Config cannot be empty for an {action} request.")
RID = str(uuid.uuid4())
self.connector.send(
MessageEndpoints.device_config_request(),
DeviceConfigMessage(action=action, config=config, metadata={"RID": RID}),
)
reply = self.wait_for_config_reply(RID)
if not reply.content["accepted"] and not reply.metadata.get("updated_config"):
raise DeviceConfigError(
f"Failed to update the config: {reply.content['message']}. No devices were updated."
)
try:
if not reply.content["accepted"] and reply.metadata.get("updated_config"):
raise DeviceConfigError(
f"Failed to update the config: {reply.content['message']}. The old config will be kept in the device config history."
)
if "failed_devices" in reply.metadata:
print("Failed to update the config for some devices.")
for dev in reply.metadata["failed_devices"]:
print(
f"Device {dev} failed to update:\n {reply.metadata['failed_devices'][dev]}."
)
devices = [dev for dev in reply.metadata["failed_devices"]]
raise DeviceConfigError(
f"Failed to update the config for some devices. The following devices were disabled: {devices}."
)
finally:
# wait for the device server and scan server to acknowledge the config change
self.wait_for_service_response(RID)
[docs]
def wait_for_service_response(self, RID: str, timeout=10) -> messages.ServiceResponseMessage:
"""
wait for service response
Args:
RID (str): request id
timeout (int, optional): timeout in seconds. Defaults to 10.
Returns:
ServiceResponseMessage: reply message
"""
elapsed_time = 0
max_time = timeout
while True:
service_messages = self.connector.lrange(MessageEndpoints.service_response(RID), 0, -1)
if not service_messages:
time.sleep(0.005)
elapsed_time += 0.005
else:
ack_services = [
msg.content["response"]["service"]
for msg in service_messages
if msg is not None
]
checked_services = set(["DeviceServer", "ScanServer"])
if self._service_name:
checked_services.add(self._service_name)
if checked_services.issubset(set(ack_services)):
break
if elapsed_time > max_time:
if service_messages:
raise DeviceConfigError(
"Timeout reached whilst waiting for config change to be acknowledged."
f" Received {service_messages}."
)
raise DeviceConfigError(
"Timeout reached whilst waiting for config change to be acknowledged. No"
" messages received."
)
[docs]
def wait_for_config_reply(self, RID: str, timeout=10) -> RequestResponseMessage:
"""
wait for config reply
Args:
RID (str): request id
timeout (int, optional): timeout in seconds. Defaults to 10.
Returns:
RequestResponseMessage: reply message
"""
start = 0
while True:
msg = self.connector.get(MessageEndpoints.device_config_request_response(RID))
if msg is None:
time.sleep(0.01)
start += 0.01
if start > timeout:
raise DeviceConfigError("Timeout reached whilst waiting for config reply.")
continue
return msg
[docs]
def load_demo_config(self):
"""Load BEC device demo_config.yaml for simulation."""
dir_path = os.path.abspath(os.path.join(os.path.dirname(bec_lib.__file__), "./configs/"))
fpath = os.path.join(dir_path, "demo_config.yaml")
self.update_session_with_file(fpath)