Source code for twindb_backup.configuration

# -*- coding: utf-8 -*-
Module to process configuration file.
import socket
from configparser import ConfigParser, NoOptionError, NoSectionError
from shlex import split

from twindb_backup import INTERVALS, LOG
from twindb_backup.configuration.compression import CompressionConfig
from twindb_backup.configuration.destinations.gcs import GCSConfig
from twindb_backup.configuration.destinations.s3 import S3Config
from twindb_backup.configuration.destinations.ssh import SSHConfig
from twindb_backup.configuration.exceptions import ConfigurationError
from twindb_backup.configuration.gpg import GPGConfig
from twindb_backup.configuration.mysql import MySQLConfig
from twindb_backup.configuration.retention import RetentionPolicy
from twindb_backup.configuration.run_intervals import RunIntervals
from twindb_backup.destination.gcs import GCS
from twindb_backup.destination.s3 import S3
from twindb_backup.destination.ssh import Ssh
from twindb_backup.exporter.datadog_exporter import DataDogExporter
from twindb_backup.exporter.statsd_exporter import StatsdExporter

DEFAULT_CONFIG_FILE_PATH = "/etc/twindb/twindb-backup.cfg"

[docs]class TwinDBBackupConfig: """ Class represents TwinDB Backup configuration """ def __init__(self, config_file=DEFAULT_CONFIG_FILE_PATH): self._config_file = config_file self.__cfg = ConfigParser() self.__mysql = None @property def retention(self): """ :return: Remote retention policy. :rtype: RetentionPolicy """ return self._retention("retention") @property def retention_local(self): """ :return: Local retention policy. :rtype: RetentionPolicy """ return self._retention("retention_local") @property def run_intervals(self): """ Run intervals config. When to run or not the backup. :return: Configuration with data on whether to run the backup tool now. :rtype: RunIntervals """ kwargs = {} try: kwargs = {i: self.__cfg.getboolean("intervals", f"run_{i}") for i in INTERVALS} except (NoOptionError, NoSectionError) as err: LOG.debug(err) LOG.debug("Will use default retention policy") return RunIntervals(**kwargs) @property def mysql(self): """ :return: Local MySQL source configuration. :rtype: MySQLConfig """ if self.__mysql is None: try: self.__mysql = MySQLConfig(**self.__read_options_from_section("mysql")) except NoSectionError: return None return self.__mysql @property def ssh(self): """ :return: Remote SSH configuration. :rtype: SSHConfig """ try: return SSHConfig(**self.__read_options_from_section("ssh")) except NoSectionError: return None @property def s3(self): # pylint: disable=invalid-name """Amazon S3 configuration""" try: return S3Config(**self.__read_options_from_section("s3")) except NoSectionError: return None @property def gcs(self): # pylint: disable=invalid-name """Google Cloud Storage configuration""" try: return GCSConfig(**self.__read_options_from_section("gcs")) except NoSectionError: return None @property def keep_local_path(self): """If specified a local path where the tool will keep an additional local backup copy. """ try: return self.__cfg.get("destination", "keep_local_path") except (NoSectionError, NoOptionError): return None @property def exporter(self): """ Read config and return export transport instance :return: Instance of export transport, if it is set :rtype: BaseExporter :raise: ConfigurationError, if transport isn't implemented """ try: try: transport = self.__cfg.get("export", "transport") if transport == "datadog": app_key = self.__cfg.get("export", "app_key") api_key = self.__cfg.get("export", "api_key") return DataDogExporter(app_key, api_key) if transport == "statsd": statsd_host = self.__cfg.get("export", "statsd_host") statsd_port = self.__cfg.get("export", "statsd_port") return StatsdExporter(statsd_host, statsd_port) else: raise ConfigurationError(f"Metric exported '{transport}' is not implemented") except NoOptionError as err: raise ConfigurationError(err) from err except NoSectionError: return None @property def compression(self): """ :return: Compression configuration :rtype: CompressionConfig """ try: return CompressionConfig(**self.__read_options_from_section("compression")) except NoSectionError: return CompressionConfig() @property def gpg(self): """GPG configuration.""" try: return GPGConfig(**self.__read_options_from_section("gpg")) except NoSectionError: return None @property def backup_dirs(self): """Directories to backup""" try: dirs = self.__cfg.get("source", "backup_dirs") return split(dirs) except NoOptionError: return [] except NoSectionError as err: LOG.error("Section 'source' is mandatory") raise ConfigurationError(err) from err @property def backup_mysql(self): """FLag to backup MySQL or not""" try: return self.__cfg.getboolean("source", "backup_mysql") except NoOptionError: return False except NoSectionError as err: LOG.error("Section 'source' is mandatory") raise ConfigurationError(err) from err @property def tar_options(self): """ :return: Additional options passed to ``tar``. :rtype: str """ try: return self.__cfg.get("source", "tar_options") except NoOptionError: return None
[docs] def destination(self, backup_source=socket.gethostname()): """ :param backup_source: Hostname of the host where backup is taken from. :type backup_source: str :return: Backup destination instance :rtype: BaseDestination """ try: backup_destination = self.__cfg.get("destination", "backup_destination") if backup_destination == "ssh": return Ssh( self.ssh.path, hostname=backup_source,, ssh_port=self.ssh.port, ssh_user=self.ssh.user, ssh_key=self.ssh.key, ) elif backup_destination == "s3": return S3( bucket=self.s3.bucket, aws_access_key_id=self.s3.aws_access_key_id, aws_secret_access_key=self.s3.aws_secret_access_key, aws_default_region=self.s3.aws_default_region, hostname=backup_source, ) elif backup_destination == "gcs": return GCS( bucket=self.gcs.bucket, gc_credentials_file=self.gcs.gc_credentials_file, gc_encryption_key=self.gcs.gc_encryption_key, hostname=backup_source, ) else: raise ConfigurationError(f"Unsupported destination '{backup_destination}'") except NoSectionError as err: raise ConfigurationError(f"{self._config_file} is missing required section 'destination'") from err
def _retention(self, section): kwargs = {} for i in INTERVALS: option = f"{i}_copies" try: kwargs[i] = self.__cfg.getint(section, option) except (NoOptionError, NoSectionError): LOG.warning("Option %s is not defined in section %s", option, section) return RetentionPolicy(**kwargs) def __read_options_from_section(self, section): return {opt: self.__cfg.get(section, opt).strip("\"'") for opt in self.__cfg.options(section)} def __repr__(self): return f"{self.__class__.__name__}: {self._config_file}"