Google Storage Python API download in parallel

It is trivial to download a large number of files to your local machine in parallel using gsutil by adding the -m flag:

gsutil -m cp gs://my-bucket/blob_prefix* .

In Python, I can only download a single file at a time:

client = storage.Client()
bucket = client.get_bucket(gs_bucket_name)
blobs = list(bucket.list_blobs(prefix=blob_prefix))
for blob in blobs:
    blob.download_to_filename(blob.name)  # one blob at a time, sequentially

I'd prefer to download the data directly into memory (similar to blob.download_as_string()), ideally into a generator. Order doesn't really matter.

Does this functionality exist somewhere within the Python API?
If not, what would be the best way to do this?

Edit

I've implemented this hack:

import json
import os
import shutil
import subprocess


def fetch_data_from_storage(fetch_pattern):
    """Download blobs to a local tmp dir first, then load them into a generator."""
    tmp_save_dir = os.path.join("/tmp", "tmp_gs_download")
    if os.path.isdir(tmp_save_dir):
        shutil.rmtree(tmp_save_dir)  # empty the tmp dir first
    os.makedirs(tmp_save_dir)  # create the tmp dir

    # `bucket` is the google.cloud.storage bucket object from the snippet above
    download_command = ["gsutil", "-m", "cp", "gs://{}/{}".format(bucket.name, fetch_pattern), tmp_save_dir]
    subprocess.call(download_command)

    for file in os.listdir(tmp_save_dir):
        with open(os.path.join(tmp_save_dir, file), 'r') as f_data:
            yield json.load(f_data)

Please advise if this has been implemented in a better way somewhere.

Asked Jul 24 '18 by Roman


2 Answers

Checking the Python client libraries for Google Cloud Storage [1], I can conclude that there is no direct way to download multiple files in parallel.

I ran some tests using the client libraries normally (i.e. one file at a time) and using os.system to invoke the gsutil command, to compare the timings, and doing it with os.system is much faster (at least for small files). Can you let me know how it goes when you try it? Here's the code I used (fairly simple):

from google.cloud import storage
import time
import os
start_time = time.time()

download_command = "gsutil -m cp gs://<bucket>/* . "
os.system(download_command)
elapsed_time = time.time() - start_time
print(elapsed_time)
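
For reference, the client-library test (one file at a time) was roughly along these lines; this is only a minimal sketch, with <bucket> as a placeholder and assuming flat blob names (no "/" in them):

from google.cloud import storage
import time

start_time = time.time()

client = storage.Client()
bucket = client.get_bucket("<bucket>")  # placeholder bucket name
for blob in bucket.list_blobs():
    blob.download_to_filename(blob.name)  # sequential: one blob at a time

elapsed_time = time.time() - start_time
print(elapsed_time)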

I filed a feature request for this on your behalf; you can track it here [2].

Answered by Iñigo


OK, here is my multi-process, multi-threaded solution. This is how it works:

  1. Use subprocess and a gsutil ls -l pattern to fetch a list of blob names and their file sizes. This is the pattern entered in __main__.
  2. Batches of blob names are created depending on the max batch size. Default is 1 MB. A large file will then just create a batch of 1.
  3. Each batch is sent to a different process. Default number of processes = cpu_count - 2.
  4. Every batch in each process is multi-threaded (default max threads = 10). A batch of threads needs to complete before the next batch starts.
  5. Each thread downloads a single blob and combines it with its metadata.
  6. Results are propagated upwards via shared resources and memory allocations.

Reasons why I wrote this:

  • I also needed the metadata, which is lost with gsutil (critical)
  • I want some retry control if some parts fail (minor)

Speed comparison for ~2500 small files (<50 KB each, ~55 MB total):

  • One file at a time (including metadata): 25m13s
  • gsutil -m cp (no metadata): 0m35s
  • Code below (including metadata): 1m43s

from typing import Iterable, Generator
import logging
import json
import datetime
import re
import subprocess
import multiprocessing
import threading

from google.cloud import storage

logging.basicConfig(level='INFO')
logger = logging.getLogger(__name__)


class StorageDownloader:

    def __init__(self, bucket_name):
        self.bucket_name = bucket_name
        self.bucket = storage.Client().bucket(bucket_name)

    def create_blob_batches_by_pattern(self, fetch_pattern, max_batch_size=1e6):
        """Fetch all blob names according to the pattern and the blob size.

        :param fetch_pattern: The gsutil matching pattern for files we want to download.
          A gsutil pattern is used instead of blob prefix because it is more powerful.
        :type fetch_pattern: str
        :param max_batch_size: Maximum size per batch in bytes.  Default = 1 MB = 1e6 bytes
        :type max_batch_size: float or int
        :return: Generator of batches of blob names.
        :rtype: Generator of list
        """
        download_command = ["gsutil", "ls", "-l", "gs://{}/{}".format(self.bucket.name, fetch_pattern)]
        logger.info("Gsutil list command command: {}".format(download_command))
        blob_details_raw = subprocess.check_output(download_command).decode()
        regexp = r"(\d+) +\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ +gs:\/\/\S+?\/(\S+)"
        # re.finditer returns an iterator, so we don't duplicate the memory requirement in case the string is quite large
        cum_batch_size = 0
        batch = []
        batch_nr = 1
        for reg_match in re.finditer(regexp, blob_details_raw):
            blob_name = reg_match.group(2)
            byte_size = int(reg_match.group(1))

            batch.append(blob_name)
            cum_batch_size += byte_size

            if cum_batch_size > max_batch_size:
                yield batch
                batch = []
                cum_batch_size = 0
                batch_nr += 1

        logger.info("Created {} batches with roughly max batch size = {} bytes".format(batch_nr, int(max_batch_size)))
        if batch:
            yield batch  # if we still have a batch left, then it must also be yielded

    @staticmethod
    def download_batch_into_memory(batch, bucket, include_metadata=True, max_threads=10):
        """Given a batch of storage filenames, download them into memory.

        Downloading the files in a batch is multithreaded.

        :param batch: A list of gs:// filenames to download.
        :type batch: list of str
        :param bucket: The Google API bucket.
        :type bucket: google.cloud.storage.bucket.Bucket
        :param include_metadata: True to include the blob metadata.
        :type include_metadata: bool
        :param max_threads: Number of threads to use for downloading batch.  Don't increase this over 10.
        :type max_threads: int
        :return: Complete blob contents and metadata.
        :rtype: dict
        """
        def download_blob(blob_name, state):
            """Standalone function so that we can multithread this."""
            blob = bucket.blob(blob_name=blob_name)
            content = json.loads(blob.download_as_string())
            if include_metadata:
                blob.reload()  # fetch the blob's metadata
                metadata = blob.metadata
                if metadata:
                    state[blob_name] = {**content, **metadata}
                    return
            state[blob_name] = content

        batch_data = {bn: {} for bn in batch}
        threads = []
        active_thread_count = 0
        for blobname in batch:
            thread = threading.Thread(target=download_blob, kwargs={"blob_name": blobname, "state": batch_data})
            threads.append(thread)
            thread.start()
            active_thread_count += 1
            if active_thread_count == max_threads:
                # finish up threads in batches of size max_threads.  A better implementation would be a queue
                #   from which the threads can feed, but this is good enough if the blob sizes are roughly the same.
                for thread in threads:
                    thread.join()
                threads = []
                active_thread_count = 0

        # wait for the last of the threads to be finished
        for thread in threads:
            thread.join()
        return batch_data

    def multiprocess_batches(self, batches, max_processes=None):
        """Spawn parallel process for downloading and processing batches.

        :param batches: An iterable of batches, probably a Generator.
        :type batches: Iterable
        :param max_processes: Maximum number of processes to spawn.  None for cpu_count
        :type max_processes: int or None
        :return: The responses from all the processes.
        :rtype: dict
        """
        if max_processes is None:
            max_processes = multiprocessing.cpu_count() - 2
            logger.info("Using {} processes to process batches".format(max_processes))

        def single_proc(mp_batch, mp_bucket, batchresults):
            """Standalone function so that we can multiprocess this."""
            proc_res = self.download_batch_into_memory(mp_batch, mp_bucket)
            batchresults.update(proc_res)

        batch_results = multiprocessing.Manager().dict()

        jobs = []
        for batch in batches:
            logger.info("Processing batch with {} elements".format(len(batch)))
            # the client is not thread safe, so recreate the client for each process.
            bucket = storage.Client().get_bucket(self.bucket_name)
            # multiprocessing.Process is used directly; this relies on the 'fork' start
            #   method (Linux default) so the closure target and the bucket object are
            #   inherited by the child process rather than pickled.
            proc = multiprocessing.Process(
                target=single_proc,
                kwargs={"mp_batch": batch, "mp_bucket": bucket, "batchresults": batch_results}
            )
            jobs.append(proc)
            proc.start()
            if len(jobs) == max_processes:
                # wait for the current block of processes before starting the next one
                for job in jobs:
                    job.join()
                jobs = []

        for job in jobs:
            job.join()

        logger.info("finished downloading {} blobs".format(len(batch_results)))
        return batch_results

    def bulk_download_as_dict(self, fetch_pattern):
        """Download blobs from google storage to

        :param fetch_pattern: A gsutil storage pattern.
        :type fetch_pattern: str
        :return: A dict with k,v pairs = {blobname: blob_data}
        :rtype: dict
        """
        start = datetime.datetime.now()
        filename_batches = self.create_blob_batches_by_pattern(fetch_pattern)
        downloaded_data = self.multiprocess_batches(filename_batches)
        logger.info("time taken to download = {}".format(datetime.datetime.now() - start))
        return downloaded_data


if __name__ == '__main__':
    stor = StorageDownloader("mybucket")
    data = stor.bulk_download_as_dict("some_prefix*")

This can still use a fair amount of optimisation (for example queuing the threads instead of waiting for the block to be complete), but this is good enough for me for now.
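
For what it's worth, the thread-queuing idea could look roughly like the sketch below, swapping the manual thread batches for a concurrent.futures.ThreadPoolExecutor. This is only a possible replacement for download_batch_into_memory (the name download_batch_into_memory_pooled is made up for illustration), not something I have benchmarked:

from concurrent.futures import ThreadPoolExecutor, as_completed
import json


def download_batch_into_memory_pooled(batch, bucket, include_metadata=True, max_threads=10):
    """Download a batch of blobs into memory using a thread pool.

    The executor keeps up to max_threads downloads in flight at all times instead
    of waiting for a whole block of threads to finish before starting the next one.
    """
    def download_blob(blob_name):
        blob = bucket.blob(blob_name=blob_name)
        content = json.loads(blob.download_as_string())
        if include_metadata:
            blob.reload()  # fetch the blob's metadata
            if blob.metadata:
                return blob_name, {**content, **blob.metadata}
        return blob_name, content

    batch_data = {}
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = [executor.submit(download_blob, blob_name) for blob_name in batch]
        for future in as_completed(futures):
            blob_name, content = future.result()
            batch_data[blob_name] = content
    return batch_data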

Answered by Roman