# -*- coding: utf-8 -*-
"""
Module for GCS destination.
"""
import os
import re
from contextlib import contextmanager
from functools import partial
from multiprocessing import Process
from os import path as osp
from google.api_core.exceptions import GoogleAPIError, NotFound
from google.auth.exceptions import GoogleAuthError
from google.cloud.storage import Client
from twindb_backup import LOG
from twindb_backup.destination.base_destination import BaseDestination
from twindb_backup.destination.exceptions import FileNotFound, GCSDestinationError
# Client-side timeouts for GCS operations, in seconds.
# NOTE(review): neither constant is referenced in this chunk of the file —
# presumably consumed elsewhere in the package; confirm before removing.
GCS_CONNECT_TIMEOUT = 60
GCS_READ_TIMEOUT = 600
# Default size of one uploaded chunk: 250 MiB. Streams larger than this are
# split into multiple "part-NNNNNNNNNNNNNNNN" objects.
DEFAULT_CHUNK_SIZE = 250 * 1024 * 1024
# Matches the "/part-<16 digits>" suffix that chunked uploads append to the
# backup copy name.
_CHUNK_PART_REGEXP = r"/part-[0-9]{16}$"
class GCS(BaseDestination):
    """
    GCS destination class.

    :param kwargs: Keyword arguments.

    * **bucket** - (required) GCS bucket name.
    * **gc_credentials_file** - (required) GC credentials json filepath.
    * **chunk_size** - when storing a stream use this as a chunk size.
      The stream will be stored as a set of chunks of this size on the GS.
    """

    def __init__(self, **kwargs):
        self._bucket = kwargs.get("bucket")
        super(GCS, self).__init__(self.bucket)

        if "gc_credentials_file" in kwargs:
            # The google client library reads credentials from this
            # environment variable.
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = kwargs.get(
                "gc_credentials_file"
            )
        else:
            raise GCSDestinationError(
                "gc_credentials_file keyword argument must be defined "
                "when initializing %s class" % self.__class__.__name__
            )

        self._chunk_size = kwargs.get("chunk_size", DEFAULT_CHUNK_SIZE)
        # Cached bucket handle, created lazily by the _bucket_obj property.
        self.__bucket_obj = None

    @property
    def bucket(self):
        """GCS bucket name."""
        return self._bucket

    def create_bucket(self):
        """Creates the bucket in gcs that will store the backups.

        :raises GCSDestinationError: if failed to create the bucket.
        :raises GCSDestinationError: if authentication error.
        """
        try:
            self._gcs_client.create_bucket(bucket_name=self.bucket)
        except (GoogleAPIError, GoogleAuthError) as err:
            raise GCSDestinationError(err)
        LOG.info("Created bucket %s", self.bucket)

    def delete(self, path):
        """Delete a file (or all chunks of a chunked file) from GCS.

        :param path: Relative path to the file on the destination.
        :type path: str
        :raises FileNotFound: if no blob or chunk matches the path.
        """
        blobs = self._list_blob_or_chunks(path)
        if not blobs:
            raise FileNotFound("File %s does not exist." % path)
        for blob in blobs:
            blob.delete()

    def delete_bucket(self, force=False):
        """Delete the bucket in gcs that was storing the backups.

        :param force: If the bucket is non-empty then delete the objects
            before deleting the bucket.
        :type force: bool
        :raise GCSDestinationError: if failed to delete the bucket.
        """
        try:
            self._bucket_obj.delete(force=force)
        except (GoogleAPIError, GoogleAuthError) as err:
            raise GCSDestinationError(err)
        LOG.info("Deleted bucket %s", self.bucket)

    @contextmanager
    def get_stream(self, copy):
        """Stream a backup copy from GCS.

        Spawns a child process that downloads the copy (or its chunks)
        into a pipe and yields the read end of that pipe.

        :param copy: Backup copy to stream; only ``copy.key`` is used.
        :return: Context manager yielding a binary file object to read
            the backup stream from.
        """
        pipe_in, pipe_out = os.pipe()
        proc = Process(
            target=self._download_to_pipe,
            args=(copy.key, pipe_in, pipe_out),
        )
        proc.start()
        # The write end belongs to the child; close our copy so the reader
        # sees EOF once the child finishes.
        os.close(pipe_out)
        # BUG FIX: the pipe carries an arbitrary binary backup stream, so it
        # must be opened in binary mode. The previous text-mode fdopen()
        # would attempt to decode raw bytes and fail on Python 3.
        pipe_in = os.fdopen(pipe_in, "rb")
        yield pipe_in
        proc.join()

    def read(self, filepath):
        """
        Read content from a file.

        :param filepath: relative path to a file with status.
        :type filepath: str
        :return: Content of the file.
        :rtype: str
        :raises FileNotFound: if filepath doesn't exist on the destination.
        """
        obj = self._bucket_obj.blob(filepath)
        try:
            return obj.download_as_string()
        except NotFound as err:
            raise FileNotFound(err)

    def write(self, content, filepath):
        """
        Write a string passed in ``content`` to a filepath on the destination.

        :param content: String to write.
        :type content: str
        :param filepath: Relative file path on the destination.
        :type filepath: str
        """
        obj = self._bucket_obj.blob(filepath)
        obj.upload_from_string(content)

    def save(self, handler, filepath):
        """
        Read from handler and save it to GCS.

        The stream is split into chunks of ``chunk_size`` bytes; each chunk
        is stored as a separate object named
        ``<filepath>/part-<16-digit index>``.

        :param handler: stdout handler from backup source
        :type handler: file
        :param filepath: save backup copy in a file with this name
        :type filepath: str
        """
        with handler as f_src:
            chunk_no = 0
            # iter() with a b"" sentinel keeps reading fixed-size chunks
            # until the stream is exhausted.
            for chunk in iter(partial(f_src.read, self._chunk_size), b""):
                self.write(chunk, osp.join(filepath, "part-%016d" % chunk_no))
                chunk_no += 1

    @property
    def _bucket_obj(self):
        # Lazily fetch and cache the bucket handle on first access.
        if self.__bucket_obj is None:
            self.__bucket_obj = self._gcs_client.get_bucket(self.bucket)
        return self.__bucket_obj

    @_bucket_obj.setter
    def _bucket_obj(self, value):
        self.__bucket_obj = value

    @property
    def _gcs_client(self):
        """Creates an authenticated gcs client.

        :return: GCS client instance.
        :rtype: google.cloud.storage.Client
        """
        return Client()

    def _download_to_pipe(self, path, pipe_in, pipe_out):
        # Runs in the child process: stream every blob (or chunk) of ``path``
        # into the write end of the pipe.
        os.close(pipe_in)
        # BUG FIX: Blob.download_to_file() writes bytes, so the pipe must be
        # opened in binary mode; text mode raises TypeError on Python 3.
        pipe_out = os.fdopen(pipe_out, "wb")
        for blob in self._list_blob_or_chunks(path):
            blob.download_to_file(pipe_out)

    def _list_blob_or_chunks(self, path):
        """
        Method queries GS and returns a list of GS blobs that match path.

        If a file is not chunked the method will return one blob in a list.
        If the file is chunked the method will return a list of blobs, each
        of them is a chunk blob.

        For example, if the path is 'master1/status'. This file is not
        chunked, so the method will return ``[Blob('master1/status')]``.

        However, if the path is
        'master1/daily/mysql/mysql-2019-04-04_05_29_05.xbstream.gz'
        the method will return each chunk as a blob. So, the return value
        will be something like::

            [
                Blob(
                    master1/daily/mysql/mysql-...05.xbstream.gz/part..0
                ),
                Blob(
                    master1/daily/mysql/mysql-...05.xbstream.gz/part..1
                ),
                ...
            ]

        :param path: path to a file in GS
        :return: list of blobs that store this file, either the file itself
            or its chunks.
        :rtype: list(Blob)
        """
        # BUG FIX: backup paths contain regex metacharacters (e.g. '.'), so
        # the path must be escaped before being used as a regex prefix.
        chunk_re = re.compile(re.escape(path) + _CHUNK_PART_REGEXP)
        return [
            blob
            for blob in self._bucket_obj.list_blobs(prefix=path)
            if blob.name == path or chunk_re.match(blob.name)
        ]

    def _list_files(self, prefix=None, recursive=False, files_only=False):
        """
        Get list of objects on Google Storage. It will remove the "/part-"
        part from the names returning one file if it was split in chunks.

        :param prefix: A prefix inside the GS bucket. For example,
            if full object path is ``gs://some-bucket/foo/bar/file.txt``
            then the path can be ``foo/`` or ``foo/bar``. Either path will
            return ``gs://some-bucket/foo/bar/file.txt``.
        :param recursive: Not used.
        :param files_only: Not used.
        :return: list of object names prefixed with ``gs://``.
        :rtype: set(str)
        """
        if prefix:
            # BUG FIX: str.lstrip() removes a *set of characters*, not a
            # literal prefix, so the old chain
            # lstrip("gs://").lstrip(self.bucket) could eat leading
            # characters of the object path whenever they happened to occur
            # in "gs://" or in the bucket name. Strip literal prefixes
            # instead.
            for literal in ("gs://", self.bucket):
                if prefix.startswith(literal):
                    prefix = prefix[len(literal):]
            prefix = prefix.lstrip("/")
        return {
            osp.join(
                "gs://",
                self.bucket,
                re.sub(_CHUNK_PART_REGEXP, "", blob.name),
            )
            for blob in self._bucket_obj.list_blobs(prefix=prefix or None)
        }