# Source code for twindb_backup.status.base_status
"""Base status is a class for a general purpose status.
"""
import hashlib
import json
import socket
from abc import abstractmethod, abstractproperty
from base64 import b64decode
from os import path as osp
from twindb_backup import LOG, STATUS_FORMAT_VERSION
from twindb_backup.destination.exceptions import FileNotFound
from twindb_backup.status.exceptions import CorruptedStatus, StatusKeyNotFound
class BaseStatus(object):
    """Base class for status. It can be instantiated either from
    a string with status content or from a destination instance.
    If the destination is given then the status will be read from a status
    file on the destination.

    The class does not use ``ABCMeta``, so the abstract markers on
    ``basename`` and ``_load`` are not enforced at instantiation time;
    subclasses are nevertheless expected to override them together with
    ``_status_serialize()``.

    :param content: if passed it will initialize a status from this string.
    :type content: str
    :param dst: Destination instance.
    :type dst: BaseDestination
    :param status_directory: Relative path to a directory where the status
        file is stored. Usually,
        it's a hostname where backup was taken from.
    :type status_directory: str
    :raise CorruptedStatus: If the content string is not a valid status
        or empty string.
    """

    __version__ = STATUS_FORMAT_VERSION

    def __init__(self, content=None, dst=None, status_directory=None):
        # Fall back to the local hostname when no directory is given.
        self._status_directory = status_directory or socket.gethostname()
        self._status = []
        if dst:
            self.__init_from_str(self._read(dst))
        else:
            self.__init_from_str(content)

    @abstractproperty
    def basename(self):
        """
        Returns file name without a directory path
        where the status is stored in the destination.
        """
        return "status"

    @property
    def latest_backup(self):
        """
        Find the latest backup copy.

        :return: the last copy in the status or None if status is empty.
        :rtype: BaseCopy
        """
        try:
            return self._status[-1]
        except IndexError:
            return None

    @property
    def md5(self):
        """
        :return: MD5 checksum of the status. It is calculated as
            a md5 of output of ``self._status_serialize()``.
        :rtype: str
        """
        return hashlib.md5(self._status_serialize().encode("utf-8")).hexdigest()

    @property
    def status_path(self):
        """
        Return relative path where status is stored.

        :return: relative to the destination path where the status is stored.
        :rtype: str
        """
        return osp.join(self._status_directory, self.basename)

    @property
    def version(self):
        """
        Version of status file. Originally status file didn't have
        any versions, but in future the version will be used to work
        with new features.
        """
        return self.__version__

    def add(self, backup_copy):
        """
        Add entry to status.

        :param backup_copy: Instance of backup copy
        :type backup_copy: BaseCopy
        """
        self._status.append(backup_copy)

    def remove(self, key):
        """
        Remove key from the status.

        The copy whose key equals ``key`` is removed. If there is no such
        copy, the first copy whose key is a suffix of ``key`` is removed
        instead.

        :param key: A copy key in the status.
        :type key: str
        :raise StatusKeyNotFound: if no copy matches the key.
        """
        try:
            self._status.remove(self[key])
        except StatusKeyNotFound:
            for candidate in self._status:
                if key.endswith(candidate.key):
                    # Safe to mutate the list here: we stop iterating
                    # immediately after the removal.
                    self._status.remove(candidate)
                    return
            # Nothing matched even by suffix - propagate the original error.
            raise

    def serialize(self):
        """
        Return a string that represents current state.

        :return: JSON document with the keys ``status``, ``version``
            and ``md5``.
        :rtype: str
        """
        return json.dumps(
            {
                "status": self._status_serialize(),
                "version": self.version,
                "md5": self.md5,
            },
            sort_keys=True,
        )

    def save(self, dst):
        """
        Write status file to the destination.

        :param dst: Destination instance.
        :type dst: BasicDestination
        """
        dst.write(self.serialize(), self.status_path)

    @abstractmethod
    def _load(self, status_as_json):
        """
        Parse status_as_json string and construct a status.

        :param status_as_json: A JSON string with status
        :type status_as_json: str
        :return: status object - list of BackupCopies
        :rtype: list
        """
        raise NotImplementedError

    def _read(self, dst):
        """
        Read status file from the destination.

        :param dst: Destination instance.
        :type dst: BasicDestination
        :return: Content of the file or None if file doesn't exist.
        :rtype: str
        """
        try:
            return dst.read(self.status_path)
        except (FileNotFound, FileNotFoundError):
            return None

    def _status_serialize(self):
        """
        Serialize the list of copies. Must be implemented by a subclass.

        The result is used as the ``status`` field of ``serialize()``,
        hashed by ``md5`` and base64-decoded by ``__str__()``.
        """
        raise NotImplementedError

    def __getitem__(self, item):
        """
        Look up a copy by position or by key.

        :param item: index in the status list or a copy key.
        :type item: int or str
        :raise StatusKeyNotFound: if no copy has the given key.
        :raise NotImplementedError: if ``item`` is neither int nor str.
        """
        if isinstance(item, int):
            return self._status[item]
        if isinstance(item, str):
            for copy in self._status:
                if copy.key == item:
                    return copy
            raise StatusKeyNotFound("Copy %s not found" % item)
        raise NotImplementedError("Type %s not supported" % type(item))

    def __str__(self):
        # ``_status_serialize()`` output is base64-encoded; show it decoded.
        return b64decode(self._status_serialize()).decode("utf-8")

    def __len__(self):
        return len(self._status)

    def __init_from_str(self, content):
        """
        Initialize status from a string.

        :param content: serialized status, or None for an empty status.
        :type content: str
        :raise CorruptedStatus: if content is an empty string, is a JSON
            document without string ``status`` and ``md5`` fields, or
            the stored checksum doesn't match.
        """
        if content is None:
            # Nothing to load. Checked explicitly rather than by catching
            # TypeError from json.loads(), which would also hide unrelated
            # TypeErrors and turn malformed content into an empty status.
            self._status = []
            return

        if content == "":
            raise CorruptedStatus("Status content cannot be an empty string")

        try:
            status = json.loads(content)
            if (
                not isinstance(status, dict)
                or "md5" not in status
                or not isinstance(status.get("status"), str)
            ):
                raise CorruptedStatus("Status content is not a valid status")

            encoded_status = status["status"]
            md5_calculated = hashlib.md5(encoded_status.encode("utf-8")).hexdigest()
            if md5_calculated != status["md5"]:
                raise CorruptedStatus("Checksum mismatch")

            self._status = self._load(b64decode(encoded_status).decode("utf-8"))
            self._status.sort(key=lambda cp: cp.created_at)

        except ValueError as err:  # Old format
            # json.JSONDecodeError is a ValueError: content that is not JSON
            # is treated as the old format, a bare base64-encoded status.
            LOG.debug(err)
            LOG.debug("Looks like old format")
            self._status = self._load(b64decode(content).decode("utf-8"))
            LOG.debug("Loaded status: %s", self._status)
            self._status.sort(key=lambda cp: cp.sort_key)