Source code for twindb_backup.destination.azblob

# -*- coding: utf-8 -*-
"""
Module for Azure-blob destination.
"""
# builtin module imports
import gc
import io
import multiprocessing as mp
import os
import sys
import time
import traceback
from contextlib import contextmanager
from functools import wraps
from multiprocessing.connection import Connection as mpConn
from pathlib import Path
from textwrap import indent
from typing import AnyStr, Callable, Dict, Generator, Iterable, List, Optional, Tuple, Union

# Third party module imports
from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
from azure.storage.blob import (
    BlobClient,
    BlobProperties,
    BlobServiceClient,
    ContainerClient,
    ContainerProperties,
    StorageStreamDownloader,
)

# project sub-module imports
from twindb_backup import LOG
from twindb_backup.copy.mysql_copy import MySQLCopy
from twindb_backup.destination.base_destination import BaseDestination
from twindb_backup.destination.exceptions import AzureBlobDestinationError

IterableClientType = Iterable[Union[BlobServiceClient, ContainerClient, BlobClient]]
DEFAULT_AVAILABLE_CPU = os.cpu_count()
GC_TOGGLE_DEPTH = 0
"""GC_TOGGLE_DEPTH is used as a reference counter for managing when the _gc_toggle function should call gc.enable()."""
ONE_MiB = 2**20
MAX_PIPE_CHUNK_BYTES = 8 * ONE_MiB
MAX_SYS_MEM_USE = 512 * ONE_MiB
"""MAX_PIPE_CHUNK_BYTES is a conservatively safe upper bound on the number of bytes we send through
`multiprocessing.connections.Connection` objects.

This boundary will be derived for the current machine's OS at runtime.

Per the official Python 3.9.6 documentation:
::

    send(obj)
        Send an object to the other end of the connection which should be read using recv().

        The object must be picklable. Very large pickles (approximately 32 MiB+, though it depends on the OS)
        may raise a ValueError exception.

For source documentation on send(obj) see:

    https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection.send
"""
NONE_LABEL = "None"
BSC_LABEL = "BlobServiceClient"
CC_LABEL = "ContainerClient"
BC_LABEL = "BlobClient"


class ClientWrapper:
    """The ClientWrapper class exists to simplify the process of ensuring that a container's name is
    accessible from mixed types of inputs.
    """

    def __init__(self, name: str = None, props: Optional[ContainerProperties] = None) -> None:
        self._name = name or None
        if not self._name and props is not None:
            self._name = props.name

    @property
    def name(self) -> str:
        return self._name
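
# Minimal usage sketch (illustrative only): ClientWrapper normalizes either a plain name or a
# ContainerProperties instance behind a single ``.name`` attribute, e.g.:
#
#     ClientWrapper(name="backups").name               # -> "backups"
#     ClientWrapper(props=some_container_props).name   # -> some_container_props.name
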
HasNameAttr = Union[ClientWrapper, ContainerProperties]
IterableHasName = Iterable[HasNameAttr]
StrOrHasName = Union[str, HasNameAttr]
IterableStrOrHasName = Iterable[StrOrHasName]


def _assemble_fname(path_dict: dict) -> str:
    interval = path_dict.get("interval", None)
    media = path_dict.get("media_type", None)
    prefix = path_dict.get("fname_prefix", None)
    fname = path_dict.get("fname", None)
    return "/".join(part for part in (interval, media, prefix, fname) if part)


@contextmanager
def _gc_toggle():
    """A context manager that toggles garbage collection off-at-entry and back-on-at-exit.

    :return: The current toggle depth; garbage collection is re-enabled only when the depth returns to 0.
    """
    global GC_TOGGLE_DEPTH
    try:
        gc.disable()
        GC_TOGGLE_DEPTH += 1
        yield GC_TOGGLE_DEPTH
        GC_TOGGLE_DEPTH -= 1
    finally:
        if GC_TOGGLE_DEPTH == 0:
            gc.enable()


def _client_name_gen(obj: Union[StrOrHasName, IterableStrOrHasName]) -> Generator[str, None, None]:
    if obj:
        if isinstance(obj, (str, ClientWrapper, BlobProperties, ContainerProperties)):
            obj = (obj,)
        for elem in obj:
            if isinstance(elem, str):
                yield elem
            elif isinstance(elem, (ClientWrapper, BlobProperties, ContainerProperties)):
                yield elem.name
            else:
                yield from _client_name_gen(elem)


def _ensure_containers_exist(conn_str: str, container: Union[StrOrHasName, IterableStrOrHasName]):
    """
    If we have been given a container name (or an iterable of container names), ensure those containers
    exist and are ready to be acted upon before returning to the caller. Otherwise the caller may hit
    the potentially troublesome `ResourceNotFoundError`.

    Example of how it becomes troublesome:
        If a caller deletes a container just before calling this function, there is some indeterminate
        amount of time, while that delete operation is being performed, during which any attempt to
        re-create the container will raise `ResourceExistsError` and any operation that would interact
        with the existing resource will raise `ResourceNotFoundError`.
    """
    gen = _client_name_gen(container)
    delay_max = 10
    delay = 0.1
    while True:
        unfinished = []
        for cont in gen:
            _client: ContainerClient = ContainerClient.from_connection_string(conn_str, cont)
            try:
                cprop: ContainerProperties = _client.get_container_properties(timeout=2)
                # getting etag confirms the container is fully created
                etag = getattr(cprop, "etag", cprop["etag"])
            except ResourceNotFoundError:
                try:
                    cprop: ContainerProperties = _client.create_container(timeout=2)
                    # getting etag confirms the container is fully created
                    etag = getattr(cprop, "etag", cprop["etag"])
                except ResourceExistsError:
                    # We are getting both resource-existence errors, meaning the container is likely
                    # being deleted and we can't recreate it until that operation has finished. So,
                    # add the container back to our queue and try again later.
                    unfinished.append(cont)
            finally:
                _client.close()
        if not unfinished:
            break
        gen = _client_name_gen(unfinished)
        # delay added to ensure we don't jackhammer requests to the remote service
        time.sleep(delay)
        delay = min(delay_max, delay + delay)
def flatten_client_iters(clients: List[Union[ContainerClient, List[BlobClient]]]):
    errs: Dict[str, List[Dict[str, str]]] = {}
    for cclient in clients:
        if isinstance(cclient, list):
            for bclient in cclient:
                try:
                    yield bclient
                except BaseException as be:
                    exc_type, exc_value, exc_traceback = sys.exc_info()
                    be.with_traceback(exc_traceback)
                    errs.setdefault(exc_type, []).append(
                        {
                            "original": be,
                            "exc_type": exc_type,
                            "exc_value": exc_value,
                        }
                    )
        else:
            try:
                yield cclient
            except BaseException as be:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                be.with_traceback(exc_traceback)
                errs.setdefault(exc_type, []).append(
                    {
                        "original": be,
                        "exc_type": exc_type,
                        "exc_value": exc_value,
                    }
                )
    if errs:
        err = AzureClientManagerError(
            f"There were {len(errs)} errors while accessing the flattened clients iterable."
        )
        err.aggregated_traceback = []
        for e, lst in errs.items():
            agg_tb = []
            for args in lst:
                args: dict
                oe: BaseException = args["original"]
                tb = "".join(traceback.format_exception(args["exc_type"], args["exc_value"], oe.__traceback__))
                agg_tb.append(indent(tb, "\t"))
            agg_tb = "\n\n".join(agg_tb)
            agg_tb = f"\n{'=' * 120}\n{agg_tb}{'-' * 120}"
            err.aggregated_traceback.append(agg_tb)
        LOG.exception("\n".join(err.aggregated_traceback), exc_info=err)
        # raise err
        err.err_map = errs
        err.args += (errs,)
        raise err
def client_generator(
    conn_str,
    container: Optional[Union[StrOrHasName, IterableStrOrHasName]] = None,
    prefix: Optional[str] = None,
    blob: Optional[Union[StrOrHasName, IterableStrOrHasName]] = None,
    recurse: bool = False,
) -> Generator[Union[str, BlobServiceClient, ContainerClient, BlobClient], None, None]:
    # forward declared type hints
    bprop: BlobProperties
    cprop: ContainerProperties
    # scope shared state flags
    blobs_yielded = False
    containers_yielded = False
    service_clients_yielded = False

    # a couple of inner functions for handling different client iteration strategies
    def client_iter(container_iterable):
        nonlocal blobs_yielded, containers_yielded
        for c in container_iterable:
            with ContainerClient.from_connection_string(conn_str, c) as container_client:
                container_client: ContainerClient
                if prefix is not None or blob is not None:
                    for bprop in container_client.list_blobs(prefix):
                        bname: str = bprop.name
                        _name = bname.rpartition("/")[2]
                        if check_blob(_name):
                            with container_client.get_blob_client(bprop.name) as blob_client:
                                if not blobs_yielded:
                                    yield BC_LABEL
                                    blobs_yielded = True
                                yield blob_client
                elif recurse:
                    for bprop in container_client.list_blobs():
                        with container_client.get_blob_client(bprop.name) as blob_client:
                            if not blobs_yielded:
                                yield BC_LABEL
                                blobs_yielded = True
                            yield blob_client
                else:
                    if not containers_yielded:
                        yield CC_LABEL
                        containers_yielded = True
                    yield container_client
        if not (blobs_yielded or containers_yielded):
            for c in _client_name_gen(container):
                with ContainerClient.from_connection_string(conn_str, c) as container_client:
                    container_client: ContainerClient
                    if recurse:
                        for bprop in container_client.list_blobs():
                            with BlobClient.from_connection_string(
                                conn_str, bprop.container, bprop.name
                            ) as blob_client:
                                if not blobs_yielded:
                                    yield BC_LABEL
                                    blobs_yielded = True
                                yield blob_client
                    else:
                        if not containers_yielded:
                            yield CC_LABEL
                            containers_yielded = True
                        yield container_client

    # second of the inner functions for client iteration strategies
    def bsc_iter():
        nonlocal service_clients_yielded, containers_yielded, blobs_yielded
        with BlobServiceClient.from_connection_string(conn_str) as service_client:
            service_client: BlobServiceClient
            if (prefix or blob) and not (blobs_yielded or containers_yielded):
                yield from client_iter(service_client.list_containers())
            elif recurse:
                for c in service_client.list_containers():
                    with service_client.get_container_client(c) as container_client:
                        for b in container_client.list_blobs():
                            with container_client.get_blob_client(b) as blob_client:
                                if not blobs_yielded:
                                    yield BC_LABEL
                                    blobs_yielded = True
                                yield blob_client
            if not (blobs_yielded or containers_yielded):
                yield BSC_LABEL
                service_clients_yielded = True
                yield service_client

    # begin the generator's own logic
    if not prefix:
        if blob:
            prefs = set()
            _blob = []
            for b in _client_name_gen(blob):
                pref, _, bname = b.rpartition("/")
                _blob.append(bname)
                if pref:
                    prefs.add(pref)
            # ToDo: work in logic for handling if there is more than 1 kind of prefix found
            blob = _blob
            try:
                _pref = prefs.pop()
            except KeyError:
                _pref = None  # to ensure it's not an empty string
            prefix = _pref

    def _check_name(name):
        return name in blob_set

    def _always_true(*args):
        return True

    if blob:
        blob_set = set(_client_name_gen(blob))
        check_blob = _check_name
    else:
        blob = None
        check_blob = _always_true

    if container:
        _ensure_containers_exist(conn_str, container)
        yield from client_iter(_client_name_gen(container))
    else:
        yield from bsc_iter()
    if not (blobs_yielded or containers_yielded or service_clients_yielded):
        yield from (NONE_LABEL,)
def _client_ctx_mgr_wrapper(conn_str: str, gen_func: Callable = client_generator) -> contextmanager:
    @contextmanager
    @wraps(gen_func)
    def context_manager(*args, **kwargs):
        nonlocal conn_str, gen_func
        ret = None
        try:
            ret = gen_func(conn_str, *args, **kwargs)
            yield ret
        finally:
            del ret

    return context_manager


def _ensure_str(obj: Union[AnyStr, Union[List[AnyStr], Tuple[AnyStr]]]):
    if obj is None:
        return ""
    if isinstance(obj, (list, tuple)):
        if obj:
            obj = obj[0]
        else:
            return ""
    if isinstance(obj, bytes):
        obj = obj.decode("utf-8")
    return str(obj)


def _ensure_list_of_str(obj: Union[List[AnyStr], AnyStr]) -> List[Union[str, List[str]]]:
    """
    A helper function that ensures a given argument is a list of strings.

    This function assumes the given object is one of:

        * list
        * str
        * bytes

    :param obj: A string, bytes object, or a list (or nested list) of string/bytes objects.
    :return: A list (or nested list) of string objects.
    :raises AzureBlobInitError: If the given object is not a str or bytes object, or if it's a
        list/tuple of non-(str/bytes) objects, then a logic error has likely occurred somewhere and we
        should fail execution here.
    """
    if obj is None:
        return []
    if isinstance(obj, (list, tuple)):
        if isinstance(obj, tuple):
            obj = list(obj)
    elif isinstance(obj, (str, bytes)):
        if isinstance(obj, bytes):
            obj = obj.decode("utf-8")
        obj = [obj]
    else:
        raise AzureBlobInitError(f"Our attempt to ensure obj is a list of strings failed,\n\tgiven {obj=}")
    for i, elem in enumerate(obj):
        if isinstance(elem, str):
            continue
        elif isinstance(elem, bytes):
            obj[i] = elem.decode("utf-8")
        elif isinstance(elem, (list, tuple)):
            obj[i] = _ensure_list_of_str(elem)
        else:
            err_msg = (
                "Our attempt to ensure obj is a list of strings failed,"
                f"\n\tgiven: {obj=}"
                f"\n\tfailure occurred while ensuring each element of the given iterable was a string, "
                f"at element: obj[{i}]={elem}"
            )
            raise AzureBlobInitError(err_msg)
    return obj
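
# Behavior sketch for the helpers above (illustrative, assuming the documented contract):
#
#     _ensure_str(b"daily")              # -> "daily"
#     _ensure_str(["daily", "weekly"])   # -> "daily"  (first element wins)
#     _ensure_list_of_str(b"mysql")      # -> ["mysql"]
#     _ensure_list_of_str(("a", b"b"))   # -> ["a", "b"]
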
class AzureBlobInitError(AzureBlobDestinationError):
    pass


class AzureBlobPathParseError(AzureBlobDestinationError):
    pass


class AzureBlobReadError(AzureBlobDestinationError):
    blob_path: str = ""
    """The path string which led to this exception"""
    chunk_byte_range: Tuple[int, int] = -1, -1
    """The [start, end) bytes defining the chunk where this exception occurred (if chunking was used), else (-1, -1)"""
    container_name: str = ""
    blob_name: str = ""
    blob_properties: BlobProperties = None


class AzureBlobWriteError(AzureBlobDestinationError):
    blob_path: str = ""
    """The path string which led to this exception"""
    container_name: str = ""
    blob_name: str = ""
    blob_properties: BlobProperties = None
    content_type = None


class AzureBlobClientError(AzureBlobDestinationError):
    container_name: str = ""
    blob_name: str = ""


class AzureClientManagerError(AzureBlobDestinationError):
    err_map: Dict[str, List[Dict[str, str]]]
    aggregated_traceback: List[str]


class AzureClientIterationError(AzureBlobDestinationError):
    pass
class AzureBlob(BaseDestination):
    def __getnewargs__(self):
        """Utility method that allows an instance of this class to be pickled."""
        return (
            self.remote_path,
            self.connection_string,
            self.can_overwrite,
            self._cpu_cap,
            self._max_mem_bytes,
            self.default_protocol,
            self.default_host_name,
            self.default_container_name,
            self.default_interval,
            self.default_media_type,
            self.default_fname_prefix,
        )

    def __getstate__(self):
        """Utility method that allows an instance of this class to be pickled."""
        return {k: v if k != "_connection_manager" else None for k, v in self.__dict__.items()}

    def __init__(
        self,
        remote_path: AnyStr,
        connection_string: AnyStr,
        can_do_overwrites: bool = False,
        cpu_cap: int = DEFAULT_AVAILABLE_CPU,
        max_mem_bytes: int = MAX_SYS_MEM_USE,
        default_protocol: Optional[AnyStr] = None,
        default_host_name: Optional[AnyStr] = None,
        default_container_name: Optional[AnyStr] = None,
        default_interval: Optional[AnyStr] = None,
        default_media_type: Optional[AnyStr] = None,
        default_fname_prefix: Optional[AnyStr] = None,
    ):
        """
        A subclass of BaseDestination; allows for streaming a backup stream to an Azure-blob destination.

        Here's the expected general form for the remote path:

            [protocol]://[host_name]/[container_name]/[interval]/[media_type]/[default_prefix]/[optional_fname]

        NOTE: Components inside square brackets, e.g. ``[some component]``, are optional as long as
        they are instead defined by their corresponding initializer argument.

        :param remote_path: REQUIRED; a string or bytes object; defines the URI (or URL) for where to
            connect to the backup object.
        :param connection_string: REQUIRED; a string or bytes object; when the application makes a
            request to Azure Storage, it must be authorized. To authorize a request, add your storage
            account credentials to the application as a connection string. See:
            https://docs.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-python#copy-your-credentials-from-the-azure-portal
        :param can_do_overwrites: REQUIRED; a boolean value; flags if we should overwrite existing data
            when given a destination that already exists, or if we should fail and raise a
            `ResourceExistsError`.
        :param default_protocol: OPTIONAL; DEFAULT is set from the protocol component of the
            remote_path argument. A string or bytes object; the protocol to use when connecting to the
            destination. If undefined, then we assume it is on the given remote_path argument.
        :param default_host_name: OPTIONAL; DEFAULT is set from the host component of the remote_path
            argument. A string or bytes object; the name of the host server. If undefined, then we
            assume it is on the given remote_path argument.
        :param default_container_name: OPTIONAL; DEFAULT is set from the container component of the
            remote_path argument. A string or bytes object; the name of the container in the
            destination blob storage we should use. If undefined, then we assume it is on the given
            remote_path argument.
        :param default_interval: OPTIONAL; DEFAULT to "yearly". A string or bytes object; if undefined,
            then we assume it is on the given remote_path argument.
        :param default_media_type: OPTIONAL; DEFAULT to "mysql". A string or bytes object; if
            undefined, then we assume it is on the given remote_path argument.
        :param default_fname_prefix: OPTIONAL; a string or bytes object; if undefined, then we assume
            it is on the given remote_path argument.
        """
        path = _ensure_str(remote_path)
        path = path.strip(" /:").rstrip(".")
        parts = self._path2parts(path)
        if not path:
            protocol = default_protocol or ""
            if not protocol.endswith("://"):
                protocol += "://"
            host = default_host_name or ""
            if not host.endswith("/"):
                host += "/"
            container = default_container_name or ""
            if container and not container.endswith("/"):
                container += "/"
            interval = default_interval or ""
            if interval and not interval.endswith("/"):
                interval += "/"
            media_type = default_media_type or ""
            if media_type and not media_type.endswith("/"):
                media_type += "/"
            fname_prefix = default_fname_prefix or ""
            if fname_prefix and not fname_prefix.endswith("/"):
                fname_prefix += "/"
            path = protocol + host + container + interval + media_type + fname_prefix
        super(AzureBlob, self).__init__(path)
        connection_string = _ensure_str(connection_string)
        self._connection_string = connection_string
        self._flag_overwite_on_write = can_do_overwrites
        self._cpu_cap = cpu_cap
        self._max_mem_bytes = max_mem_bytes
        self._max_mem_pipe = min(MAX_PIPE_CHUNK_BYTES, max_mem_bytes)
        default_protocol = _ensure_str(default_protocol or parts[0]).strip(":/")
        default_host_name = _ensure_str(default_host_name or parts[1]).strip(":/")
        default_container_name = _ensure_str(default_container_name or parts[2]).strip(":/")
        default_interval = _ensure_str(default_interval or parts[3]).strip(":/")
        default_media_type = _ensure_str(default_media_type or parts[4]).strip(":/")
        default_fname_prefix = _ensure_str(default_fname_prefix or parts[5]).strip(":/")
        self._protocol = default_protocol
        self._host_name = default_host_name
        self._container_name = default_container_name
        self._interval = default_interval
        self._media_type = default_media_type
        self._fname_prefix = default_fname_prefix
        self._part_names = "protocol,host,container,interval,media_type,fname_prefix,fname".split(",")
        self._parts_list = [
            (name, parts[i] if i < len(parts) and parts[i] else "") for i, name in enumerate(self._part_names)
        ]
        self._default_parts: Dict[str, str] = {k: v if v != "" else None for k, v in self._parts_list}
        self._default_parts["interval"] = self._default_parts["interval"] or "yearly"
        self._default_parts["media_type"] = self._default_parts["media_type"] or "mysql"
        self._part_names = self._part_names[::-1]
        self._connection_manager: Optional[contextmanager] = None

    @property
    def connection_string(self):
        """An Azure-specific authentication string for accessing the target backup destination host"""
        return self._connection_string

    @property
    def default_protocol(self):
        return self._protocol

    @property
    def default_host_name(self):
        """The default host server name we fall back to if a relative path string omits the reference"""
        return self._host_name

    @property
    def default_container_name(self):
        """The default container (aka bucket) name we fall back to if a relative path string omits the reference"""
        return self._container_name

    @property
    def default_interval(self):
        """The default backup interval directory we fall back to if a relative path string omits the reference"""
        return self._interval

    @property
    def default_media_type(self):
        return self._media_type

    @property
    def default_fname_prefix(self):
        return self._fname_prefix

    @property
    def can_overwrite(self):
        return self._flag_overwite_on_write

    @property
    def max_bytes_per_pipe_message(self):
        return self._max_mem_pipe

    @property
    def max_system_memory_usage(self):
        return self._max_mem_bytes

    @property
    def connection_manager(self):
        if self._connection_manager is None:
            self._connection_manager = _client_ctx_mgr_wrapper(self._connection_string, client_generator)
        return self._connection_manager

    @staticmethod
    def _path2parts(path: str, split_fname: bool = False):
        """Breaks a path string into its sub-parts and produces a tuple of those parts that is at least
        6 elements long. We insert None where a part is determined to be missing in order to ensure the
        minimum length of 6 elements."""

        def extract_protocol(_path: str):
            protocol, _, _path = _path.partition("://")
            if not _path:
                if protocol.startswith(".../"):
                    _path = protocol[4:]
                    protocol = "..."
                else:
                    _path = protocol
                    protocol = None
            else:
                protocol = protocol.strip(":/")
            return protocol, *partition_path(_path, 1)

        def partition_path(_path: str, depth: int):
            if not _path:
                if depth < 6:
                    return None, *partition_path(_path, depth + 1)
            elif depth < 5:
                part, _, _path = _path.partition("/")
                return part.strip(":/"), *partition_path(_path, depth + 1)
            elif split_fname:
                prefix, _, fname = _path.rpartition("/")
                return prefix, fname
            return _path.strip(":/"), None

        return extract_protocol(path)

    def _path_parse(self, path: str, split_fname: bool = False):
        """
        Called in multiple places where we need to decompose a path string in order to access specific
        parts by name.
        """
        if not path:
            return self.remote_path, {k: v for k, v in self._default_parts.items()}
        # noinspection PyTupleAssignmentBalance
        (
            protocol,
            host,
            container,
            interval,
            media,
            prefix,
            *fname,
        ) = self._path2parts(path, split_fname)
        fname: list
        protocol = protocol if protocol and protocol != "..." else self.default_protocol
        host = host if host and host != "..." else self.default_host_name
        container = container if container and container != "..." else self.default_container_name
        if container != self.default_container_name:
            interval = self.default_interval if interval and interval == "..." else interval if interval else ""
            media = self.default_media_type if media and media == "..." else media if media else ""
            prefix = self.default_fname_prefix if prefix and prefix == "..." else prefix if prefix else ""
        else:
            interval = interval if interval and interval != "..." else self.default_interval
            media = media if media and media != "..." else self.default_media_type
            prefix = prefix if prefix and prefix != "..." else self.default_fname_prefix
        if fname:
            _fname = list(fname)
            while _fname:
                fname = _fname.pop()
                if fname:
                    _fname = "/".join(_fname)
                    break
        else:
            # noinspection PyTypeChecker
            fname = None
        parts: str = "/".join(s for s in (host, container, interval, media, prefix, fname) if s)
        relative_depth = 0
        while parts and parts.startswith("../"):
            relative_depth += 1
            _, _, parts = parts.partition("/")
        base_parts = "/".join(tpl[1] for tpl in self._parts_list[1:-relative_depth])
        base_parts += "/" if base_parts else ""
        path = base_parts + parts.lstrip("/")
        _parts = path.split("/", 4)[::-1]
        shorten = len(self._part_names) - 1 - len(_parts)
        _parts2 = [None] * shorten
        _parts2 += _parts
        # noinspection PyTypeChecker
        ret = {k: v for k, v in zip(self._part_names[:-1], _parts2)}
        ret["protocol"] = protocol
        return path, ret
    def delete(self, path: AnyStr):
        """
        Delete an object from the destination.

        The general form for the path argument should conform to the following:

            [azure:/]/[bucket or container name]/[server name]/[update interval]/[query language]/<file name>

        NOTE: The protocol type (the left-most component of the example above) is technically optional,
        as it should always be an azure storage type; but if passed we will check to confirm that it is
        indeed for azure-blob storage, so including it ensures proper sanity checking.

        -- If path defines a new absolute path string then it must contain all parts defined above,
        with the option to omit those components wrapped in square brackets, e.g. ``[some component]``,

        where:
            [components inside square brackets] => optional
            <objects inside chevrons> => required

        such that:
            optional components that are not provided should be substituted with an ellipsis
            (the triple period => ...), e.g.:

                ...://foo/.../hourly/mysql/bar-that.foos.gz

            Note: Where optional path components are omitted, we assume that the context of the called
            AzureBlob instance should be used to fill in the gaps.

        -- If path is given as a relative path string then you may also use the ellipsis as defined for
        absolute paths, with the added option to use `..` for relative directory hierarchy referencing,
        e.g.:

            ../../daily/mysql/relative-foo.bar.gz
            or
            ../../../some_different_host/.../mysql

        where:
            The `...` component signals that we wish to use the default interval this object was
            initialized with.

        :param path: A string or bytes object; the path to the file (blob) to delete. Can be relative
            or absolute.
        """
        abs_path, path_dict = self._path_parse(path)
        container = path_dict["container"]
        fname = _assemble_fname(path_dict)
        if fname:
            label = BC_LABEL
            client_type = "blob"
            args = container, fname
        else:
            label = CC_LABEL
            client_type = "container"
            args = (container,)
        with self.connection_manager(*args) as client_iter:
            iter_type = next(client_iter)
            if iter_type != label:
                raise AzureClientIterationError(
                    f"Failed to properly identify deletion target given {path=}"
                    f"\n\texpected client type of {label} but got {iter_type}"
                )
            to_check = []
            del_call = "delete_" + client_type
            for client in client_iter:
                client: Union[BlobClient, ContainerClient]
                to_check.append(client)
                getattr(client, del_call)()
            for c in to_check:
                delay = 0.01
                max_delay = 2
                t0 = time.perf_counter()
                while (time.perf_counter() - t0) < 5:
                    try:
                        if client_type == "blob":
                            c: BlobClient
                            try:
                                bprop: BlobProperties = c.get_blob_properties()
                                if bprop.deleted:
                                    break
                            except AttributeError:
                                # when a call to get_blob_properties raises AttributeError,
                                # the blob is no longer available and the deletion was successful
                                break
                        else:
                            c: ContainerClient
                            cprop: ContainerProperties = c.get_container_properties()
                            if cprop.deleted:
                                break
                        time.sleep(delay)
                        delay = min(max_delay, delay + delay)
                    except ResourceNotFoundError:
                        break
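
    # Usage sketch for ``delete`` (illustrative only; the host, container, and file names below are
    # hypothetical placeholders):
    #
    #     dst = AzureBlob("azure://my-host/my-container/daily/mysql/", connection_string)
    #     dst.delete("azure://my-host/my-container/daily/mysql/foo.tar.gz")   # absolute path
    #     dst.delete("...://my-host/.../daily/mysql/foo.tar.gz")              # '...' falls back to instance defaults
    #     dst.delete("../../daily/mysql/foo.tar.gz")                          # relative to the default path
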
    def _blob_ospiper(
        self,
        path_parts_dict: Dict[str, str],
        pout: mpConn,
        chunk_size: int = None,
    ) -> None:
        def err_assembly():
            bad_path = "{protocol}://{parts}".format(
                protocol=self._part_names[0],
                parts="/".join(f"{{{s}}}" for s in self._part_names[1:] if path_parts_dict.get(s, None)),
            ).format(**path_parts_dict)
            return AzureClientIterationError(f"Unable to find downloadable content files on path : {bad_path}")

        # noinspection PyShadowingNames
        def configure_chunking(bsize: int, pipe_chunk_size: int):
            """
            :param bsize: total number of bytes to be downloaded for the current blob
            :type bsize: int
            :param pipe_chunk_size: the maximum buffer size of our transfer pipe
            :type pipe_chunk_size: int
            :return: 4-tuple of ints indicating:

                * the number of memory chunks
                * the size of those memory chunks
                * if the pipe buffer is smaller than the max allowed memory usage, the number of pipe
                  chunks needed to fully transfer one of the memory chunks
                * the size of the transfer chunks

            :rtype: tuple[int, int, int, int]
            """
            nonlocal self
            if bsize < self.max_system_memory_usage:
                mem_chunk_size = size
                num_mem_chunks = 1
            else:
                mem_chunk_size = self.max_system_memory_usage
                num_mem_chunks = (size + mem_chunk_size - 1) // mem_chunk_size
            if pipe_chunk_size < mem_chunk_size:
                _chunk_size = pipe_chunk_size
                num_chunks = (mem_chunk_size + _chunk_size - 1) // _chunk_size
            else:
                _chunk_size = mem_chunk_size
                num_chunks = 1
            return num_mem_chunks, mem_chunk_size, num_chunks, _chunk_size

        chunk_size = self.max_bytes_per_pipe_message if chunk_size is None else chunk_size
        max_threads = min(32, self._max_mem_bytes)
        with pout:
            with os.fdopen(pout.fileno(), "wb", buffering=chunk_size, closefd=False) as pipe_out:
                container = path_parts_dict.get("container", None)
                fname = path_parts_dict.pop("fname", None)
                prefix = _assemble_fname(path_parts_dict) or None
                with self.connection_manager(container, prefix, fname, recurse=True) as client_iter:
                    iter_type = next(client_iter)
                    if iter_type != BC_LABEL:
                        raise err_assembly()
                    for client in client_iter:
                        client: BlobClient
                        size = client.get_blob_properties().size
                        (
                            num_mem_chunks,
                            mem_chunk_size,
                            num_chunks,
                            _chunk_size,
                        ) = configure_chunking(size, chunk_size)
                        with io.BytesIO(b"\x00" * mem_chunk_size) as bio:
                            for i in range(num_mem_chunks):
                                ipos = i * mem_chunk_size
                                dl: StorageStreamDownloader = client.download_blob(
                                    ipos,
                                    mem_chunk_size,
                                    max_concurrency=max_threads,
                                )
                                bio.seek(0)
                                bytes_read = dl.readinto(bio)
                                bio.seek(0)
                                for pos in range(0, bytes_read, _chunk_size):
                                    pipe_out.write(bio.read(_chunk_size))
                                rem = bytes_read % _chunk_size
                                if rem:
                                    pipe_out.write(bio.read(rem))
    @contextmanager
    def get_stream(self, copy: Union[str, MySQLCopy]):
        if copy is None:
            copy = self.remote_path
        path = copy.key if isinstance(copy, MySQLCopy) else copy
        _path = Path(path)
        has_fname = "." in _path.name and _path.name != "..."
        path, path_parts_dict = self._path_parse(path, has_fname)
        pipe_in, pipe_out = mp.Pipe(False)
        proc = mp.Process(target=self._blob_ospiper, args=(path_parts_dict, pipe_out))
        try:
            with pipe_in:
                proc.start()
                pipe_out.close()
                with os.fdopen(pipe_in.fileno(), "rb", closefd=False) as file_pipe_in:
                    yield file_pipe_in
        finally:
            # pipe_out.close()
            proc.join()
            proc.close()
    def read(self, filepath: str, bytes_per_chunk: Optional[int] = None) -> bytes:
        """
        Read content from the destination at the end of the given filepath.

        :param filepath: REQUIRED; a str object; relative path to the destination file that we will
            read from.
        :type filepath: str
        :param bytes_per_chunk: OPTIONAL; DEFAULT = self.max_bytes_per_pipe_message; an int value;
            dictates the max chunk size (in bytes) that should be passed into the pipe for any single
            chunk.
        :type bytes_per_chunk: int
        :return: Content of the file.
        :rtype: bytes
        """
        with self.get_stream(filepath) as conn:
            conn: io.FileIO
            strt = time.perf_counter()
            datum = []
            while time.perf_counter() - strt < 2:
                try:
                    data = conn.read()
                    if data:
                        datum.append(data)
                        strt = time.perf_counter()
                except EOFError:
                    break
            return b"".join(datum)
    def save(self, handler, filepath):
        """
        Save a stream given as handler to filepath.

        :param handler: Incoming stream.
        :type handler: file
        :param filepath: Save stream as this name.
        :type filepath: str
        """
        with handler as f_src:
            self.write(f_src, filepath)
    def write(self, content: Union[AnyStr, io.BufferedIOBase], filepath: AnyStr):
        """
        Write ``content`` to a file.

        :param content: Content to write to the file.
        :type content: str, bytes, or subclass of BufferedIOBase object
        :param filepath: Relative path to the file.
        :type filepath: str or bytes object
        """
        if isinstance(filepath, bytes):
            filepath = filepath.decode("utf-8")
        filepath, _, fname = filepath.rpartition("/")
        path, path_dict = self._path_parse(filepath)
        container = path_dict["container"] or self.default_container_name
        blob_name = _assemble_fname(path_dict)
        with self.connection_manager(container, prefix=blob_name, blob=fname) as client_iter:
            iter_type = next(client_iter)
            if iter_type == CC_LABEL:
                blob_name += "/" + fname
                client: ContainerClient = next(client_iter)
                if isinstance(content, io.BufferedReader):
                    with content:
                        client.upload_blob(
                            blob_name,
                            content.read(),
                            overwrite=self.can_overwrite,
                        )
                else:
                    client.upload_blob(blob_name, content, overwrite=self.can_overwrite)
            elif iter_type != BC_LABEL:
                raise AzureClientIterationError(f"Failed to identify path to blob files given: {filepath}")
            else:
                # Unless filepath used wildcards, client_iter will only produce
                # a single client instance to upload to.
                bclient: BlobClient = next(client_iter)
                if isinstance(content, io.BufferedReader):
                    with content:
                        bclient.upload_blob(content.read(), overwrite=self.can_overwrite)
                else:
                    bclient.upload_blob(content, overwrite=self.can_overwrite)
    def _list_files(self, prefix: str = None, **kwargs):  # , recursive=False, files_only=False):
        """
        Return the set of files on the destination, already filtered by prefix.

        Some storage engines (e.g. Google Cloud Storage) allow that at the API level, and the method
        should use storage-level filtering to save on network transfers.

        If prefix is given, it is assumed to supersede the default
        container/interval/media_type/custom-prefix/ parts of the path. To replace only select parts of
        that path segment, use the ... (ellipsis) to indicate which portions should remain default.
        """
        results = set()
        if prefix:
            if prefix == "..." or prefix.startswith(".../"):
                prefix = prefix.strip("/")
                path_template = f"{self._protocol}://{self.default_host_name}/{prefix}"
                _, path_dict = self._path_parse(path_template, True)
            else:
                container, _, prefix = prefix.partition("/")
                path_dict = {"container": container, "fname_prefix": prefix}
        else:
            prefix = None  # ensure we don't pass along an empty string
            path_dict = {"container": None}
        fname = path_dict.pop("fname", None) or None
        prefix = _assemble_fname(path_dict) or prefix or None
        cont_starts, _, _ = (path_dict.get("container", "") or "").partition("*")
        with BlobServiceClient.from_connection_string(self.connection_string) as service_client:
            service_client: BlobServiceClient
            for container in service_client.list_containers(cont_starts or None):
                with service_client.get_container_client(container) as cclient:
                    cclient: ContainerClient
                    if fname:
                        for bprop in cclient.list_blobs(prefix):
                            bprop: BlobProperties
                            if fname in bprop.name:
                                with cclient.get_blob_client(bprop) as bclient:
                                    results.add(bclient.url)
                    else:
                        for bprop in cclient.list_blobs(prefix):
                            bprop: BlobProperties
                            with cclient.get_blob_client(bprop) as bclient:
                                results.add(bclient.url)
                    # if files_only:
                    #     if recursive:
                    #         for bprop in cclient.list_blobs(prefix):
                    #             bprop: BlobProperties
                    #             bname: str = bprop.name
                    #             if not fname or fname in bname.rpartition("/")[2]:
                    #                 with cclient.get_blob_client(bprop) as bclient:
                    #                     results.add(bclient.url)
                    #     else:
                    #         for bprop in cclient.walk_blobs(prefix):
                    #             bprop: BlobProperties
                    #             bname = bprop.name
                    #             dbg_break = 0
                    # elif recursive:
                    #     if not fname:
                    #         for bprop in cclient.list_blobs(prefix):
                    #             bprop: BlobProperties
                    #             with cclient.get_blob_client(bprop) as bclient:
                    #                 results.add(bclient.url)
                    #     else:
                    #         for bprop in cclient.walk_blobs(prefix):
                    #             if fname in bname.rpartition("/")[2]:
                    #                 with cclient.get_blob_client(bprop) as bclient:
                    #                     results.add(bclient.url)
        return results
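

# A minimal end-to-end usage sketch (illustrative only; the connection string, host, container, and
# file paths below are hypothetical placeholders, and real Azure credentials are required to run it):
if __name__ == "__main__":
    example_conn_str = os.environ.get("AZURE_STORAGE_CONNECTION_STRING", "")
    dst = AzureBlob(
        remote_path="azure://backup-host/backups/daily/mysql/",
        connection_string=example_conn_str,
        can_do_overwrites=True,
    )
    # upload a small payload, then read it back through the streaming pipe
    dst.write(b"hello backup", "daily/mysql/example/hello.txt")
    print(dst.read("daily/mysql/example/hello.txt"))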