# -*- coding: utf-8 -*-
"""
Module that restores backup copies.
"""
from __future__ import print_function
import errno
import os
import sys
import tempfile
import time
from os import path as osp
from subprocess import PIPE, Popen
import psutil
from twindb_backup import DEFAULT_FILE_ENCODING, LOG, XBSTREAM_BINARY, XTRABACKUP_BINARY
from twindb_backup.copy.mysql_copy import MySQLCopy
from twindb_backup.destination.exceptions import DestinationError
from twindb_backup.destination.local import Local
from twindb_backup.exceptions import TwinDBBackupError
from twindb_backup.export import export_info
from twindb_backup.exporter.base_exporter import ExportCategory, ExportMeasureType
from twindb_backup.modifiers.gpg import Gpg
from twindb_backup.status.mysql_status import MySQLStatus
from twindb_backup.util import mkdir_p
[docs]def get_my_cnf(status, key):
"""
Get MySQL config from the status.
:param status: Backup status.
:type status: MySQLStatus
:param key: Backup name.
:type key: str
:return: Content of my.cnf or None if not found
:rtype: str
"""
for path in status[key].config:
if isinstance(status[key].config[path], str):
yield path, status[key].config[path]
elif isinstance(status[key].config[path], bytes):
yield path, status[key].config[path].decode("utf-8")
else:
raise TwinDBBackupError(f"Unexpected type of {status[key].config[path]}")
[docs]def get_free_memory():
"""Return size of available memory in bytes. It calculates it as a half
of available memory.
:return: Size of free memory in bytes
:rtype: int
"""
return int(psutil.virtual_memory().available / 2)
[docs]def restore_from_mysql_full(
stream,
dst_dir,
config,
redo_only=False,
xtrabackup_binary=XTRABACKUP_BINARY,
xbstream_binary=XBSTREAM_BINARY,
):
"""
Restore MySQL datadir from a backup copy
:param stream: Generator that provides backup copy
:param dst_dir: Path to destination directory. Must exist and be empty.
:type dst_dir: str
:param config: Tool configuration.
:type config: TwinDBBackupConfig
:param redo_only: True if the function has to do final apply of
the redo log. For example, if you restore backup from a full copy
it should be False. If you restore from incremental copy and
you restore base full copy redo_only should be True.
:type redo_only: bool
:param xtrabackup_binary: path to xtrabackup binary.
:param xbstream_binary: Path to xbstream binary
:return: If success, return True
:rtype: bool
"""
LOG.debug("Restore tools: %s/%s", xtrabackup_binary, xbstream_binary)
# GPG modifier
if config.gpg:
gpg = Gpg(
stream,
config.gpg.recipient,
config.gpg.keyring,
secret_keyring=config.gpg.secret_keyring,
)
LOG.debug("Decrypting stream")
stream = gpg.revert_stream()
else:
LOG.debug("Not decrypting the stream")
if config.mysql.xtrabackup_binary:
xtrabackup_binary = config.mysql.xtrabackup_binary
stream = config.compression.get_modifier(stream).revert_stream()
with stream as handler:
if not _extract_xbstream(handler, dst_dir, xbstream_binary):
return False
try:
xtrabackup_cmd = [
xtrabackup_binary,
f"--use-memory={get_free_memory()}",
"--prepare",
]
if redo_only:
xtrabackup_cmd += ["--apply-log-only"]
xtrabackup_cmd += ["--target-dir", dst_dir]
LOG.debug("Running %s", " ".join(xtrabackup_cmd))
with Popen(xtrabackup_cmd, stdout=None, stderr=None) as xtrabackup_proc:
xtrabackup_proc.communicate()
ret = xtrabackup_proc.returncode
if ret:
LOG.error("%s exited with code %d", " ".join(xtrabackup_cmd), ret)
return ret == 0
except OSError as err:
raise TwinDBBackupError(f"Failed to prepare backup in {dst_dir}: {err}") from err
def _extract_xbstream(input_stream, working_dir, xbstream_binary=XBSTREAM_BINARY):
"""
Extract xbstream stream in directory
:param input_stream: The stream in xbstream format
:param working_dir: directory
:param xbstream_binary: Path to xbstream
:return: True if extracted successfully
"""
try:
cmd = [xbstream_binary, "-x"]
LOG.debug("Running %s", " ".join(cmd))
LOG.debug("Working directory: %s", working_dir)
LOG.debug("Xbstream binary: %s", xbstream_binary)
with Popen(cmd, stdin=input_stream, stdout=PIPE, stderr=PIPE, cwd=working_dir) as proc:
cout, cerr = proc.communicate()
ret = proc.returncode
if ret:
LOG.error("%s exited with code %d", " ".join(cmd), ret)
if cout:
LOG.error("STDOUT: %s", cout)
if cerr:
LOG.error("STDERR: %s", cerr)
return ret == 0
except OSError as err:
raise TwinDBBackupError(f"Failed to extract xbstream: {err}") from err
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
[docs]def restore_from_mysql_incremental(
stream,
dst_dir,
config,
tmp_dir=None,
xtrabackup_binary=XTRABACKUP_BINARY,
xbstream_binary=XBSTREAM_BINARY,
):
"""
Restore MySQL datadir from an incremental copy.
:param stream: Generator that provides backup copy
:param dst_dir: Path to destination directory. Must exist and be empty.
:type dst_dir: str
:param config: Tool configuration.
:type config: TwinDBBackupConfig
:param tmp_dir: Path to temp dir
:type tmp_dir: str
:param xtrabackup_binary: Path to xtrabackup binary.
:param xbstream_binary: Path to xbstream binary
:return: If success, return True
:rtype: bool
"""
inc_dir = tmp_dir or tempfile.mkdtemp()
# GPG modifier
if config.gpg:
gpg = Gpg(
stream,
config.gpg.recipient,
config.gpg.keyring,
secret_keyring=config.gpg.secret_keyring,
)
LOG.debug("Decrypting stream")
stream = gpg.revert_stream()
else:
LOG.debug("Not decrypting the stream")
if config.mysql.xtrabackup_binary:
xtrabackup_binary = config.mysql.xtrabackup_binary
stream = config.compression.get_modifier(stream).revert_stream()
with stream as handler:
if not _extract_xbstream(handler, inc_dir, xbstream_binary):
return False
try:
try:
xtrabackup_cmd = [
xtrabackup_binary,
f"--use-memory={get_free_memory()}",
"--prepare",
"--apply-log-only",
f"--target-dir={dst_dir}",
]
LOG.debug("Running %s", " ".join(xtrabackup_cmd))
with Popen(xtrabackup_cmd, stdout=None, stderr=None) as xtrabackup_proc:
xtrabackup_proc.communicate()
ret = xtrabackup_proc.returncode
if ret:
LOG.error("%s exited with code %d", " ".join(xtrabackup_cmd), ret)
return False
xtrabackup_cmd = [
xtrabackup_binary,
f"--use-memory={get_free_memory()}",
"--prepare",
f"--target-dir={dst_dir}",
f"--incremental-dir={inc_dir}",
]
LOG.debug("Running %s", " ".join(xtrabackup_cmd))
with Popen(xtrabackup_cmd, stdout=None, stderr=None) as xtrabackup_proc:
xtrabackup_proc.communicate()
ret = xtrabackup_proc.returncode
if ret:
LOG.error("%s exited with code %d", " ".join(xtrabackup_cmd), ret)
return ret == 0
except OSError as err:
LOG.error("Failed to prepare backup in %s: %s", dst_dir, err)
return False
finally:
try:
pass
except OSError as exc:
if exc.errno != errno.ENOENT: # ENOENT - no such file or directory
raise # re-raise exception
[docs]def gen_grastate(path, version, uuid, seqno):
"""
Generate and save grastate file.
:param path: Path to grastate file.
:param version: Galera version from grastate.dat.
:param uuid: UUID from grastate.dat.
:param seqno: seqno from grastate.dat.
"""
with open(path, "w", encoding=DEFAULT_FILE_ENCODING) as file_desc:
file_desc.write(
f"""# GALERA saved state
version: {version}
uuid: {uuid}
seqno: {seqno}
cert_index:
"""
)
[docs]def update_grastate(dst_dir, status, key):
"""
If xtrabackup_galera_info exists in the destination directory
then parse it and generate grastate.dat file.
:param dst_dir: Path to destination directory.
:type dst_dir: str
:param status: Backup status
:type status: dict
:param key: Backup name
:type key: str
"""
if osp.exists(osp.join(dst_dir, "xtrabackup_galera_info")):
version = status[key].wsrep_provider_version
with open(
osp.join(dst_dir, "xtrabackup_galera_info"),
encoding=DEFAULT_FILE_ENCODING,
) as galera_info:
galera_info = galera_info.read()
uuid = galera_info.split(":")[0]
seqno = galera_info.split(":")[1]
gen_grastate(dst_dir + "/grastate.dat", version, uuid, seqno)
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
[docs]def restore_from_mysql(
twindb_config,
copy: MySQLCopy,
dst_dir,
tmp_dir=None,
cache=None,
hostname=None,
):
"""
Restore MySQL datadir in a given directory
:param twindb_config: tool configuration
:type twindb_config: TwinDBBackupConfig
:param copy: Backup copy instance.
:type copy: MySQLCopy
:param dst_dir: Destination directory. Must exist and be empty.
:type dst_dir: str
:param tmp_dir: Path to temp directory
:type tmp_dir: str
:param cache: Local cache object.
:type cache: Cache
:param hostname: Hostname
:type hostname: str
"""
LOG.info("Restoring %s in %s", copy, dst_dir)
LOG.debug("Server vendor: %s", copy.server_vendor)
mkdir_p(dst_dir)
dst = None
restore_start = time.time()
keep_local_path = twindb_config.keep_local_path
if keep_local_path and osp.exists(osp.join(keep_local_path, copy.key)):
dst = Local(twindb_config.keep_local_path)
if not dst:
if not hostname:
hostname = copy.host
if not hostname:
raise DestinationError(f"Failed to get hostname from {copy}")
dst = twindb_config.destination(backup_source=hostname)
key = copy.key
status = MySQLStatus(dst=dst, status_directory=hostname)
stream = dst.get_stream(copy)
if status[key].type == "full":
cache_key = os.path.basename(key)
if cache:
if cache_key in cache:
# restore from cache
cache.restore_in(cache_key, dst_dir)
else:
restore_from_mysql_full(stream, dst_dir, twindb_config, redo_only=False)
cache.add(dst_dir, cache_key)
else:
restore_from_mysql_full(
stream,
dst_dir,
twindb_config,
redo_only=False,
xbstream_binary=copy.xbstream_binary,
xtrabackup_binary=copy.xtrabackup_binary,
)
else:
full_copy = status.candidate_parent(copy.run_type)
full_stream = dst.get_stream(full_copy)
LOG.debug("Full parent copy is %s", full_copy.key)
cache_key = os.path.basename(full_copy.key)
if cache:
if cache_key in cache:
# restore from cache
cache.restore_in(cache_key, dst_dir)
else:
restore_from_mysql_full(full_stream, dst_dir, twindb_config, redo_only=True)
cache.add(dst_dir, cache_key)
else:
restore_from_mysql_full(
full_stream,
dst_dir,
twindb_config,
redo_only=True,
xtrabackup_binary=copy.xtrabackup_binary,
xbstream_binary=copy.xbstream_binary,
)
restore_from_mysql_incremental(
stream,
dst_dir,
twindb_config,
tmp_dir,
xtrabackup_binary=copy.xtrabackup_binary,
xbstream_binary=copy.xbstream_binary,
)
config_dir = os.path.join(dst_dir, "_config")
for path, content in get_my_cnf(status, key):
config_sub_dir = os.path.join(config_dir, os.path.dirname(path).lstrip("/"))
mkdir_p(config_sub_dir, mode=0o755)
with open(
os.path.join(config_sub_dir, os.path.basename(path)),
"w",
encoding=DEFAULT_FILE_ENCODING,
) as mysql_config:
mysql_config.write(content)
update_grastate(dst_dir, status, key)
export_info(
twindb_config,
data=time.time() - restore_start,
category=ExportCategory.mysql,
measure_type=ExportMeasureType.restore,
)
LOG.info("Successfully restored %s in %s.", copy.key, dst_dir)
LOG.info(
"Now copy content of %s to MySQL datadir: cp -R %s /var/lib/mysql/",
dst_dir,
osp.join(dst_dir, "*"),
)
LOG.info("Fix permissions: chown -R mysql:mysql /var/lib/mysql/")
LOG.info(
"Make sure innodb_log_file_size and innodb_log_files_in_group "
"in %s/backup-my.cnf and in /etc/my.cnf are same.",
dst_dir,
)
if osp.exists(config_dir):
LOG.info("Original my.cnf is restored in %s.", config_dir)
LOG.info("Then you can start MySQL normally.")
[docs]def restore_from_file(twindb_config, copy, dst_dir):
"""
Restore a directory from a backup copy in the directory
:param twindb_config: tool configuration
:type twindb_config: TwinDBBackupConfig
:param copy: Instance of BaseCopy or and inheriting classes.
:type copy: BaseCopy
:param dst_dir: Path to destination directory. Must exist and be empty.
:type dst_dir: str
"""
LOG.info("Restoring %s in %s", copy.key, dst_dir)
mkdir_p(dst_dir)
restore_start = time.time()
keep_local_path = twindb_config.keep_local_path
if keep_local_path and os.path.exists(osp.join(keep_local_path, copy.key)):
dst = Local(osp.join(keep_local_path, copy.key))
stream = dst.get_stream(copy)
else:
dst = twindb_config.destination()
stream = dst.get_stream(copy)
# GPG modifier
if twindb_config.gpg:
gpg = Gpg(
stream,
twindb_config.gpg.recipient,
twindb_config.gpg.keyring,
secret_keyring=twindb_config.gpg.secret_keyring,
)
LOG.debug("Decrypting stream")
stream = gpg.revert_stream()
else:
LOG.debug("Not decrypting the stream")
with stream as handler:
try:
LOG.debug("handler type: %s", type(handler))
LOG.debug("stream type: %s", type(stream))
cmd = ["tar", "zvxf", "-"]
LOG.debug("Running %s", " ".join(cmd))
with Popen(cmd, stdin=handler, cwd=dst_dir) as proc:
cout, cerr = proc.communicate()
ret = proc.returncode
if ret:
LOG.error("%s exited with code %d", cmd, ret)
if cout:
LOG.error("STDOUT: %s", cout)
if cerr:
LOG.error("STDERR: %s", cerr)
return
LOG.info("Successfully restored %s in %s", copy.key, dst_dir)
except (OSError, DestinationError) as err:
LOG.error("Failed to decompress %s: %s", copy.key, err)
sys.exit(1)
export_info(
twindb_config,
data=time.time() - restore_start,
category=ExportCategory.files,
measure_type=ExportMeasureType.restore,
)