Source code for bec_lib.file_utils

""" Module for file utilities."""

from __future__ import annotations

import os
import warnings
from typing import TYPE_CHECKING

from bec_lib.bec_errors import ServiceConfigError
from bec_lib.endpoints import MessageEndpoints
from bec_lib.utils.import_utils import lazy_import_from

# TODO: put back normal import when Pydantic gets faster
ScanStatusMessage = lazy_import_from("bec_lib.messages", ("ScanStatusMessage",))

if TYPE_CHECKING:
    from bec_lib.redis_connector import RedisConnector


class ServiceConfigParser:
    """Accessor for the service config that BEC hands over at startup."""

    def __init__(self, service_config: dict = None) -> None:
        """
        Keep a reference to the service config for later lookups.

        Args:
            service_config (dict): Service config from BEC startup.
        """
        self._service_config = service_config

    @property
    def service_config(self) -> dict:
        """Service config exactly as it was passed to the constructor."""
        return self._service_config

    def get_base_path(self) -> str:
        """
        Resolve the configured base path to an absolute path.

        A leading ``~`` is expanded to the user's home directory before the
        path is made absolute.

        Returns:
            str: Absolute base path.

        Raises:
            ServiceConfigError: If the service config is missing/empty or does
                not define a non-empty ``base_path``.
        """
        config = self.service_config
        if not config:
            raise ServiceConfigError("Service config must contain a file writer definition.")

        base_path = config.get("base_path")
        if not base_path:
            raise ServiceConfigError("File writer config must define a base path.")

        expanded = os.path.expanduser(base_path)
        return os.path.abspath(expanded)

    def create_directory(self, directory: str) -> None:
        """
        Create ``directory`` including missing parents; existing directories are fine.

        Args:
            directory (str): Path of the directory to create.
        """
        os.makedirs(directory, exist_ok=True)
class LogWriter:
    """Log writer class"""

    def __init__(self, service_config: dict) -> None:
        """
        Initialize the log writer class.

        The log directory is ``<base_path>/logs`` and is created on
        initialization if it does not exist yet.

        Args:
            service_config (dict): Log writer service config. Must at least contain a base_path.
        """
        self.service_config_parser = ServiceConfigParser(service_config)
        self._base_path = self.service_config_parser.get_base_path()
        self._directory = os.path.join(self._base_path, "logs")
        self.create_directory(self.directory)

    def create_directory(self, fname: str = None) -> None:
        """
        Create the log directory.

        Args:
            fname (str, optional): Directory to create. Defaults to None, in
                which case the log directory of this writer is created.
        """
        # The default used to be forwarded as None to os.makedirs, which raises
        # a TypeError; fall back to the writer's own directory instead.
        if fname is None:
            fname = self.directory
        self.service_config_parser.create_directory(fname)

    @property
    def directory(self) -> str:
        """
        Get the log directory.

        Returns:
            str: String representation of log directory
        """
        return self._directory
class DeviceConfigWriter:
    """Device config writer class"""

    def __init__(self, service_config: dict) -> None:
        """
        Initialize the device config writer class.
        At the moment, this uses the same base_path as the log writer.

        The device config directory is ``<base_path>/device_configs`` and is
        created on initialization if it does not exist yet.

        Args:
            service_config (dict): Device config writer service config. Must at least contain a base_path.
        """
        self.service_config_parser = ServiceConfigParser(service_config)
        self._base_path = self.service_config_parser.get_base_path()
        self._directory = os.path.join(self._base_path, "device_configs")
        self.create_directory(fname=self.directory)

    def create_directory(self, fname: str = None) -> None:
        """
        Create the device config directory.

        Args:
            fname (str, optional): Directory to create. Defaults to None, in
                which case the device config directory of this writer is created.
        """
        # The default used to be forwarded as None to os.makedirs, which raises
        # a TypeError; fall back to the writer's own directory instead.
        if fname is None:
            fname = self.directory
        self.service_config_parser.create_directory(fname)

    @property
    def directory(self) -> str:
        """
        Get the device config directory.

        Returns:
            str: String representation of device config directory
        """
        return self._directory

    def get_recovery_directory(self) -> str:
        """
        Compile the recovery config directory.

        The path is only composed, not created.

        Returns:
            str: ``recovery_configs`` sub-directory of the device config directory
        """
        return os.path.join(self.directory, "recovery_configs")
class FileWriterError(Exception):
    """Error raised by the file writer, e.g. for an invalid filename suffix."""
