# Source code for twindb_backup.source.file_source

# -*- coding: utf-8 -*-
"""
Module defines File source class for backing up local directories.
"""
import shlex
from contextlib import contextmanager
from subprocess import PIPE, Popen

from twindb_backup import LOG, get_files_to_delete
from twindb_backup.source.base_source import BaseSource


class FileSource(BaseSource):
    """
    FileSource class describes a local directory or file.
    The get_stream() method will return a ``tar`` archive of it
    (the archive is not compressed unless ``tar_options`` asks for it).

    :param path: Path to local file or directory.
    :type path: str
    :param run_type: A string "daily", "weekly", etc.
    :type run_type: str
    :param tar_options: Additional options passed to ``tar``. The string is
        split with shell-like rules, so quoted arguments are supported.
    :type tar_options: str
    """

    def __init__(self, path, run_type, tar_options: str = None):
        self.path = path
        self._suffix = "tar"
        self._media_type = "files"
        self._tar_options = tar_options
        super().__init__(run_type)

    @property
    def media_type(self):
        """Get media type. Media type is a general term that describes
        what you back up. For this source media_type is 'files'.

        :return: 'files'
        :rtype: str
        """
        return self._media_type

    @contextmanager
    def get_stream(self):
        """
        Get a PIPE handler with content of the source.

        Runs ``tar cf - [tar_options] <path>`` and yields the standard output
        of that process. When the ``with`` body finishes, the process is
        waited for and its exit code is checked.

        :return: Context manager that yields the stdout pipe of ``tar``.
        :raise SystemExit: With code 1 if ``tar`` exits with non-zero code
            or if an ``OSError`` is raised while the stream is in use.
        """
        cmd = ["tar", "cf", "-"]
        if self._tar_options:
            # shlex keeps quoted arguments together and drops the empty
            # items that a naive split(" ") yields on repeated spaces.
            cmd.extend(shlex.split(self._tar_options))
        cmd.append(self.path)

        proc = None
        try:
            LOG.debug("Running %s", " ".join(cmd))
            proc = Popen(cmd, stderr=PIPE, stdout=PIPE)

            yield proc.stdout

            _, cerr = proc.communicate()
            if proc.returncode:
                LOG.error("Failed to read from %s: %s", self.path, cerr)
                raise SystemExit(1)
            LOG.debug("Successfully streamed %s", self.path)
        except OSError as err:
            LOG.error("Failed to run %s: %s", cmd, err)
            raise SystemExit(1)
        finally:
            # The consumer may have failed in the middle of the stream.
            # Do not leave a running or unreaped tar process behind.
            if proc is not None and proc.poll() is None:
                proc.kill()
                proc.wait()

    def get_name(self):
        """
        Generate relative destination file name.

        The sanitized path is passed to ``_get_name()`` of the base class.

        :return: file name
        """
        return self._get_name(self._sanitize_filename())

    def _sanitize_filename(self):
        """
        Convert the source path to a string usable as a file name.

        Trailing slashes are stripped and the remaining slashes are
        replaced with underscores, e.g. ``/etc/mysql/`` -> ``_etc_mysql``.

        :return: Sanitized path.
        :rtype: str
        """
        return self.path.rstrip("/").replace("/", "_")

    def apply_retention_policy(self, dst, config, run_type):
        """
        Apply retention policy.

        Lists remote copies of this source, deletes the ones selected by
        ``get_files_to_delete()`` and then calls ``_delete_local_files()``.

        :param dst: Destination. Must provide ``remote_path``,
            ``list_files()`` and ``delete()``.
        :param config: Configuration. ``config.retention`` must have
            an attribute named after ``run_type`` with number of copies
            to keep.
        :param run_type: A string "daily", "weekly", etc.
        :type run_type: str
        """
        sanitized = self._sanitize_filename()
        prefix = "{remote_path}/{prefix}/files/{file}".format(
            remote_path=dst.remote_path,
            prefix=self.get_prefix(),
            file=sanitized,
        )
        keep_copies = getattr(config.retention, run_type)

        backups_list = dst.list_files(prefix)
        LOG.debug("Remote copies: %r", backups_list)

        for backup_copy in get_files_to_delete(backups_list, keep_copies):
            LOG.debug("Deleting remote file %s", backup_copy)
            dst.delete(backup_copy)

        self._delete_local_files(sanitized, config)