
Make Boto3 upload calls blocking (single threaded)

Tags: python, boto3

Edit: my original assumption was proved partly wrong. I've added a lengthy answer below, which I invite others to stress-test and correct.


I am looking for a way to utilize the Boto3 S3 API in a single-threaded manner to mimic a threadsafe key-value store. In a nutshell, I want to use the calling thread rather than a new thread to do the upload.

The default behavior of the .upload_fileobj() method in Boto3 (or .upload_file()), as far as I can tell, is to kick the task off to a new thread and return None immediately.

From the docs:

This is a managed transfer which will perform a multipart upload in multiple threads if necessary.

(If my understanding of this is wrong in the first place, a correction on that would be helpful as well. This is in Boto3 1.9.134.)

>>> import io
>>> import boto3
>>> bucket = boto3.resource('s3').Bucket('my-bucket-name')
>>> buf = io.BytesIO(b"test")
>>> res = bucket.upload_fileobj(buf, 'testobj')
>>> res is None
True

Now, let's say that buf is not a short 4-byte string, but a huge text blob that's going to take a non-negligible amount of time to fully upload.

I also use this function to check if an object with a given key exists:

import botocore.exceptions

def key_exists_in_bucket(bucket_obj, key: str) -> bool:
    try:
        # .load() issues a HEAD request; a missing key raises ClientError
        bucket_obj.Object(key).load()
    except botocore.exceptions.ClientError:
        return False
    else:
        return True

My intention is to not re-write the object if it exists by name.

The race condition here is fairly obvious: if the upload is kicked off asynchronously, a quick check with key_exists_in_bucket() may return False while the object is still being written, and I would then write it a second time unnecessarily.
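Concretely, the check-then-write pattern I have in mind looks something like this (upload_once is just an illustrative name):

import io

def upload_once(bucket_obj, key: str, data: bytes) -> None:
    # Check-then-act: if .upload_fileobj() were fire-and-forget, a second
    # call arriving while the first upload is still in flight would see
    # False here and write the same object again.
    if not key_exists_in_bucket(bucket_obj, key):
        bucket_obj.upload_fileobj(io.BytesIO(data), key)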

Is there a way to ensure that bucket.upload_fileobj() is called by the current thread rather than a new thread created within the scope of that method?

I realize that this will slow things down. I'm willing to sacrifice speed in this case.

Brad Solomon asked Dec 01 '22

2 Answers

I figured that, since the answers to this question and to another similar question seemed to be in direct conflict, it would be best to go straight to the source with pdb.

Summary

  • boto3 does use multiple threads (10) by default
  • However, it is not asynchronous, in that it waits on (joins) these threads before returning, rather than using a "fire-and-forget" technique
  • So, in this manner, read/write thread safety is in place if you're trying to talk to an S3 bucket from multiple clients (see the sketch below).
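To see the practical upshot, here's a quick sketch (assuming a bucket you can write to, and the key_exists_in_bucket() helper from the question):

import io

import boto3

bucket = boto3.resource('s3').Bucket('my-bucket-name')  # assumed bucket name

# Because .upload_fileobj() joins its worker threads before returning,
# an existence check immediately afterwards sees the finished object:
bucket.upload_fileobj(io.BytesIO(b"payload"), 'mykey')
assert key_exists_in_bucket(bucket, 'mykey')  # helper from the question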

Detail

One aspect that I'm striving to address here is that multiple subthreads do not imply that the top-level method itself is non-blocking: if the calling thread kicks off the upload in multiple subthreads, but then waits for those threads to finish before returning, I would venture to say that is still a blocking call. The flip side is a method call that is, in asyncio-speak, "fire and forget." With threading, this effectively comes down to whether x.join() is ever called.
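To illustrate that distinction in plain threading terms (a toy sketch, not boto3 code):

import threading
import time

def work(part):
    time.sleep(0.1)  # stand-in for uploading one chunk

def blocking_upload(parts):
    # Multiple threads internally, but the caller still waits:
    threads = [threading.Thread(target=work, args=(p,)) for p in parts]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # the call does not return until every thread finishes

def fire_and_forget_upload(parts):
    # Never joins: the caller returns while work continues in the background.
    for p in parts:
        threading.Thread(target=work, args=(p,), daemon=True).start()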

Here is the initial code, taken from Victor Val, to kick off the debugger:

import io
import pdb

import boto3

# Create the test file first with: dd if=/dev/zero of=100mb.txt bs=50M count=2
buf = io.BytesIO(open('100mb.txt', 'rb').read())
bucket = boto3.resource('s3').Bucket('test-threads')
pdb.run("bucket.upload_fileobj(buf, '100mb')")

The stack traces below are from boto3 1.9.134.

Now jumping into pdb:

.upload_fileobj() first calls a nested method -- not much to see yet.

(Pdb) s
--Call--
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(542)bucket_upload_fileobj()
-> def bucket_upload_fileobj(self, Fileobj, Key, ExtraArgs=None,
(Pdb) s

(Pdb) l
574     
575         :type Config: boto3.s3.transfer.TransferConfig
576         :param Config: The transfer configuration to be used when performing the
577             upload.
578         """
579  ->     return self.meta.client.upload_fileobj(
580             Fileobj=Fileobj, Bucket=self.name, Key=Key, ExtraArgs=ExtraArgs,
581             Callback=Callback, Config=Config)
582     
583     
584  

So the top-level method does return something, but it's not yet clear how that something eventually becomes None.

So we step into that.

Now, .upload_fileobj() does have a Config parameter, which is None by default:

(Pdb) l 531
526     
527         subscribers = None
528         if Callback is not None:
529             subscribers = [ProgressCallbackInvoker(Callback)]
530     
531         config = Config
532         if config is None:
533             config = TransferConfig()
534     
535         with create_transfer_manager(self, config) as manager:
536             future = manager.upload(

This means that config becomes the default TransferConfig():

  • use_threads -- If True, threads will be used when performing S3 transfers. If False, no threads will be used in performing transfers: all logic will be run in the main thread.
  • max_concurrency -- The maximum number of threads that will be making requests to perform a transfer. If use_threads is set to False, the value provided is ignored as the transfer will only ever use the main thread.

And voilà, here they are:

(Pdb) unt 534
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(535)upload_fileobj()
-> with create_transfer_manager(self, config) as manager:
(Pdb) config
<boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>
(Pdb) config.use_threads
True
(Pdb) config.max_concurrency
10

Now we descend a level in the call stack, where a TransferManager (a context manager) is used. At this point, max_concurrency has been passed along as the similarly named max_request_concurrency argument:

# https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/manager.py#L223

    # The executor responsible for making S3 API transfer requests
    self._request_executor = BoundedExecutor(
        max_size=self._config.max_request_queue_size,
        max_num_threads=self._config.max_request_concurrency,
        tag_semaphores={
            IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
                self._config.max_in_memory_upload_chunks),
            IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
                self._config.max_in_memory_download_chunks)
        },
        executor_cls=executor_cls
    )

In this boto3 version, at least, that class comes from the separate library s3transfer.

(Pdb) n
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(536)upload_fileobj()
-> future = manager.upload(
(Pdb) manager
<s3transfer.manager.TransferManager object at 0x7f178db437f0>
(Pdb) manager._config
<boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>
(Pdb) manager._config.use_threads
True
(Pdb) manager._config.max_concurrency
10

Next, let's step into manager.upload(). Here's the full body of that method:

(Pdb) l 290, 303
290  ->         if extra_args is None:
291                 extra_args = {}
292             if subscribers is None:
293                 subscribers = []
294             self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
295             call_args = CallArgs(
296                 fileobj=fileobj, bucket=bucket, key=key, extra_args=extra_args,
297                 subscribers=subscribers
298             )
299             extra_main_kwargs = {}
300             if self._bandwidth_limiter:
301                 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
302             return self._submit_transfer(
303                 call_args, UploadSubmissionTask, extra_main_kwargs)

(Pdb) unt 301
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(302)upload()
-> return self._submit_transfer(
(Pdb) extra_main_kwargs
{}

(Pdb) UploadSubmissionTask
<class 's3transfer.upload.UploadSubmissionTask'>
(Pdb) call_args
<s3transfer.utils.CallArgs object at 0x7f178db5a5f8>

(Pdb) l 300, 5
300             if self._bandwidth_limiter:
301                 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
302  ->         return self._submit_transfer(
303                 call_args, UploadSubmissionTask, extra_main_kwargs)
304     
305         def download(self, bucket, key, fileobj, extra_args=None,

Ah, lovely--so we'll need to get down at least one level further to see the actual underlying upload.

(Pdb) s
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(303)upload()
-> call_args, UploadSubmissionTask, extra_main_kwargs)
(Pdb) s
--Call--
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(438)_submit_transfer()
-> def _submit_transfer(self, call_args, submission_task_cls,
(Pdb) s
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(440)_submit_transfer()
-> if not extra_main_kwargs:

(Pdb) l 440, 10
440  ->         if not extra_main_kwargs:
441                 extra_main_kwargs = {}
442     
443             # Create a TransferFuture to return back to the user
444             transfer_future, components = self._get_future_with_components(
445                 call_args)
446     
447             # Add any provided done callbacks to the created transfer future
448             # to be invoked on the transfer future being complete.
449             for callback in get_callbacks(transfer_future, 'done'):
450                 components['coordinator'].add_done_callback(callback)

Okay, so now we have a TransferFuture, defined in s3transfer/futures.py. No definitive proof that threads have been kicked off yet, but it sure sounds like it when futures become involved.

(Pdb) l
444             transfer_future, components = self._get_future_with_components(
445                 call_args)
446     
447             # Add any provided done callbacks to the created transfer future
448             # to be invoked on the transfer future being complete.
449  ->         for callback in get_callbacks(transfer_future, 'done'):
450                 components['coordinator'].add_done_callback(callback)
451     
452             # Get the main kwargs needed to instantiate the submission task
453             main_kwargs = self._get_submission_task_main_kwargs(
454                 transfer_future, extra_main_kwargs)
(Pdb) transfer_future
<s3transfer.futures.TransferFuture object at 0x7f178db5a780>

The final line below, from the TransferCoordinator class, seems important at first glance:

class TransferCoordinator(object):
    """A helper class for managing TransferFuture"""
    def __init__(self, transfer_id=None):
        self.transfer_id = transfer_id
        self._status = 'not-started'
        self._result = None
        self._exception = None
        self._associated_futures = set()
        self._failure_cleanups = []
        self._done_callbacks = []
        self._done_event = threading.Event()  # < ------ !!!!!!

You typically see threading.Event being used for one thread to signal an event status while other threads can be waiting for that event to happen.
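As a minimal refresher on that pattern (a toy sketch):

import threading
import time

done = threading.Event()

def worker():
    time.sleep(1)   # stand-in for the transfer
    done.set()      # signal that the transfer finished

threading.Thread(target=worker).start()
done.wait()         # block the calling thread until worker() calls .set()
print("transfer complete")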

TransferCoordinator is what TransferFuture.result() uses.

Alright, circling back from the above, we're now at s3transfer.futures.BoundedExecutor and its max_num_threads attribute:

class BoundedExecutor(object):
    EXECUTOR_CLS = futures.ThreadPoolExecutor
    # ...
    def __init__(self, max_size, max_num_threads, tag_semaphores=None,
                 executor_cls=None):
        self._max_num_threads = max_num_threads
        if executor_cls is None:
            executor_cls = self.EXECUTOR_CLS
        self._executor = executor_cls(max_workers=self._max_num_threads)

This is basically equivalent to:

from concurrent import futures

_executor = futures.ThreadPoolExecutor(max_workers=10)

But there is still one question remaining: is this a "fire-and-forget," or does the call actually wait on the threads to finish and return?

It seems to be the latter. .result() calls self._done_event.wait(MAXINT).

# https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/futures.py#L249

def result(self):
    self._done_event.wait(MAXINT)

    # Once done waiting, raise an exception if present or return the
    # final result.
    if self._exception:
        raise self._exception
    return self._result
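In other words, the mechanics are the same as waiting on a standard-library future (a toy sketch):

import time
from concurrent.futures import ThreadPoolExecutor

def upload_part():
    time.sleep(1)  # stand-in for one part of the transfer
    return "etag"

with ThreadPoolExecutor(max_workers=10) as executor:
    future = executor.submit(upload_part)
    # .result() blocks the calling thread, just like
    # TransferCoordinator's ._done_event.wait(MAXINT) above.
    print(future.result())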

Finally, re-running Victor Val's test seems to confirm the above:

>>> import boto3
>>> import time
>>> import io
>>> 
>>> buf = io.BytesIO(open('100mb.txt', 'rb').read())
>>> 
>>> bucket = boto3.resource('s3').Bucket('test-threads')
>>> start = time.time()
>>> print("starting to upload...")
starting to upload...
>>> bucket.upload_fileobj(buf, '100mb')
>>> print("finished uploading")
finished uploading
>>> end = time.time()
>>> print("time: {}".format(end-start))
time: 2.6030001640319824

(This execution time is probably shorter with this example run on a network-optimized instance. But 2.5 seconds is still a noticeably big chunk of time, and not at all indicative of the threads being kicked off and not waited for.)


Lastly, here's an example of a Callback for .upload_fileobj(). It follows along with an example from the docs.

First, a little helper to get a buffer's size without consuming it:

def get_bufsize(buf, chunk=1024) -> int:
    start = buf.tell()
    try:
        size = 0
        while True:
            out = buf.read(chunk)
            if not out:
                break
            size += len(out)  # count actual bytes read, not the chunk size
        return size
    finally:
        buf.seek(start)  # restore the original position
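For a seekable buffer, a simpler alternative sketch is to seek to the end and read off the position:

import os

def get_bufsize_seek(buf) -> int:
    start = buf.tell()
    size = buf.seek(0, os.SEEK_END)  # .seek() returns the new absolute position
    buf.seek(start)                  # restore the original position
    return size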

The class itself:

import os
import sys
import threading
import time

class ProgressPercentage(object):
    def __init__(self, filename, buf):
        self._filename = filename
        self._size = float(get_bufsize(buf))
        self._seen_so_far = 0
        self._lock = threading.Lock()
        self.start = None

    def __call__(self, bytes_amount):
        with self._lock:
            if self.start is None:
                self.start = time.monotonic()
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100
            sys.stdout.write(
                "\r%s  %s of %s  (%.2f%% done, %.2fs elapsed)\n" % (
                    self._filename, self._seen_so_far, self._size,
                    percentage, time.monotonic() - self.start))
            # To update in place on a single line, drop the trailing \n
            # above and call sys.stdout.flush() here instead.

Example:

In [19]: import io 
    ...:  
    ...: from boto3.session import Session 
    ...:  
    ...: s3 = Session().resource("s3") 
    ...: bucket = s3.Bucket("test-threads") 
    ...: buf = io.BytesIO(open('100mb.txt', 'rb').read()) 
    ...:  
    ...: bucket.upload_fileobj(buf, 'mykey', Callback=ProgressPercentage("mykey", buf))
mykey  262144 of 104857600.0  (0.25% done, 0.00s elapsed)
mykey  524288 of 104857600.0  (0.50% done, 0.00s elapsed)
mykey  786432 of 104857600.0  (0.75% done, 0.01s elapsed)
mykey  1048576 of 104857600.0  (1.00% done, 0.01s elapsed)
mykey  1310720 of 104857600.0  (1.25% done, 0.01s elapsed)
mykey  1572864 of 104857600.0  (1.50% done, 0.02s elapsed)
Brad Solomon answered Dec 04 '22

upload_fileobj() takes a Config parameter: a boto3.s3.transfer.TransferConfig object, which in turn has a parameter called use_threads (default True). If True, threads will be used when performing S3 transfers; if False, no threads will be used and all logic will be run in the main thread.
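For example (a sketch, assuming a bucket you can write to):

import io

import boto3
from boto3.s3.transfer import TransferConfig

bucket = boto3.resource('s3').Bucket('my-bucket-name')  # assumed bucket name

# use_threads=False keeps the entire transfer on the calling thread:
config = TransferConfig(use_threads=False)
bucket.upload_fileobj(io.BytesIO(b"test"), 'testobj', Config=config)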

Hopefully this should work for you.

Verma answered Dec 03 '22