# Source code for bec_lib.service_config
"""
This module provides a class to handle the service configuration.
"""
import json
import os
from pathlib import Path
import yaml
from bec_lib.logger import bec_logger
try:
from bec_plugins.utils import load_service_config as plugin_load_service_config
except ImportError:
plugin_load_service_config = None
logger = bec_logger.logger
# Base directory handed to the file writer and log writer defaults below.
# When this module's path contains "site-packages" the relative path "./" is
# used; otherwise the directory three levels above this file is used.
if "site-packages" in __file__:
    DEFAULT_BASE_PATH = "./"
else:
    DEFAULT_BASE_PATH = str(Path(__file__).resolve().parent.parent.parent)

# Fallback configuration used when no other configuration source is available.
# The redis host can be overridden through the BEC_REDIS_HOST environment
# variable; the port is fixed to 6379.
DEFAULT_SERVICE_CONFIG = {
    "redis": {
        "host": os.environ.get("BEC_REDIS_HOST", "localhost"),
        "port": 6379,
    },
    "service_config": {
        "file_writer": {
            "plugin": "default_NeXus_format",
            "base_path": DEFAULT_BASE_PATH,
        },
        "log_writer": {
            "base_path": DEFAULT_BASE_PATH,
        },
    },
}
# [docs]
class ServiceConfig:
    """Resolve and hold the configuration of a service.

    The configuration is taken from the first available source, in this order:

    1. the YAML file at ``config_path``,
    2. the JSON document stored in the ``BEC_SERVICE_CONFIG`` environment variable,
    3. the ``load_service_config`` hook of ``bec_plugins`` (when it could be imported),
    4. a private copy of ``DEFAULT_SERVICE_CONFIG``.

    Truthy ``redis`` / ``config`` constructor arguments then replace the
    ``"redis"`` / ``"service_config"`` entries of the loaded configuration.
    """

    def __init__(
        self, config_path: str = None, redis: dict = None, config: dict = None, **kwargs
    ) -> None:
        """Load the configuration and apply the explicit overrides.

        Args:
            config_path: Path to a YAML configuration file. When falsy, the
                other configuration sources are tried instead.
            redis: Mapping with ``"host"`` and ``"port"``; replaces the loaded
                ``"redis"`` entry when truthy.
            config: Replaces the loaded ``"service_config"`` entry when truthy.
            **kwargs: Accepted for compatibility and ignored.

        Raises:
            ValueError: If the loaded configuration is non-empty but has no
                ``"redis"`` entry.
        """
        self.config_path = config_path
        self.config = {}
        self._load_config()
        if self.config:
            # Return values are discarded: these calls only check the loaded entries.
            self._load_urls("redis", required=True)
            self._load_urls("mongodb", required=False)

        self._update_config(service_config=config, redis=redis)

        if "service_config" in self.config:
            self.service_config = self.config["service_config"]
        else:
            # Hand out a private copy so that edits made through this instance
            # never leak into the module-level defaults.
            self.service_config = self._default_config()["service_config"]

    @staticmethod
    def _default_config() -> dict:
        """Return an independent deep copy of ``DEFAULT_SERVICE_CONFIG``."""
        from copy import deepcopy

        return deepcopy(DEFAULT_SERVICE_CONFIG)

    def _log_loaded_config(self, source: str) -> None:
        """Log the freshly loaded configuration together with its ``source``."""
        logger.info(
            f"Loaded new config from {source}:"
            f" {json.dumps(self.config, sort_keys=True, indent=4)}"
        )

    def _update_config(self, **kwargs):
        """Overwrite top-level config entries with every truthy keyword argument."""
        for key, val in kwargs.items():
            if val:
                self.config[key] = val

    def _load_config(self):
        """Populate ``self.config`` from the first available configuration source."""
        if self.config_path:
            with open(self.config_path, "r", encoding="utf-8") as stream:
                # safe_load returns None for an empty document; normalise to a
                # dict so that later item assignment and .get() calls work.
                self.config = yaml.safe_load(stream) or {}
            self._log_loaded_config("disk")
            return

        env_config = os.environ.get("BEC_SERVICE_CONFIG")
        if env_config:
            self.config = json.loads(env_config)
            self._log_loaded_config("environment")
            return

        if plugin_load_service_config:
            self.config = plugin_load_service_config()
            self._log_loaded_config("plugin")
            return

        # Copy the defaults: _update_config mutates self.config in place and
        # must not modify the shared module-level dictionary.
        self.config = self._default_config()

    def _load_urls(self, entry: str, required: bool = True):
        """Return ``"host:port"`` for the config entry named ``entry``.

        Args:
            entry: Top-level config key, e.g. ``"redis"``.
            required: Whether a missing (or empty) entry is an error.

        Returns:
            The ``"host:port"`` string, or ``""`` when the entry is missing
            and not required.

        Raises:
            ValueError: If the entry is missing and ``required`` is true.
            KeyError: If the entry exists but lacks ``"host"`` or ``"port"``.
        """
        config = self.config.get(entry)
        if config:
            return f"{config['host']}:{config['port']}"
        if required:
            raise ValueError(
                f"The provided config does not specify the url (host and port) for {entry}."
            )
        return ""

    @property
    def redis(self):
        """The redis url as ``"host:port"``; raises ValueError when not configured."""
        return self._load_urls("redis", required=True)

    @property
    def abort_on_ctrl_c(self):
        """The ``abort_on_ctrl_c`` flag of the service config; defaults to True."""
        return self.service_config.get("abort_on_ctrl_c", True)