class FileWriter:
    """FileWriter for creating file paths and directories for services and devices."""

    def __init__(
        self,
        *args,
        service_config: dict = None,
        connector: RedisConnector = None,
        scan_bundle: int = 1000,
        leading_zeros: int = 5,
        **kwargs,
    ) -> None:
        """
        Initialize the file writer mixin class.

        If no RedisConnector is provided, the class will not be able to communicate with the
        REDIS server. In this case, it stays unconfigured and falls back to a default base
        path in the current directory.

        Args:
            service_config (dict): File writer service config. Must at least contain base_path
                when a connector is provided.
            connector (RedisConnector, optional): Connector used to fetch the scan status.
                Defaults to None.
            scan_bundle (int, optional): Scan bundle size. Defaults to 1000.
            leading_zeros (int, optional): Number of leading zeros. Defaults to 5.

        Raises:
            ServiceConfigError: If a connector is given but the service config does not
                define a base path.
        """
        self.service_config = service_config
        self._scan_bundle = scan_bundle
        self._leading_zeros = leading_zeros
        self._kwargs = kwargs
        self._base_path = "."  # default to current directory
        self.connector = connector
        self.service_config_parser = None
        self._configured = False
        self._initialized = True
        # Only a writer with a connector can look up scan messages, so only
        # then is the service config parsed and the writer marked configured.
        if self.connector:
            self.configure_file_writer()
            self._configured = True

    def configure_file_writer(self):
        """
        Configure the file writer mixin class from the service config.

        Parses ``self.service_config`` and replaces the default base path with
        the configured one.

        Raises:
            ServiceConfigError: If the service config is missing or has no base path.
        """
        self.service_config_parser = ServiceConfigParser(self.service_config)
        self._base_path = self.service_config_parser.get_base_path()

    @property
    def scan_bundle(self) -> int:
        """Get the scan bundle size."""
        return self._scan_bundle

    @property
    def leading_zeros(self) -> int:
        """Get the number of leading zeros."""
        return self._leading_zeros

    def get_scan_directory(
        self, scan_number: int, scan_bundle: int, leading_zeros: int = None, user_suffix: str = None
    ) -> str:
        """
        Get the scan directory for a given scan number and scan bundle.

        Args:
            scan_number (int): Scan number
            scan_bundle (int): Scan bundle size
            leading_zeros (int, optional): Number of leading zeros. Defaults to None, in
                which case the number of digits of scan_bundle is used.
            user_suffix (str, optional): User defined suffix. Defaults to None.

        Returns:
            str: Scan directory

        Examples:
            >>> get_scan_directory(1234, 1000, 5)
            'S01000-01999/S01234'
            >>> get_scan_directory(1234, 1000, 5, 'sampleA')
            'S01000-01999/S01234_sampleA'
        """
        if leading_zeros is None:
            leading_zeros = len(str(scan_bundle))
        # First and last scan number of the bundle that contains scan_number.
        bundle_start = scan_number // scan_bundle * scan_bundle
        bundle_end = bundle_start + scan_bundle - 1
        rtr = (
            f"S{bundle_start:0{leading_zeros}d}-{bundle_end:0{leading_zeros}d}"
            f"/S{scan_number:0{leading_zeros}d}"
        )
        if user_suffix:
            rtr += f"_{user_suffix}"
        return rtr

    def get_scan_msg(self):
        """
        Get the scan message for the next scan.

        Returns:
            ScanStatusMessage | None: The message stored under the scan status endpoint,
                or None if it is not a ScanStatusMessage.
        """
        msg = self.connector.get(MessageEndpoints.scan_status())
        if not isinstance(msg, ScanStatusMessage):
            return None
        return msg

    def compile_full_filename(self, suffix: str, create_dir: bool = True) -> str:
        """
        Compile a full filename for a given scan number and suffix.

        This method should only be called after a scan has been opened,
        i.e. preferable in stage and not during __init__.

        The method will use the last scan message received in REDIS,
        and will return an empty string if no scan message is available.
        If the writer is not configured (no connector), a default file name
        below ``<base_path>/data`` is returned instead. The file extension is
        always ``.h5``.

        Args:
            suffix (str): Filename suffix. We allow alphanumeric, ascii characters - and _.
                An empty suffix is rejected.
            create_dir (bool, optional): Create the directory if it does not exist. Defaults to True.

        Returns:
            str: Full filename, or an empty string if no scan message is available.

        Raises:
            FileWriterError: If the suffix contains characters other than ascii
                alphanumerics, ``-`` and ``_``.
        """
        # to check if suffix is alphanumeric and ascii, however we allow in addition - and _
        check_suffix = suffix.replace("_", "").replace("-", "")
        if not check_suffix.isalnum() or not check_suffix.isascii():
            # Report the flags of the string that was actually validated; the raw
            # suffix is never alphanumeric once it contains an allowed - or _.
            raise FileWriterError(
                f"Can't use suffix {suffix}; formatting is alphanumeric:{check_suffix.isalnum()} and ascii {check_suffix.isascii()}"
            )
        if self._configured:
            scan_msg = self.get_scan_msg()
            if not scan_msg:
                warnings.warn("No scan message available.")
                return ""
            scannr = scan_msg.content["info"]["scan_number"]
            user_suffix = scan_msg.info["file_writer_data"].get("file_suffix")
            scan_dir = scan_msg.info["file_writer_data"].get("file_directory")
            # A directory given in the scan message takes precedence over the
            # bundle-based default layout.
            if not scan_dir:
                scan_dir = self.get_scan_directory(
                    scannr, self.scan_bundle, self.leading_zeros, user_suffix=user_suffix
                )
            if user_suffix:
                suffix += f"_{user_suffix}"
            full_file = os.path.join(
                self._base_path, "data", scan_dir, f"S{scannr:0{self._leading_zeros}d}_{suffix}.h5"
            )
        else:
            full_file = os.path.join(self._base_path, "data", f"S00000_default_{suffix}.h5")
            warnings.warn(f"No service config provided, using default base path {full_file}.")
        if create_dir:
            os.makedirs(os.path.dirname(full_file), exist_ok=True)
        return full_file