# -*- coding: utf-8 -*-
"""
Module defines MySQL source class for backing up remote MySQL.
"""
import configparser
import os
import re
import socket
import struct
from contextlib import contextmanager
from errno import ENOENT
from os import path as osp
from pathlib import Path
import pymysql
from twindb_backup import LOG, MY_CNF_COMMON_PATHS
from twindb_backup.source.exceptions import RemoteMySQLSourceError
from twindb_backup.source.mysql_source import MySQLSource
from twindb_backup.ssh.client import SshClient
from twindb_backup.ssh.exceptions import SshClientException
[docs]class RemoteMySQLSource(MySQLSource):
"""Remote MySQLSource class"""
def __init__(self, kwargs):
ssh_kwargs = {}
for arg in ["ssh_host", "ssh_port", "ssh_user", "ssh_key"]:
if arg in kwargs:
ssh_kwargs[arg.replace("ssh_", "")] = kwargs.pop(arg)
self._ssh_client = SshClient(**ssh_kwargs)
super(RemoteMySQLSource, self).__init__(**kwargs)
[docs] @contextmanager
def get_stream(self):
raise NotImplementedError("Method get_stream not implemented")
[docs] def clone(self, dest_host, port, compress=False):
"""
Send backup to destination host
:param dest_host: Destination host
:type dest_host: str
:param port: Port to sending backup
:type port: int
:param compress: If True compress stream
:type compress: bool
:raise RemoteMySQLSourceError: if any error
"""
error_log = osp.join(
os.sep,
"tmp",
f"{self._ssh_client.host}_{self._ssh_client.port}-" f"{dest_host}_{port}-error.log",
)
xb_cmd = [
self._xtrabackup,
]
local_defaults = osp.expanduser(self._connect_info.defaults_file)
remote_defaults = None
try:
if osp.exists(local_defaults):
with open(local_defaults) as fp:
remote_defaults = self._ssh_client.execute("mktemp")[0].strip()
LOG.debug(
"Writing content of %s to %s.",
local_defaults,
remote_defaults,
)
self._ssh_client.write_content(remote_defaults, fp.read())
xb_cmd.append(f"--defaults-file={remote_defaults}")
xb_cmd.extend(
[
"--stream=xbstream",
"--host=127.0.0.1",
"--backup",
"--target-dir",
"./",
]
)
cmd = [" ".join(xb_cmd) + f" 2> {error_log}"]
if compress:
cmd.append("gzip -c -")
cmd.append(f"ncat {dest_host} {port} --send-only")
cmd_line = "|".join(cmd)
remote_cmd = f"bash -c 'set -o pipefail; sudo {cmd_line}'"
return self._ssh_client.execute(remote_cmd)
finally:
if remote_defaults:
self._ssh_client.execute(f"rm -f '{remote_defaults}'")
[docs] def clone_config(self, dst):
"""
Clone config to destination server
:param dst: Destination server
:type dst: Ssh
"""
cfg_path = self._get_root_my_cnf()
LOG.debug("Root my.cnf is: %s", cfg_path)
self._save_cfg(dst, cfg_path)
def _find_all_cnf(self, root_path):
"""Return list of embed cnf files
:param root_path: Path to the originating my.cnf config.
(/etc/my.cnf, or /etc/mysql/my.cnf)
:type root_path: Path
:return: List of all included my.cnf files
:rtype: list
"""
files = [str(root_path)]
cfg_content = self._ssh_client.get_text_content(str(root_path))
for line in cfg_content.splitlines():
if "!includedir" in line:
rel_path = line.split()[1]
file_list = self._ssh_client.list_files(
str(root_path.parent.joinpath(rel_path)),
recursive=False,
files_only=True,
)
LOG.debug(file_list)
for sub_file in file_list:
files.extend(self._find_all_cnf(root_path.parent.joinpath(rel_path).joinpath(sub_file)))
elif "!include" in line:
rel_path = line.split()[1]
files.extend(self._find_all_cnf(root_path.parent.joinpath(rel_path)))
return files
@staticmethod
def _find_server_id_by_path(cfg):
"""Find path with server_id"""
options = ["server_id", "server-id"]
for option in options:
try:
if cfg.has_option("mysqld", option):
return option
except configparser.Error:
pass
return None
def _save_cfg(self, dst, root_cfg):
"""Save configs on destination recursively"""
files = self._find_all_cnf(Path(root_cfg))
server_id = self._get_server_id(dst.host)
is_server_id_set = False
valid_cfg = []
for path in files:
try:
cfg = self._get_config(path)
option = self._find_server_id_by_path(cfg)
if option:
cfg.set("mysqld", option, value=str(server_id))
is_server_id_set = True
dst.client.write_config(path, cfg)
valid_cfg.append(path)
except configparser.ParsingError:
cfg_content = self._ssh_client.get_text_content(path)
dst.client.write_content(path, cfg_content)
if not is_server_id_set:
for path in valid_cfg:
cfg = self._get_config(path)
if cfg.has_section("mysqld"):
cfg.set("mysqld", "server_id", value=str(server_id))
dst.client.write_config(path, cfg)
return
def _get_root_my_cnf(self):
"""Return root my.cnf path"""
for cfg_path in MY_CNF_COMMON_PATHS:
try:
self._ssh_client.get_text_content(cfg_path)
return cfg_path
except SshClientException:
continue
except IOError as err:
if err.errno == ENOENT:
continue
else:
raise
raise OSError("Root my.cnf not found")
def _get_config(self, cfg_path):
"""
Return parsed config
:param cfg_path: Path to config
:type cfg_path: str
:return: Path and config
:rtype: ConfigParser.ConfigParser
"""
cfg = configparser.ConfigParser(allow_no_value=True)
try:
cmd = "cat %s" % cfg_path
with self._ssh_client.get_remote_handlers(cmd) as (_, cout, _):
cfg.read_string(cout.read().decode())
except configparser.ParsingError as err:
LOG.error(err)
raise
return cfg
[docs] def setup_slave(self, master_info): # noqa # pylint: disable=too-many-arguments
"""
Change master
:param master_info: Master details.
:type master_info: MySQLMasterInfo
"""
try:
with self._cursor() as cursor:
query = (
"CHANGE MASTER TO "
"MASTER_HOST = '{master}', "
"MASTER_USER = '{user}', "
"MASTER_PORT = {port}, "
"MASTER_PASSWORD = '{password}', "
"MASTER_LOG_FILE = '{binlog}', "
"MASTER_LOG_POS = {binlog_pos}".format(
master=master_info.host,
user=master_info.user,
password=master_info.password,
binlog=master_info.binlog,
binlog_pos=master_info.binlog_position,
port=master_info.port,
)
)
cursor.execute(query)
cursor.execute("START SLAVE")
return True
except pymysql.Error as err:
LOG.debug(err)
return False
[docs] def apply_backup(self, datadir):
"""
Apply backup of destination server
:param datadir: Path to datadir
:return: Binlog file name and position
:rtype: tuple
:raise RemoteMySQLSourceError: if any error.
"""
try:
use_memory = "--use-memory %d" % int(self._mem_available() / 2)
except OSError:
use_memory = ""
logfile_path = "/tmp/xtrabackup-apply-log.log"
cmd = (
"sudo {xtrabackup} --prepare --apply-log-only "
"--target-dir {target_dir} {use_memory} "
"> {logfile} 2>&1"
"".format(
xtrabackup=self._xtrabackup,
target_dir=datadir,
use_memory=use_memory,
logfile=logfile_path,
)
)
try:
self._ssh_client.execute(cmd)
self._ssh_client.execute("sudo chown -R mysql %s" % datadir)
return self._get_binlog_info(datadir)
except SshClientException as err:
LOG.debug("###### Logfile BEGIN.")
LOG.debug(self._ssh_client.get_text_content(logfile_path))
LOG.debug("###### Logfile END.")
raise RemoteMySQLSourceError(err)
def _mem_available(self):
"""
Get available memory size
:return: Size of available memory in bytes
:rtype: int
:raise OSError: if can' detect memory
"""
# https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=34e431b0a
stdout_, _ = self._ssh_client.execute(
"awk -v low=$(grep low /proc/zoneinfo | "
"awk '{k+=$2}END{print k}') "
"'{a[$1]=$2}END{m="
'a["MemFree:"]'
'+a["Active(file):"]'
'+a["Inactive(file):"]'
'+a["SReclaimable:"]; '
'print a["MemAvailable:"]}\' '
"/proc/meminfo"
)
mem = stdout_.strip()
if not mem:
raise OSError("Cant get available mem")
free_mem = int(mem) * 1024
return free_mem
@staticmethod
def _get_server_id(host):
"""Determinate server id"""
try:
server_id = struct.unpack("!I", socket.inet_aton(host))[0]
except socket.error:
server_ip = socket.gethostbyname(host)
server_id = struct.unpack("!I", socket.inet_aton(server_ip))[0]
return server_id
def _get_binlog_info(self, backup_path):
"""Get binlog coordinates from an xtrabackup_binlog_info.
:param backup_path: Path where to look for xtrabackup_binlog_info.
:type backup_path: str
:return: Tuple with binlog coordinates - (file_name, pos)
:rtype: tuple
"""
stdout_, _ = self._ssh_client.execute("sudo cat %s/xtrabackup_binlog_info" % backup_path)
binlog_info = re.split(r"\t+", stdout_.rstrip())
return binlog_info[0], int(binlog_info[1])