# -*- coding: utf-8 -*-
"""
Module defines clone feature
"""
from multiprocessing import Process
import time
from twindb_backup import INTERVALS, LOG
from twindb_backup.destination.ssh import Ssh
from twindb_backup.exceptions import OperationError
from twindb_backup.source.mysql_source import MySQLConnectInfo, MySQLMasterInfo
from twindb_backup.source.remote_mysql_source import RemoteMySQLSource
from twindb_backup.ssh.exceptions import SshClientException
from twindb_backup.util import split_host_port
def _mysql_service(dst, action):
    """Start or stop the MySQL service on the destination server.

    The init scripts ``mysqld`` and ``mysql`` are tried in that order through
    ``sudo service``. If neither command succeeds, the function falls back to
    running ``mysqld`` directly (``start``) or killing the running ``mysqld``
    process (``stop``).

    :param dst: Destination server
    :type dst: Ssh
    :param action: string start or stop
    :type action: str
    :return: Result of the ``dst.execute_command()`` call that succeeded.
    :raise OperationError: if every attempt failed, or if ``action`` is
        neither ``start`` nor ``stop``.
    """
    service_cmd = "PATH=$PATH:/sbin sudo service %s %s"
    for init_script in ("mysqld", "mysql"):
        try:
            return dst.execute_command(
                service_cmd % (init_script, action), quiet=True
            )
        except SshClientException as err:
            # Not fatal yet: the next init script name may exist.
            LOG.debug(err)

    LOG.warning(
        "Failed to %s MySQL with an init script. Will try to %s mysqld.",
        action,
        action,
    )
    try:
        if action == "start":
            result = dst.execute_command(
                "PATH=$PATH:/sbin sudo bash -c 'nohup mysqld &'",
                background=True,
            )
            # The command runs in background; pause before handing control
            # back to the caller.
            time.sleep(10)
            return result
        if action == "stop":
            return dst.execute_command(
                "PATH=$PATH:/sbin sudo kill $(pidof mysqld)"
            )
    except SshClientException as err:
        LOG.error(err)

    # Reached when the fallback failed or the action is not supported.
    raise OperationError("Failed to %s MySQL on %r" % (action, dst))
def _remote_mysql_source(cfg, host):
    """Build a full-backup ``RemoteMySQLSource`` bound to ``host``.

    SSH credentials and the MySQL defaults file are taken from ``cfg``.
    """
    return RemoteMySQLSource(
        {
            "ssh_host": host,
            "ssh_user": cfg.ssh.user,
            "ssh_key": cfg.ssh.key,
            "mysql_connect_info": MySQLConnectInfo(
                cfg.mysql.defaults_file, hostname=host
            ),
            "run_type": INTERVALS[0],
            "backup_type": "full",
        }
    )


def clone_mysql(  # pylint: disable=too-many-arguments
    cfg,
    source,
    destination,
    replication_user,
    replication_password,
    netcat_port=9990,
    compress=False,
):
    """Clone mysql backup of remote machine and stream it to slave

    The backup is streamed from ``source`` to ``destination``, the MySQL
    config is copied over, the backup is applied, MySQL is started on the
    destination and replication from ``source`` is configured.

    :param cfg: TwinDB Backup tool config
    :type cfg: TwinDBBackupConfig
    :param source: Master address as accepted by ``split_host_port()``.
        The host part is used for SSH/MySQL connections, the port part
        as the master port for replication.
    :type source: str
    :param destination: Slave address as accepted by ``split_host_port()``.
        Only the host part is used.
    :type destination: str
    :param replication_user: MySQL user the slave replicates with.
    :type replication_user: str
    :param replication_password: Password of the replication user.
        It is never written to the log.
    :type replication_password: str
    :param netcat_port: First TCP port to try for the streaming listener
        on the destination.
    :type netcat_port: int
    :param compress: If True, the destination pipes the stream through
        ``gunzip`` and the source is asked to compress it.
    :type compress: bool
    :raise SystemExit: with code 1 if the datadir on the destination
        is not empty.
    """
    # Parse the source address once instead of on every use.
    source_addr = split_host_port(source)
    source_host = source_addr[0]

    LOG.debug("Remote MySQL Source: %s", source_host)
    LOG.debug("MySQL defaults: %s", cfg.mysql.defaults_file)
    LOG.debug("SSH username: %s", cfg.ssh.user)
    LOG.debug("SSH key: %s", cfg.ssh.key)
    src = _remote_mysql_source(cfg, source_host)

    xbstream_binary = cfg.mysql.xbstream_binary

    destination_host = split_host_port(destination)[0]
    LOG.debug("SSH destination: %s", destination_host)
    LOG.debug("SSH username: %s", cfg.ssh.user)
    LOG.debug("SSH key: %s", cfg.ssh.key)
    dst = Ssh(
        "/tmp",
        ssh_host=destination_host,
        ssh_user=cfg.ssh.user,
        ssh_key=cfg.ssh.key,
    )

    # The destination uses the same datadir path as the source.
    datadir = src.datadir
    LOG.debug("datadir: %s", datadir)

    # Refuse to clone over existing files.
    if dst.list_files(datadir):
        LOG.error("Destination datadir is not empty: %s", datadir)
        # Same effect as exit(1), without depending on the site module.
        raise SystemExit(1)

    _run_remote_netcat(
        compress, datadir, destination, dst, netcat_port, src, xbstream_binary
    )

    LOG.debug("Copying MySQL config to the destination")
    src.clone_config(dst)

    LOG.debug("Remote MySQL destination: %s", destination_host)
    LOG.debug("MySQL defaults: %s", cfg.mysql.defaults_file)
    LOG.debug("SSH username: %s", cfg.ssh.user)
    LOG.debug("SSH key: %s", cfg.ssh.key)
    dst_mysql = _remote_mysql_source(cfg, destination_host)

    binlog, position = dst_mysql.apply_backup(datadir)
    LOG.debug("Binlog coordinates: (%s, %d)", binlog, position)

    LOG.debug("Starting MySQL on the destination")
    _mysql_service(dst, action="start")
    LOG.debug("MySQL started")

    LOG.debug("Setting up replication.")
    LOG.debug("Master host: %s", source)
    LOG.debug("Replication user: %s", replication_user)
    # Credentials must not leak into log files; log a mask instead.
    LOG.debug("Replication password: %s", "********")
    dst_mysql.setup_slave(
        MySQLMasterInfo(
            host=source_host,
            port=source_addr[1],
            user=replication_user,
            password=replication_password,
            binlog=binlog,
            binlog_pos=position,
        )
    )
