Source code for twindb_backup.util

# -*- coding: utf-8 -*-
Module with helper functions
import errno
import multiprocessing
import os
import shutil
import sys
from contextlib import contextmanager
from subprocess import Popen, PIPE

import psutil

from twindb_backup import LOG
from twindb_backup.exceptions import OperationError

[docs]def mkdir_p(path, mode=0o777): """ Emulate mkdir -p. Create a directory named path with numeric mode mode. The default mode is 0777 (octal) :param path: Directory path. :type path: str :param mode: Directory permissions. The default mode is 0777 (octal) :type mode: int """ try: os.makedirs(path, mode=mode) except OSError as exc: # Python >2.5 if exc.errno == errno.EEXIST and os.path.isdir(path): pass else: raise
[docs]def ensure_empty(path): """ Check if a given directory is empty and exit if not. :param path: path to directory :type path: str """ try: if os.listdir(path): msg = ( 'You asked to restore backup copy in directory "%s". ' "But it is not empty." % path ) raise OperationError(msg) except OSError as err: if err.errno == 2: # OSError: [Errno 2] No such file or directory pass else: raise
[docs]def empty_dir(path): """Remove all files are directories in path :param path: Path to directory to be emptied. :type path: str """ for the_file in os.listdir(path): file_path = os.path.join(path, the_file) if os.path.isfile(file_path): os.unlink(file_path) elif os.path.isdir(file_path): shutil.rmtree(file_path)
[docs]@contextmanager def run_command(command, ok_non_zero=False): """ Run shell command locally :param command: Command to run :type command: list :param ok_non_zero: Don't consider non-zero exit code as an error. :type ok_non_zero: bool :return: file object with stdout as generator to use with ``with`` """ try: LOG.debug("Running %s", " ".join(command)) proc = Popen(command, stderr=PIPE, stdout=PIPE) yield proc.stdout _, cerr = proc.communicate() if proc.returncode and not ok_non_zero: LOG.error( "Command %s exited with error code %d", " ".join(command), proc.returncode, ) LOG.error(cerr) sys.exit(1) else: LOG.debug("Exited with zero code") except OSError as err: LOG.error("Failed to run %s", " ".join(command)) LOG.error(err) sys.exit(1)
[docs]def split_host_port(host_port): """ Splits a string of host and port separated by a semicolon. :param host_port: host or host:port. Allowed values are like or just :return: a tuple with host and port. If only address is specified it'll return (address, None). If host_port is None it will return (None, None) :rtype: tuple """ try: host = host_port.split(":")[0] if not host: host = None except AttributeError: host = None try: port = int(host_port.split(":")[1]) except (IndexError, AttributeError, ValueError): port = None return host, port
[docs]def kill_children(): """ Kill child process """ for proc in multiprocessing.active_children():"Terminating %r [%d] ...", proc, proc.terminate() parent = psutil.Process(os.getpid()) for child in parent.children(recursive=True):"Terminating process %r", child) child.kill()
[docs]def my_cnfs(common_paths=None): """ Start reading a root my.cnf file given in common paths and parse included files. :param common_paths: list of my.cnf files to start parsing from. :type common_paths: list :return: list of all included my.cnf files :rtype: list """ result = [] for my_cnf in common_paths: if os.path.exists(my_cnf): result.append(my_cnf) with open(my_cnf) as fp_my_cnf: for line in if "!includedir" in line: path = line.split()[1] c_paths = [] for included_file in os.listdir(path): if included_file.endswith(".cnf"): c_paths.append("%s%s" % (path, included_file)) result.extend(my_cnfs(common_paths=c_paths)) elif "!include" in line: include_file = line.split()[1] result.extend(my_cnfs(common_paths=[include_file])) return result
[docs]def normalize_b64_data(coding): """ Normalize base64 key. See for details. :param coding: Encoded data :return: Normalized encoded data """ missing_padding = len(coding) % 4 if missing_padding != 0: coding += b"=" * (4 - missing_padding) return coding