Source code for twindb_backup.clone

# -*- coding: utf-8 -*-
"""
Module defines clone feature
"""
from multiprocessing import Process

import time

from twindb_backup import INTERVALS, LOG
from twindb_backup.destination.ssh import Ssh
from twindb_backup.exceptions import OperationError
from twindb_backup.source.mysql_source import MySQLConnectInfo, MySQLMasterInfo
from twindb_backup.source.remote_mysql_source import RemoteMySQLSource
from twindb_backup.ssh.exceptions import SshClientException
from twindb_backup.util import split_host_port


def _mysql_service(dst, action):
    """Start or stop MySQL service

    :param dst: Destination server
    :type dst: Ssh
    :param action: string start or stop
    :type action: str
    """
    for service in ['mysqld', 'mysql']:
        try:
            return dst.execute_command(
                "PATH=$PATH:/sbin sudo service %s %s" % (service, action),
                quiet=True
            )
        except SshClientException as err:
            LOG.debug(err)

    try:
        LOG.warning('Failed to %s MySQL with an init script. '
                    'Will try to %s mysqld.', action, action)
        if action == "start":
            ret = dst.execute_command(
                "PATH=$PATH:/sbin sudo bash -c 'nohup mysqld &'",
                background=True
            )
            time.sleep(10)
            return ret
        elif action == "stop":
            return dst.execute_command(
                "PATH=$PATH:/sbin sudo kill $(pidof mysqld)"
            )
    except SshClientException as err:
        LOG.error(err)
        raise OperationError(
            'Failed to %s MySQL on %r'
            % (action, dst)
        )


[docs]def clone_mysql(cfg, source, destination, # pylint: disable=too-many-arguments replication_user, replication_password, netcat_port=9990, compress=False): """Clone mysql backup of remote machine and stream it to slave :param cfg: TwinDB Backup tool config :type cfg: TwinDBBackupConfig """ LOG.debug('Remote MySQL Source: %s', split_host_port(source)[0]) LOG.debug( 'MySQL defaults: %s', cfg.mysql.defaults_file ) LOG.debug( 'SSH username: %s', cfg.ssh.user ) LOG.debug( 'SSH key: %s', cfg.ssh.key ) src = RemoteMySQLSource( { "ssh_host": split_host_port(source)[0], "ssh_user": cfg.ssh.user, "ssh_key": cfg.ssh.key, "mysql_connect_info": MySQLConnectInfo( cfg.mysql.defaults_file, hostname=split_host_port(source)[0]), "run_type": INTERVALS[0], "backup_type": 'full' } ) xbstream_binary = cfg.mysql.xbstream_binary LOG.debug('SSH destination: %s', split_host_port(destination)[0]) LOG.debug('SSH username: %s', cfg.ssh.user) LOG.debug('SSH key: %s', cfg.ssh.key) dst = Ssh( '/tmp', ssh_host=split_host_port(destination)[0], ssh_user=cfg.ssh.user, ssh_key=cfg.ssh.key ) datadir = src.datadir LOG.debug('datadir: %s', datadir) if dst.list_files(datadir): LOG.error("Destination datadir is not empty: %s", datadir) exit(1) _run_remote_netcat( compress, datadir, destination, dst, netcat_port, src, xbstream_binary ) LOG.debug('Copying MySQL config to the destination') src.clone_config(dst) LOG.debug('Remote MySQL destination: %s', split_host_port(destination)[0]) LOG.debug( 'MySQL defaults: %s', cfg.mysql.defaults_file ) LOG.debug('SSH username: %s', cfg.ssh.user) LOG.debug('SSH key: %s', cfg.ssh.key) dst_mysql = RemoteMySQLSource({ "ssh_host": split_host_port(destination)[0], "ssh_user": cfg.ssh.user, "ssh_key": cfg.ssh.key, "mysql_connect_info": MySQLConnectInfo( cfg.mysql.defaults_file, hostname=split_host_port(destination)[0] ), "run_type": INTERVALS[0], "backup_type": 'full' }) binlog, position = dst_mysql.apply_backup(datadir) LOG.debug('Binlog coordinates: (%s, %d)', binlog, position) LOG.debug('Starting MySQL on the destination') _mysql_service(dst, action='start') LOG.debug('MySQL started') LOG.debug('Setting up replication.') LOG.debug('Master host: %s', source) LOG.debug('Replication user: %s', replication_user) LOG.debug('Replication password: %s', replication_password) dst_mysql.setup_slave( MySQLMasterInfo( host=split_host_port(source)[0], port=split_host_port(source)[1], user=replication_user, password=replication_password, binlog=binlog, binlog_pos=position ) )
def _run_remote_netcat(compress, datadir, # pylint: disable=too-many-arguments destination, dst, netcat_port, src, xbstream_path): netcat_cmd = "{xbstream_binary} -x -C {datadir}".format( xbstream_binary=xbstream_path, datadir=datadir ) if compress: netcat_cmd = "gunzip -c - | %s" % netcat_cmd # find unused port while netcat_port < 64000: if dst.ensure_tcp_port_listening(netcat_port, wait_timeout=1): netcat_port += 1 else: LOG.debug('Will use port %d for streaming', netcat_port) break proc_netcat = Process( target=dst.netcat, args=(netcat_cmd,), kwargs={ 'port': netcat_port } ) LOG.debug('Starting netcat on the destination') proc_netcat.start() nc_wait_timeout = 10 if not dst.ensure_tcp_port_listening(netcat_port, wait_timeout=nc_wait_timeout): LOG.error('netcat on the destination ' 'is not ready after %d seconds', nc_wait_timeout) proc_netcat.terminate() exit(1) src.clone( dest_host=split_host_port(destination)[0], port=netcat_port, compress=compress ) proc_netcat.join()