Source code for twindb_backup.source.mysql_source

"""
Module defines MySQL source class for backing up local MySQL.
"""
from __future__ import print_function

import os
import sys
import tempfile
import time
from contextlib import contextmanager
from enum import Enum
from os import path as osp
from subprocess import PIPE, Popen

import pymysql
from pymysql import OperationalError
from pymysql.cursors import DictCursor

from twindb_backup import INTERVALS, LOG, XTRABACKUP_BINARY, get_files_to_delete
from twindb_backup.source.base_source import BaseSource
from twindb_backup.source.exceptions import MySQLSourceError
from twindb_backup.status.exceptions import StatusKeyNotFound


[docs]class MySQLConnectInfo(object): # pylint: disable=too-few-public-methods """MySQL connection's details""" def __init__( self, defaults_file, connect_timeout=10, cursor=DictCursor, hostname="127.0.0.1", ): self.cursor = cursor self.connect_timeout = connect_timeout self.defaults_file = defaults_file self.hostname = hostname def __eq__(self, other): return all( ( (self.cursor == other.cursor), (self.connect_timeout == other.connect_timeout), (self.defaults_file == other.defaults_file), (self.hostname == other.hostname), ) )
[docs]class MySQLMasterInfo(object): # pylint: disable=too-few-public-methods """MySQL master details""" def __init__( self, host, port, # pylint: disable=too-many-arguments user, password, binlog, binlog_pos, ): self.host = host self.user = user self.password = password self.binlog = binlog self.binlog_position = binlog_pos self.port = port or 3306
[docs]class MySQLFlavor(str, Enum): ORACLE = "oracle" PERCONA = "percona" MARIADB = "mariadb" def __eq__(self, other): return self.value == other def __hash__(self): return hash(self.value)
[docs]class MySQLClient: """Class to send queries to MySQL""" def __init__(self, defaults_file, connect_timeout=10, hostname="127.0.0.1"): self.connect_timeout = connect_timeout self.defaults_file = defaults_file self.hostname = hostname @property def server_vendor(self) -> MySQLFlavor: if "mariadb" in self.variable("version").lower(): return MySQLFlavor.MARIADB elif "percona" in self.variable("version_comment").lower(): return MySQLFlavor.PERCONA return MySQLFlavor.ORACLE
[docs] @contextmanager def get_connection(self): """ Connect to MySQL host and yield a connection. :return: MySQL connection :raise MySQLSourceError: if can't connect to server """ connection = None try: connection = pymysql.connect( host=self.hostname, read_default_file=self.defaults_file, connect_timeout=self.connect_timeout, cursorclass=DictCursor, ) yield connection except OperationalError: LOG.error("Can't connect to MySQL server on %s", self.hostname) raise MySQLSourceError("Can't connect to MySQL server on %s" % self.hostname) finally: if connection: connection.close()
[docs] @contextmanager def cursor(self): """MySQL cursor for connection to local MySQL instance.""" with self.get_connection() as connection: with connection.cursor() as cursor: yield cursor
[docs] def variable(self, varname): """Read MySQL variable and return its value""" with self.cursor() as cursor: cursor.execute("SELECT @@%s AS varname" % varname) row = cursor.fetchone() return row["varname"]
[docs]class MySQLSource(BaseSource): # pylint: disable=too-many-instance-attributes """MySQLSource class""" def __init__(self, mysql_connect_info, run_type, backup_type, **kwargs): """ MySQLSource constructor :param mysql_connect_info: MySQL connection details :type mysql_connect_info: MySQLConnectInfo :param run_type: daily, weekly, etc :param backup_type: full or incremental :type backup_type: str :param dst: """ if run_type not in INTERVALS: raise MySQLSourceError("Incorrect run type %r" % run_type) self._parent_lsn = kwargs.get("parent_lsn", None) class _BackupInfo(object): # pylint: disable=too-few-public-methods """class to store details about backup copy""" def __init__(self, lsn=None, binlog_coordinate=None): self.lsn = lsn self.binlog_coordinate = binlog_coordinate # MySQL if not isinstance(mysql_connect_info, MySQLConnectInfo): raise MySQLSourceError("mysql_connect_info must be " "instance of MySQLConnectInfo") self._connect_info = mysql_connect_info self._backup_info = _BackupInfo() if backup_type in ["full", "incremental"]: self._type = backup_type else: raise MySQLSourceError("Unrecognized backup type %s" % backup_type) self._suffix = "xbstream" self._media_type = "mysql" self._file_name_prefix = "mysql" self.dst = kwargs.get("dst", None) self._xtrabackup = kwargs.get("xtrabackup_binary") or XTRABACKUP_BINARY super(MySQLSource, self).__init__(run_type) @property def backup_tool(self): """The tool binary that is used for backups""" return self._xtrabackup @property def binlog_coordinate(self): """ Binary log coordinate up to that backup is taken :return: file name and position :rtype: tuple """ return self._backup_info.binlog_coordinate @property def lsn(self): """ The latest LSN of the taken backup :return: LSN :rtype: int """ return self._backup_info.lsn
[docs] @contextmanager def get_stream(self): """ Get a PIPE handler with content of the source :return: """ cmd = [ self._xtrabackup, "--defaults-file=%s" % self._connect_info.defaults_file, "--stream=xbstream", "--host=127.0.0.1", "--backup", ] cmd += ["--target-dir", "."] if self.is_galera(): cmd.append("--galera-info") cmd.append("--no-backup-locks") if self.incremental: cmd += [ "--incremental-basedir", ".", "--incremental-lsn=%d" % self._parent_lsn, ] # If this is a Galera node then additional step needs to be taken to # prevent the backups from locking up the cluster. wsrep_desynced = False LOG.debug("Running %s", " ".join(cmd)) stderr_file = tempfile.NamedTemporaryFile(delete=False) try: if self.is_galera(): wsrep_desynced = self.enable_wsrep_desync() LOG.debug("Running %s", " ".join(cmd)) proc_xtrabackup = Popen(cmd, stderr=stderr_file, stdout=PIPE) yield proc_xtrabackup.stdout proc_xtrabackup.communicate() if proc_xtrabackup.returncode: LOG.error( "Failed to run xtrabackup. Check error output in %s", stderr_file.name, ) try: if LOG.debug_enabled: with open(stderr_file.name) as xb_out: for line in xb_out: print(line, end="", file=sys.stderr) except AttributeError: pass self.dst.delete(self.get_name()) exit(1) else: LOG.debug("Successfully streamed xtrabackup output") self._update_backup_info(stderr_file) except OSError as err: LOG.error("Failed to run %s: %s", " ".join(cmd), err) LOG.error( "Make sure that xtrabackup package is installed and %s " "is available in $PATH", self._xtrabackup, ) exit(1) finally: if wsrep_desynced: self.disable_wsrep_desync()
def _handle_failure_exec(self, err, stderr_file): """Cleanup on failure exec""" LOG.error(err) LOG.error( "Failed to run xtrabackup. " "Check error output in %s", stderr_file.name, ) self.dst.delete(self.get_name()) exit(1) def _update_backup_info(self, stderr_file): """Update backup_info from stderr""" LOG.debug("xtrabackup error log file %s", stderr_file.name) self._backup_info.lsn = self._get_lsn(stderr_file.name) self._backup_info.binlog_coordinate = self.get_binlog_coordinates(stderr_file.name) os.unlink(stderr_file.name)
[docs] def get_name(self): """ Generate relative destination file name :return: file name """ return self._get_name("mysql")
[docs] def apply_retention_policy(self, dst, config, run_type, status): """ Delete old backup copies. :param dst: Destination where the backups are stored. :type dst: BaseDestination :param config: Tool configuration :type config: TwinDBBackupConfig :param run_type: Run type. :type run_type: str :param status: Backups status. :type status: Status :return: Updated status. :rtype: Status """ prefix = osp.join(dst.remote_path, self.get_prefix(), "mysql") keep_copies = getattr(config.retention, run_type) backups_list = dst.list_files(prefix, files_only=True) LOG.debug("Remote copies: %r", backups_list) for backup_file in get_files_to_delete(backups_list, keep_copies): LOG.debug("Deleting remote file %s", backup_file) dst.delete(backup_file) try: status.remove(backup_file) except StatusKeyNotFound as err: LOG.warning(err) LOG.debug("Status: %r", status) self._delete_local_files("mysql", config) return status
[docs] @staticmethod def get_binlog_coordinates(err_log_path): """ Parse innobackupex log and return binary log coordinate :param err_log_path: path to the innobackupex log :type err_log_path: str :return: Binlog coordinate. :rtype: tuple """ with open(err_log_path) as error_log: for line in error_log: if line.startswith("MySQL binlog position:"): filename = line.split()[4].strip(",'") position = int(line.split()[6].strip(",'")) return filename, position return None, None
@staticmethod def _get_lsn(err_log_path): """Find LSN up to which the backup is taken :param err_log_path: path to Innobackupex error log :return: lsn :rtype: int """ with open(err_log_path) as error_log: for line in error_log: pattern = "xtrabackup: The latest check point (for incremental):" if line.startswith(pattern): lsn = line.split()[7].strip("'") return int(lsn) elif "The latest check point (for incremental):" in line: idx = 10 if "mariabackup" in line else 11 return int(line.split()[idx].strip("'")) raise MySQLSourceError("Could not find LSN in XtraBackup error output %s" % err_log_path) @property def full(self): """ Check if the backup copy is a full copy. :return: True if it's a full copy. :rtype: bool """ return self.type == "full" @property def incremental(self): """ Check if the backup copy is an incremental copy. :return: True if it's an incremental copy. :rtype: bool """ return not self.full @property def type(self): """Get backup copy type - full or incremental :return: 'full' or 'incremental' :rtype: str """ return self._type @property def status(self): """Backup status on a destination :return: Backups status :rtype: dict """ return self.dst.status()
[docs] def enable_wsrep_desync(self): """ Try to enable wsrep_desync :return: True if wsrep_desync was enabled. False if not supported """ try: with self.get_connection() as connection: with connection.cursor() as cursor: cursor.execute("SET GLOBAL wsrep_desync=ON") return True except pymysql.Error as err: LOG.debug(err) return False
[docs] def disable_wsrep_desync(self): """ Wait till wsrep_local_recv_queue is zero and disable wsrep_local_recv_queue then """ max_time = time.time() + 900 try: with self.get_connection() as connection: with connection.cursor() as cursor: while time.time() < max_time: cursor.execute("SHOW GLOBAL STATUS LIKE " "'wsrep_local_recv_queue'") res = {r["Variable_name"].lower(): r["Value"].lower() for r in cursor.fetchall()} if not res.get("wsrep_local_recv_queue"): raise Exception("Unknown status variable " '"wsrep_local_recv_queue"') if int(res["wsrep_local_recv_queue"]) == 0: break time.sleep(1) LOG.debug("Disabling wsrep_desync") cursor.execute("SET GLOBAL wsrep_desync=OFF") except pymysql.Error as err: LOG.error(err)
@property def wsrep_provider_version(self): """ Parse Galera version from wsrep_provider_version. :return: Galera version :rtype: str """ with self._cursor() as cursor: cursor.execute("SHOW STATUS LIKE 'wsrep_provider_version'") res = {row["Variable_name"].lower(): row["Value"].lower() for row in cursor.fetchall()} if res.get("wsrep_provider_version"): return res["wsrep_provider_version"].split("(")[0] return None @property def galera(self): """Check if local MySQL instance is a Galera cluster :return: True if it's a Galera. :rtype: bool """ return self.is_galera()
[docs] def is_galera(self): """Check if local MySQL instance is a Galera cluster :return: True if it's a Galera. :rtype: bool """ try: with self._cursor() as cursor: cursor.execute("SELECT @@wsrep_on as wsrep_on") row = cursor.fetchone() return str(row["wsrep_on"]).lower() == "1" or str(row["wsrep_on"]).lower() == "on" except ( pymysql.InternalError, OperationalError, MySQLSourceError, ) as err: error_code = err.args[0] error_message = err.args[1] if error_code == 1193: LOG.debug("Galera is not supported or not enabled") return False else: LOG.error(error_message) raise
@property def datadir(self): """Return datadir path on MySQL server""" with self._cursor() as cursor: cursor.execute("SELECT @@datadir AS datadir") row = cursor.fetchone() return row["datadir"] @property def server_vendor(self) -> MySQLFlavor: return MySQLClient( self._connect_info.defaults_file, connect_timeout=self._connect_info.connect_timeout, hostname=self._connect_info.hostname, ).server_vendor
[docs] @contextmanager def get_connection(self): """ Connect to MySQL host and yield a connection. :return: MySQL connection :raise MySQLSourceError: if it can't connect to server """ connection = None try: connection = pymysql.connect( host=self._connect_info.hostname, read_default_file=self._connect_info.defaults_file, connect_timeout=self._connect_info.connect_timeout, cursorclass=self._connect_info.cursor, ) yield connection except OperationalError as err: LOG.error( "Can't connect to MySQL server on %s", self._connect_info.hostname, ) raise MySQLSourceError(*err.args) from err finally: if connection: connection.close()
@contextmanager def _cursor(self): with self.get_connection() as connection: with connection.cursor() as cursor: yield cursor