def _run_remote_netcat(  # pylint: disable=too-many-arguments
    compress,
    datadir,
    destination,
    dst,
    netcat_port,
    src,
    xbstream_path,
):
    """Stream a backup from the source into ``datadir`` on the destination.

    A netcat listener is started on the destination in a child process.
    Its output is piped into xbstream, optionally through ``gunzip``.
    Once the listener is up, the source is asked to send the backup to it.

    :param compress: If True, prefix the destination command with
        ``gunzip -c -`` and pass ``compress=True`` to ``src.clone()``.
    :type compress: bool
    :param datadir: Directory xbstream extracts into (``-C``).
    :type datadir: str
    :param destination: Destination address; only the host part returned
        by ``split_host_port()`` is used.
    :type destination: str
    :param dst: Destination server used to probe ports and run netcat.
    :type dst: Ssh
    :param netcat_port: First TCP port to try for the listener.
    :type netcat_port: int
    :param src: Source the backup is cloned from.
    :type src: RemoteMySQLSource
    :param xbstream_path: xbstream executable to run on the destination.
    :type xbstream_path: str
    :raise SystemExit: with code 1 if netcat is not listening on
        the destination within the wait timeout.
    """
    netcat_cmd = "{xbstream_binary} -x -C {datadir}".format(
        xbstream_binary=xbstream_path, datadir=datadir
    )
    if compress:
        netcat_cmd = "gunzip -c - | %s" % netcat_cmd

    # Find an unused port: walk upwards while something is already listening.
    # NOTE: if every port below 64000 is busy, the loop ends with
    # netcat_port == 64000 and that port is used without being probed.
    while netcat_port < 64000:
        if not dst.ensure_tcp_port_listening(netcat_port, wait_timeout=1):
            LOG.debug("Will use port %d for streaming", netcat_port)
            break
        netcat_port += 1

    proc_netcat = Process(
        target=dst.netcat, args=(netcat_cmd,), kwargs={"port": netcat_port}
    )
    LOG.debug("Starting netcat on the destination")
    proc_netcat.start()

    nc_wait_timeout = 10
    if not dst.ensure_tcp_port_listening(
        netcat_port, wait_timeout=nc_wait_timeout
    ):
        LOG.error(
            "netcat on the destination is not ready after %d seconds",
            nc_wait_timeout,
        )
        proc_netcat.terminate()
        proc_netcat.join()
        # Same effect as exit(1), without depending on the site module.
        raise SystemExit(1)

    try:
        src.clone(
            dest_host=split_host_port(destination)[0],
            port=netcat_port,
            compress=compress,
        )
    except BaseException:
        # The listener will never receive its data now. Stop it so the
        # non-daemonic child cannot block the join below (or the automatic
        # join multiprocessing performs at interpreter exit).
        proc_netcat.terminate()
        raise
    finally:
        proc_netcat.join()