Source code for twindb_backup.source.binlog_source

Module defines MySQL binlog source class for backing them up.
import struct
from contextlib import contextmanager
from os import path as osp
from subprocess import PIPE, Popen

from twindb_backup import LOG
from twindb_backup.source.base_source import BaseSource
from twindb_backup.source.exceptions import BinlogSourceError

[docs]class BinlogV4Event(object): # pylint: disable=too-few-public-methods """ MySQL Binary log event. """ def __init__(self, **kwargs): self.timestamp = kwargs.get("timestamp") self.type_code = kwargs.get("type_code") self.server_id = kwargs.get("server_id") self.event_length = kwargs.get("event_length") self.curr_position = kwargs.get("curr_position") self.next_position = kwargs.get("next_position") self.flags = kwargs.get("flags")
[docs]class BinlogParser(object): """ Class parses a binlog file. :param binlog: path to a binlog file. :type binlog: str """ def __init__(self, binlog): self._binlog = binlog @property def name(self): """Binlog base name""" return osp.basename(self._binlog) @property def created_at(self): """Timestamp when the binlog was created""" try: with open(self._binlog, "rb") as binlog_descriptor: self.__read_magic_number(binlog_descriptor) return self.__read_int(binlog_descriptor, 4) except IOError as err: raise BinlogSourceError("Failed to read the 'created_at' attribute: %s" % err) @property def start_position(self): """Minimal position in the binlog""" return 4 @property def end_position(self): """Last position in the binlog""" last_position = self.start_position with open(self._binlog, "rb") as binlog_descriptor: self.__read_magic_number(binlog_descriptor) while True: event = self.__read_binlog_event(binlog_descriptor) if event: last_position = event.curr_position else: break return last_position @staticmethod def __read_magic_number(fdesc): return @staticmethod def __read_int(fdesc, n_bytes): if n_bytes == 4: return struct.unpack("i",[0] elif n_bytes == 2: return struct.unpack("h",[0] elif n_bytes == 1: return struct.unpack("b",[0] else: raise NotImplementedError("Reading %d bytes integer is unsupported" % n_bytes) def __read_binlog_event(self, binlog_descriptor): """ Read binlog event from file descriptor :param binlog_descriptor: File descriptor :return: Binlog event :rtype: BinlogV4Event """ position = binlog_descriptor.tell() try: event = BinlogV4Event( timestamp=self.__read_int(binlog_descriptor, 4), type_code=self.__read_int(binlog_descriptor, 1), server_id=self.__read_int(binlog_descriptor, 4), event_length=self.__read_int(binlog_descriptor, 4), curr_position=position, next_position=self.__read_int(binlog_descriptor, 4), flags=self.__read_int(binlog_descriptor, 2), ) - 19) return event except struct.error: return None
[docs]class BinlogSource(BaseSource): """ MySQL Binlog source. :param run_type: The backup copy interval. hourly, daily, etc. :type run_type: str :param mysql_client: Instance that can be used to execute queries in MySQL. :type mysql_client: MySQLClient :param binlog_file: Name of the binlog file as it appears in ``SHOW BINARY LOGS``. :type binlog_file: str """ def __init__(self, run_type, mysql_client, binlog_file=None): super(BinlogSource, self).__init__(run_type) self._mysql_client = mysql_client self._media_type = "binlog" self._binlog_file = binlog_file self.suffix = ""
[docs] @contextmanager def get_stream(self): """ Stream content of one binary file. :return: stream of bytes with the binlog content. """ with self._mysql_client.cursor() as cursor: cursor.execute("SELECT @@log_bin_basename AS log_bin_basename") row = cursor.fetchone() log_bin_basename = row["log_bin_basename"] log_bin_dirname = osp.dirname(log_bin_basename) log_bin_file = osp.join(log_bin_dirname, self._binlog_file) cmd = [ "cat", log_bin_file, ] try: LOG.debug("Running %s", " ".join(cmd)) proc = Popen(cmd, stderr=PIPE, stdout=PIPE) yield proc.stdout _, cerr = proc.communicate() if proc.returncode: LOG.error("Failed to read from %s: %s", log_bin_file, cerr) exit(1) else: LOG.debug("Successfully streamed %s", log_bin_file) except OSError as err: LOG.error("Failed to run %s: %s", cmd, err) exit(1)
[docs] def get_name(self): return osp.join(, self._media_type, "{name}{suffix}".format(name=self._binlog_file, suffix=self.suffix), )