# Source code for twindb_backup.status.mysql_status

"""Class to store and work with status file"""
from __future__ import print_function

import json
from base64 import b64decode, b64encode

from twindb_backup import INTERVALS, LOG
from twindb_backup.copy.mysql_copy import MySQLCopy

# For backward compatibility, the content of my.cnf files is base64-encoded.
from twindb_backup.status.exceptions import CorruptedStatus
from twindb_backup.status.periodic_status import PeriodicStatus


class MySQLStatus(PeriodicStatus):
    """
    Class that stores status file and implements operations on it.
    """

    def __init__(self, content=None, dst=None, status_directory=None):
        """Initialize the status, delegating storage to PeriodicStatus.

        :param content: Raw status file content, if already read.
        :param dst: Destination the status file lives on.
        :param status_directory: Directory holding the status file.
        """
        super(MySQLStatus, self).__init__(
            content=content,
            dst=dst,
            status_directory=status_directory,
        )

    @property
    def basename(self):
        """File name of the status file on the destination."""
        return "status"
[docs] def candidate_parent(self, run_type): """ Find a backup copy that can be a parent :param run_type: See :func:`~get_backup_type`. :return: Backup copy or None :rtype: MySQLCopy """ full_backup_index = INTERVALS.index(run_type) LOG.debug("Looking a parent candidate for %s run", run_type) for i in range(full_backup_index, len(INTERVALS)): period_copies = getattr(self, INTERVALS[i]) LOG.debug("Checking %d %s copies", len(period_copies), INTERVALS[i]) for value in sorted(period_copies.values(), reverse=True): try: if value.type == "full": LOG.debug("Found parent %r", value) return value except KeyError: return None LOG.debug("No eligible parents") return None
[docs] def full_copy_exists(self, run_type): """ Check whether there is a full copy. :param run_type: See :func:`~get_backup_type`. :return: True if there is a full copy. False if there is no an eligible full copy. :rtype: bool """ return self.candidate_parent(run_type) is not None
[docs] def next_backup_type(self, full_backup, run_type): """ Return backup type to take. If full_backup=daily then for hourly backups it will be incremental, for all other - full :param full_backup: when to take full backup according to config. :param run_type: what kind of backup run it is. :return: "full" or "incremental" :rtype: str """ if INTERVALS.index(full_backup) <= INTERVALS.index(run_type): return "full" elif not self.full_copy_exists(run_type): return "full" else: return "incremental"
def _load(self, status_as_json): status = [] try: status_as_obj = json.loads(status_as_json) except ValueError: raise CorruptedStatus("Could not load status from a bad JSON string %s" % (status_as_json,)) for run_type in INTERVALS: for key, value in status_as_obj[run_type].items(): try: host = key.split("/")[0] file_name = key.split("/")[3] kwargs = { "type": value["type"], "config": self.__serialize_config(value), } keys = [ "backup_started", "backup_finished", "binlog", "parent", "lsn", "position", "wsrep_provider_version", "server_vendor", ] for copy_key in keys: if copy_key in value: kwargs[copy_key] = value[copy_key] copy = MySQLCopy(host, run_type, file_name, **kwargs) status.append(copy) except IndexError as err: LOG.error(err) raise CorruptedStatus("Unexpected key %s" % key) return status def _status_serialize(self): def _serialize_config_dict(cfg): config_serialized = [] for key, value in cfg.items(): config_serialized.append({key: b64encode(value.encode("utf-8")).decode("utf-8")}) return config_serialized status = {} for interval in INTERVALS: status[interval] = {} copies = getattr(self, interval) for _, copy in copies.items(): status[interval][copy.key] = copy.as_dict() status[interval][copy.key]["config"] = _serialize_config_dict(status[interval][copy.key]["config"]) return b64encode(json.dumps(status).encode("utf-8")).decode("utf-8") @staticmethod def __serialize_config(copy): config = {} try: for cfg in copy["config"]: for cfg_key, cfg_value in cfg.items(): config[cfg_key] = b64decode(cfg_value.encode("utf-8")).decode("utf-8") except KeyError: config = {} return config