It is trivial to download a large number of files to your local machine in parallel using gsutil by adding the -m flag:
gsutil -m cp gs://my-bucket/blob_prefix* .
In Python, I can only download a single file at a time:
client = storage.Client()
bucket = client.get_bucket(gs_bucket_name)
blobs = [blob for blob in bucket.list_blobs(prefix=blob_prefix)]
for blob in blobs:
    blob.download_to_filename(blob.name)  # one sequential request per blob
Preferably I'd like to download the data directly into memory (similar to blob.download_as_string()), and preferably into a generator. Order doesn't really matter.
Does this functionality exist somewhere within the Python API?
If not, what would be the best way to do this?
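For reference, this is roughly the shape of what I'm after, except that it still downloads one blob at a time (the bucket name and prefix below are placeholders):

from google.cloud import storage

def iter_blob_contents(bucket_name, blob_prefix):
    """Yield each blob's raw bytes, one blob at a time (serial)."""
    bucket = storage.Client().get_bucket(bucket_name)
    for blob in bucket.list_blobs(prefix=blob_prefix):
        yield blob.download_as_string()  # bytes kept only in memory, never written to disk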
I've implemented this hack:
import json
import os
import shutil
import subprocess

def fetch_data_from_storage(fetch_pattern):
    """Download blobs to a local tmp dir first, then load them into a generator."""
    tmp_save_dir = os.path.join("/tmp", "tmp_gs_download")
    if os.path.isdir(tmp_save_dir):
        shutil.rmtree(tmp_save_dir)  # empty tmp dir first
    os.makedirs(tmp_save_dir)  # create tmp dir
    # `bucket` is the google.cloud.storage Bucket from the snippet above
    download_command = ["gsutil", "-m", "cp", "gs://{}/{}".format(bucket.name, fetch_pattern), tmp_save_dir]
    resp = subprocess.call(download_command)
    for file in os.listdir(tmp_save_dir):
        with open(os.path.join(tmp_save_dir, file), 'r') as f_data:
            content = json.load(f_data)
            yield content
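Usage then looks roughly like this (the pattern and the consuming function are just placeholders):

for record in fetch_data_from_storage("some_blob_prefix*"):
    handle_record(record)  # handle_record stands in for whatever consumes each JSON document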
Please advise if this has been implemented in a better way somewhere.
Checking the Python client library for Google Cloud Storage [1], I can conclude there is no direct way to download multiple files in parallel.
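A common workaround, not part of the client library itself, is to parallelise the single-file calls yourself with threads from the standard library. A rough sketch, assuming a bucket name and prefix of your own:

from concurrent.futures import ThreadPoolExecutor
from google.cloud import storage

def download_blobs_threaded(bucket_name, prefix, max_workers=16):
    """Download every matching blob into memory using a pool of threads."""
    bucket = storage.Client().get_bucket(bucket_name)
    blobs = list(bucket.list_blobs(prefix=prefix))
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # one download_as_string() call per blob, run in parallel threads
        contents = list(executor.map(lambda b: b.download_as_string(), blobs))
    return {blob.name: content for blob, content in zip(blobs, contents)}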
I ran some tests using the client library normally (i.e. one file at a time) and using os.system
to run the gsutil
command, to compare the elapsed time, and doing it with os.system
is much faster (at least for small files). Can you tell me how it goes when you try it? Here's the code I used (fairly simple):
from google.cloud import storage
import time
import os
start_time = time.time()
download_command = "gsutil -m cp gs://<bucket>/* . "
os.system(download_command)
elapsed_time = time.time() - start_time
print(elapsed_time)
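For comparison, the one-file-at-a-time baseline using the client library can look roughly like this (the bucket name is a placeholder, and blob names are assumed to be usable as local filenames):

from google.cloud import storage
import time

start_time = time.time()
bucket = storage.Client().get_bucket("<bucket>")
for blob in bucket.list_blobs():
    # one sequential request per object
    blob.download_to_filename(blob.name)
elapsed_time = time.time() - start_time
print(elapsed_time)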
I filed a Feature Request for this on your behalf; you can track it here [2].
Ok, here is my multi-processed, multi-threaded solution. Here is how it works:
1. gsutil ls -l pattern is used to fetch a list of blob names and their file sizes. The pattern is the one entered in __main__.
2. The blob names are grouped into batches of roughly equal cumulative size.
3. Each batch is handed to its own process (cpu_count - 2 processes by default), and each process downloads the blobs of its batch into memory using multiple threads.
Reasons why I wrote this: I want the data directly in memory (not on local disk), and the Python client library has no built-in parallel download.
Speed comparison for ~2500 small (<50kb) (=55MB) files:
gsutil -m cp (no metadata): 0m35s

from typing import Iterable, Generator
import logging
import json
import datetime
import re
import subprocess
import multiprocessing
import threading
from google.cloud import storage
logging.basicConfig(level='INFO')
logger = logging.getLogger(__name__)
class StorageDownloader:

    def __init__(self, bucket_name):
        self.bucket_name = bucket_name
        self.bucket = storage.Client().bucket(bucket_name)

    def create_blob_batches_by_pattern(self, fetch_pattern, max_batch_size=1e6):
        """Fetch all blob names matching the pattern and group them into batches by blob size.

        :param fetch_pattern: The gsutil matching pattern for files we want to download.
            A gsutil pattern is used instead of a blob prefix because it is more powerful.
        :type fetch_pattern: str
        :param max_batch_size: Maximum size per batch in bytes. Default = 1 MB = 1e6 bytes.
        :type max_batch_size: float or int
        :return: Generator of batches of blob names.
        :rtype: Generator of list
        """
        download_command = ["gsutil", "ls", "-l", "gs://{}/{}".format(self.bucket.name, fetch_pattern)]
        logger.info("Gsutil list command: {}".format(download_command))
        blob_details_raw = subprocess.check_output(download_command).decode()
        # each data line of `gsutil ls -l` looks like "<size>  <timestamp>Z  gs://bucket/blob_name";
        # group(1) captures the size in bytes and group(2) the blob name
        regexp = r"(\d+) +\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ +gs:\/\/\S+?\/(\S+)"
        # re.finditer returns a generator so we don't duplicate memory need in case the string is quite large
        cum_batch_size = 0
        batch = []
        batch_nr = 1
        for reg_match in re.finditer(regexp, blob_details_raw):
            blob_name = reg_match.group(2)
            byte_size = int(reg_match.group(1))
            batch.append(blob_name)
            cum_batch_size += byte_size
            if cum_batch_size > max_batch_size:
                yield batch
                batch = []
                cum_batch_size = 0
                batch_nr += 1
        logger.info("Created {} batches with roughly max batch size = {} bytes".format(batch_nr, int(max_batch_size)))
        if batch:
            yield batch  # if we still have a batch left, then it must also be yielded
    @staticmethod
    def download_batch_into_memory(batch, bucket, include_metadata=True, max_threads=10):
        """Given a batch of storage filenames, download them into memory.

        Downloading the files in a batch is multithreaded.

        :param batch: A list of gs:// filenames to download.
        :type batch: list of str
        :param bucket: The google api bucket.
        :type bucket: google.cloud.storage.bucket.Bucket
        :param include_metadata: True to include metadata.
        :type include_metadata: bool
        :param max_threads: Number of threads to use for downloading the batch. Don't increase this over 10.
        :type max_threads: int
        :return: Complete blob contents and metadata.
        :rtype: dict
        """
        def download_blob(blob_name, state):
            """Standalone function so that we can multithread this."""
            blob = bucket.blob(blob_name=blob_name)
            content = json.loads(blob.download_as_string())
            if include_metadata:
                blob.reload()
                metadata = blob.metadata
                if metadata:
                    state[blob_name] = {**content, **metadata}
                    return
            state[blob_name] = content

        batch_data = {bn: {} for bn in batch}
        threads = []
        active_thread_count = 0
        for blobname in batch:
            thread = threading.Thread(target=download_blob, kwargs={"blob_name": blobname, "state": batch_data})
            threads.append(thread)
            thread.start()
            active_thread_count += 1
            if active_thread_count == max_threads:
                # finish up threads in batches of size max_threads. A better implementation would be a queue
                # from which the threads can feed, but this is good enough if the blob sizes are roughly the same.
                for thread in threads:
                    thread.join()
                threads = []
                active_thread_count = 0

        # wait for the last of the threads to be finished
        for thread in threads:
            thread.join()
        return batch_data
    def multiprocess_batches(self, batches, max_processes=None):
        """Spawn a parallel process for downloading and processing each batch.

        :param batches: An iterable of batches, probably a Generator.
        :type batches: Iterable
        :param max_processes: Maximum number of processes to spawn. None for cpu_count - 2.
        :type max_processes: int or None
        :return: The response from all the processes.
        :rtype: dict
        """
        if max_processes is None:
            max_processes = multiprocessing.cpu_count() - 2
        logger.info("Using {} processes to process batches".format(max_processes))

        def single_proc(mp_batch, mp_bucket, batchresults):
            """Standalone function so that we can multiprocess this."""
            proc_res = self.download_batch_into_memory(mp_batch, mp_bucket)
            batchresults.update(proc_res)

        pool = multiprocessing.Pool(processes=max_processes)
        batch_results = multiprocessing.Manager().dict()
        jobs = []
        for batch in batches:
            logger.info("Processing batch with {} elements".format(len(batch)))
            # the client is not thread safe, so we need to recreate the client for each process.
            bucket = storage.Client().get_bucket(self.bucket_name)
            proc = pool.Process(
                target=single_proc,
                kwargs={"mp_batch": batch, "mp_bucket": bucket, "batchresults": batch_results}
            )
            jobs.append(proc)
            proc.start()

        for job in jobs:
            job.join()
        logger.info("finished downloading {} blobs".format(len(batch_results)))
        return batch_results
    def bulk_download_as_dict(self, fetch_pattern):
        """Download blobs from google storage directly into memory.

        :param fetch_pattern: A gsutil storage pattern.
        :type fetch_pattern: str
        :return: A dict with k,v pairs = {blobname: blob_data}
        :rtype: dict
        """
        start = datetime.datetime.now()
        filename_batches = self.create_blob_batches_by_pattern(fetch_pattern)
        downloaded_data = self.multiprocess_batches(filename_batches)
        logger.info("time taken to download = {}".format(datetime.datetime.now() - start))
        return downloaded_data
if __name__ == '__main__':
    stor = StorageDownloader("mybucket")
    data = stor.bulk_download_as_dict("some_prefix*")
This can still use a fair amount of optimisation (for example queuing the threads instead of waiting for the block to be complete), but this is good enough for me for now.
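For instance, the manual thread batching in download_batch_into_memory could be swapped for a ThreadPoolExecutor so that max_threads workers stay busy instead of joining in blocks. A sketch of that variant with the same parameters (not part of the solution above, just the optimisation it mentions):

import json
from concurrent.futures import ThreadPoolExecutor

def download_batch_into_memory_pooled(batch, bucket, include_metadata=True, max_threads=10):
    """Same contract as download_batch_into_memory, but a thread pool keeps max_threads workers busy."""
    def download_blob(blob_name):
        blob = bucket.blob(blob_name=blob_name)
        content = json.loads(blob.download_as_string())
        if include_metadata:
            blob.reload()
            if blob.metadata:
                return blob_name, {**content, **blob.metadata}
        return blob_name, content

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        return dict(executor.map(download_blob, batch))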