"""
Module defines MySQL binlog source class for backing them up.
"""
import struct
from contextlib import contextmanager
from os import path as osp
from subprocess import PIPE, Popen
from twindb_backup import LOG
from twindb_backup.source.base_source import BaseSource
from twindb_backup.source.exceptions import BinlogSourceError
[docs]class BinlogV4Event(object): # pylint: disable=too-few-public-methods
"""
MySQL Binary log event.
"""
def __init__(self, **kwargs):
self.timestamp = kwargs.get("timestamp")
self.type_code = kwargs.get("type_code")
self.server_id = kwargs.get("server_id")
self.event_length = kwargs.get("event_length")
self.curr_position = kwargs.get("curr_position")
self.next_position = kwargs.get("next_position")
self.flags = kwargs.get("flags")
[docs]class BinlogParser(object):
"""
Class parses a binlog file.
:param binlog: path to a binlog file.
:type binlog: str
"""
def __init__(self, binlog):
self._binlog = binlog
@property
def name(self):
"""Binlog base name"""
return osp.basename(self._binlog)
@property
def created_at(self):
"""Timestamp when the binlog was created"""
try:
with open(self._binlog, "rb") as binlog_descriptor:
self.__read_magic_number(binlog_descriptor)
return self.__read_int(binlog_descriptor, 4)
except IOError as err:
raise BinlogSourceError("Failed to read the 'created_at' attribute: %s" % err)
@property
def start_position(self):
"""Minimal position in the binlog"""
return 4
@property
def end_position(self):
"""Last position in the binlog"""
last_position = self.start_position
with open(self._binlog, "rb") as binlog_descriptor:
self.__read_magic_number(binlog_descriptor)
while True:
event = self.__read_binlog_event(binlog_descriptor)
if event:
last_position = event.curr_position
else:
break
return last_position
@staticmethod
def __read_magic_number(fdesc):
return fdesc.read(4)
@staticmethod
def __read_int(fdesc, n_bytes):
if n_bytes == 4:
return struct.unpack("i", fdesc.read(n_bytes))[0]
elif n_bytes == 2:
return struct.unpack("h", fdesc.read(n_bytes))[0]
elif n_bytes == 1:
return struct.unpack("b", fdesc.read(n_bytes))[0]
else:
raise NotImplementedError("Reading %d bytes integer is unsupported" % n_bytes)
def __read_binlog_event(self, binlog_descriptor):
"""
Read binlog event from file descriptor
:param binlog_descriptor: File descriptor
:return: Binlog event
:rtype: BinlogV4Event
"""
position = binlog_descriptor.tell()
try:
event = BinlogV4Event(
timestamp=self.__read_int(binlog_descriptor, 4),
type_code=self.__read_int(binlog_descriptor, 1),
server_id=self.__read_int(binlog_descriptor, 4),
event_length=self.__read_int(binlog_descriptor, 4),
curr_position=position,
next_position=self.__read_int(binlog_descriptor, 4),
flags=self.__read_int(binlog_descriptor, 2),
)
binlog_descriptor.read(event.event_length - 19)
return event
except struct.error:
return None
[docs]class BinlogSource(BaseSource):
"""
MySQL Binlog source.
:param run_type: The backup copy interval. hourly, daily, etc.
:type run_type: str
:param mysql_client: Instance that can be used to execute queries in MySQL.
:type mysql_client: MySQLClient
:param binlog_file: Name of the binlog file as it appears in
``SHOW BINARY LOGS``.
:type binlog_file: str
"""
def __init__(self, run_type, mysql_client, binlog_file=None):
super(BinlogSource, self).__init__(run_type)
self._mysql_client = mysql_client
self._media_type = "binlog"
self._binlog_file = binlog_file
self.suffix = ""
[docs] @contextmanager
def get_stream(self):
"""
Stream content of one binary file.
:return: stream of bytes with the binlog content.
"""
with self._mysql_client.cursor() as cursor:
cursor.execute("SELECT @@log_bin_basename AS log_bin_basename")
row = cursor.fetchone()
log_bin_basename = row["log_bin_basename"]
log_bin_dirname = osp.dirname(log_bin_basename)
log_bin_file = osp.join(log_bin_dirname, self._binlog_file)
cmd = [
"cat",
log_bin_file,
]
try:
LOG.debug("Running %s", " ".join(cmd))
proc = Popen(cmd, stderr=PIPE, stdout=PIPE)
yield proc.stdout
_, cerr = proc.communicate()
if proc.returncode:
LOG.error("Failed to read from %s: %s", log_bin_file, cerr)
exit(1)
else:
LOG.debug("Successfully streamed %s", log_bin_file)
except OSError as err:
LOG.error("Failed to run %s: %s", cmd, err)
exit(1)
[docs] def get_name(self):
return osp.join(
self.host,
self._media_type,
"{name}{suffix}".format(name=self._binlog_file, suffix=self.suffix),
)