Edit: my original assumption was proved partly wrong. I've added a lengthy answer below, which I invite others to stress-test and correct.
I am looking for a way to utilize the Boto3 S3 API in a single-threaded manner to mimic a threadsafe key-value store. In a nutshell, I want to use the calling thread rather than a new thread to do the upload.
The default behavior of the .upload_fileobj() method in Boto3 (or .upload_file()), as far as I can tell, is to kick the task off to a new thread and return None immediately.
From the docs:
This is a managed transfer which will perform a multipart upload in multiple threads if necessary.
(If my understanding of this is wrong in the first place, a correction on that would be helpful as well. This is in Boto3 1.9.134.)
>>> import io
>>> import boto3
>>> bucket = boto3.resource('s3').Bucket('my-bucket-name')
>>> buf = io.BytesIO(b"test")
>>> res = bucket.upload_fileobj(buf, 'testobj')
>>> res is None
True
Now, let's say that buf is not a short 4-byte string, but a huge text blob that's going to take a non-negligible amount of time to fully upload.
I also use this function to check if an object with a given key exists:
import botocore.exceptions

def key_exists_in_bucket(bucket_obj, key: str) -> bool:
    try:
        bucket_obj.Object(key).load()
    except botocore.exceptions.ClientError:
        return False
    else:
        return True
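As an aside, that except clause swallows every ClientError, not just "not found". A variant that treats only a 404 as missing -- my assumption about the intent here -- would be:

def key_exists_in_bucket_strict(bucket_obj, key: str) -> bool:
    # Only a 404 ("not found") means the key is absent; anything else
    # (403, throttling, and so on) is a real error and should propagate.
    try:
        bucket_obj.Object(key).load()
    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] == "404":
            return False
        raise
    return True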
My intention is to not re-write the object if it exists by name. The race condition here is fairly obvious: kick off an upload asynchronously, then do a quick check with key_exists_in_bucket(), get back False because the object is still being written, and then needlessly write it again as a result.
Is there a way to ensure that bucket.upload_fileobj() is called by the current thread rather than a new thread created within the scope of that method? I realize that this will slow things down; I'm willing to sacrifice speed in this case.
I figured that, since the answers to this question and another similar question seem to be in direct conflict, it would be best to go straight to the source with pdb.
boto3 does use multiple threads (10) by default. One aspect that I'm striving to address here, though, is that multiple subthreads do not imply that the top-level method itself is non-blocking: if the calling thread kicks off the upload to multiple subthreads, but then waits for those threads to finish and return, I would venture to say that is still a blocking call. The flip side of this is if the method call is, in asyncio speak, a "fire and forget" call. With threading, this effectively comes down to whether x.join() is ever called.
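To illustrate the distinction with plain threading (a toy sketch, not boto3 code):

import threading
import time

def _work():
    time.sleep(1)

def blocking_despite_threads():
    # Uses 10 threads internally, but join() means the caller still waits ~1s.
    threads = [threading.Thread(target=_work) for _ in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

def fire_and_forget():
    # Starts a thread and never joins it: control returns immediately.
    threading.Thread(target=_work).start()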
Here is the initial code, taken from Victor Val, to kick off the debugger:
import io
import pdb
import boto3
# From dd if=/dev/zero of=100mb.txt bs=100M count=1
buf = io.BytesIO(open('100mb.txt', 'rb').read())
bucket = boto3.resource('s3').Bucket('test-threads')
pdb.run("bucket.upload_fileobj(buf, '100mb')")
The stack frames below are from Boto3 1.9.134.
Now jumping into pdb:
.upload_fileobj() first calls a nested method -- not much to see yet.
(Pdb) s
--Call--
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(542)bucket_upload_fileobj()
-> def bucket_upload_fileobj(self, Fileobj, Key, ExtraArgs=None,
(Pdb) s
(Pdb) l
574
575 :type Config: boto3.s3.transfer.TransferConfig
576 :param Config: The transfer configuration to be used when performing the
577 upload.
578 """
579 -> return self.meta.client.upload_fileobj(
580 Fileobj=Fileobj, Bucket=self.name, Key=Key, ExtraArgs=ExtraArgs,
581 Callback=Callback, Config=Config)
582
583
584
So the top-level method does return something, but it's unclear yet how that something eventually becomes None. So we step into that.
Now, .upload_fileobj() does have a config parameter, which is None by default:
(Pdb) l 531
526
527 subscribers = None
528 if Callback is not None:
529 subscribers = [ProgressCallbackInvoker(Callback)]
530
531 config = Config
532 if config is None:
533 config = TransferConfig()
534
535 with create_transfer_manager(self, config) as manager:
536 future = manager.upload(
This means that config becomes the default TransferConfig():

use_threads -- If True, threads will be used when performing S3 transfers. If False, no threads will be used in performing transfers: all logic will be run in the main thread.

max_concurrency -- The maximum number of threads that will be making requests to perform a transfer. If use_threads is set to False, the value provided is ignored as the transfer will only ever use the main thread.

And voilà, here they are:
(Pdb) unt 534
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(535)upload_fileobj()
-> with create_transfer_manager(self, config) as manager:
(Pdb) config
<boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>
(Pdb) config.use_threads
True
(Pdb) config.max_concurrency
10
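Incidentally, this already suggests the answer to the original question: pass an explicit TransferConfig. A minimal sketch, reusing the bucket and buf from above:

from boto3.s3.transfer import TransferConfig

# Per the docs quoted above, use_threads=False keeps all transfer logic
# in the calling (main) thread.
bucket.upload_fileobj(buf, '100mb', Config=TransferConfig(use_threads=False))

But let's keep digging to confirm what the default path actually does.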
Now we descend a level down the call stack to use a TransferManager (context manager). At this point, max_concurrency has been used as the argument to the similarly-named max_request_concurrency:
# https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/manager.py#L223
# The executor responsible for making S3 API transfer requests
self._request_executor = BoundedExecutor(
max_size=self._config.max_request_queue_size,
max_num_threads=self._config.max_request_concurrency,
tag_semaphores={
IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
self._config.max_in_memory_upload_chunks),
IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
self._config.max_in_memory_download_chunks)
},
executor_cls=executor_cls
)
In this boto3 version, at least, that class comes from the separate library s3transfer.
(Pdb) n
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(536)upload_fileobj()
-> future = manager.upload(
(Pdb) manager
<s3transfer.manager.TransferManager object at 0x7f178db437f0>
(Pdb) manager._config
<boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>
(Pdb) manager._config.use_threads
True
(Pdb) manager._config.max_concurrency
10
Next, let's step into manager.upload(). Here's the full body of that method:
(Pdb) l 290, 303
290 -> if extra_args is None:
291 extra_args = {}
292 if subscribers is None:
293 subscribers = []
294 self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
295 call_args = CallArgs(
296 fileobj=fileobj, bucket=bucket, key=key, extra_args=extra_args,
297 subscribers=subscribers
298 )
299 extra_main_kwargs = {}
300 if self._bandwidth_limiter:
301 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
302 return self._submit_transfer(
303 call_args, UploadSubmissionTask, extra_main_kwargs)
(Pdb) unt 301
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(302)upload()
-> return self._submit_transfer(
(Pdb) extra_main_kwargs
{}
(Pdb) UploadSubmissionTask
<class 's3transfer.upload.UploadSubmissionTask'>
(Pdb) call_args
<s3transfer.utils.CallArgs object at 0x7f178db5a5f8>
(Pdb) l 300, 5
300 if self._bandwidth_limiter:
301 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
302 -> return self._submit_transfer(
303 call_args, UploadSubmissionTask, extra_main_kwargs)
304
305 def download(self, bucket, key, fileobj, extra_args=None,
Ah, lovely--so we'll need to get down at least one level further to see the actual underlying upload.
(Pdb) s
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(303)upload()
-> call_args, UploadSubmissionTask, extra_main_kwargs)
(Pdb) s
--Call--
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(438)_submit_transfer()
-> def _submit_transfer(self, call_args, submission_task_cls,
(Pdb) s
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(440)_submit_transfer()
-> if not extra_main_kwargs:
(Pdb) l 440, 10
440 -> if not extra_main_kwargs:
441 extra_main_kwargs = {}
442
443 # Create a TransferFuture to return back to the user
444 transfer_future, components = self._get_future_with_components(
445 call_args)
446
447 # Add any provided done callbacks to the created transfer future
448 # to be invoked on the transfer future being complete.
449 for callback in get_callbacks(transfer_future, 'done'):
450 components['coordinator'].add_done_callback(callback)
Okay, so now we have a TransferFuture, defined in s3transfer/futures.py. There's no definitive proof yet that threads have been kicked off, but it sure sounds like it when futures become involved.
(Pdb) l
444 transfer_future, components = self._get_future_with_components(
445 call_args)
446
447 # Add any provided done callbacks to the created transfer future
448 # to be invoked on the transfer future being complete.
449 -> for callback in get_callbacks(transfer_future, 'done'):
450 components['coordinator'].add_done_callback(callback)
451
452 # Get the main kwargs needed to instantiate the submission task
453 main_kwargs = self._get_submission_task_main_kwargs(
454 transfer_future, extra_main_kwargs)
(Pdb) transfer_future
<s3transfer.futures.TransferFuture object at 0x7f178db5a780>
The final line below, from the TransferCoordinator class, seems important at first glance:
class TransferCoordinator(object):
"""A helper class for managing TransferFuture"""
def __init__(self, transfer_id=None):
self.transfer_id = transfer_id
self._status = 'not-started'
self._result = None
self._exception = None
self._associated_futures = set()
self._failure_cleanups = []
self._done_callbacks = []
self._done_event = threading.Event() # < ------ !!!!!!
You typically see threading.Event being used for one thread to signal an event status while other threads wait for that event to happen. TransferCoordinator is what is used by TransferFuture.result().
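A toy illustration of that signaling pattern, outside of boto3:

import threading
import time

done = threading.Event()

def worker():
    time.sleep(0.5)  # simulate work
    done.set()       # signal completion to any waiters

threading.Thread(target=worker).start()
done.wait()          # blocks the calling thread until worker() calls .set()
print("worker finished")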
Alright, circling back from the above, we're now at s3transfer.futures.BoundedExecutor and its max_num_threads attribute:
class BoundedExecutor(object):
EXECUTOR_CLS = futures.ThreadPoolExecutor
# ...
def __init__(self, max_size, max_num_threads, tag_semaphores=None,
executor_cls=None):
self._max_num_threads = max_num_threads
if executor_cls is None:
executor_cls = self.EXECUTOR_CLS
self._executor = executor_cls(max_workers=self._max_num_threads)
This is basically equivalent to:
from concurrent import futures
_executor = futures.ThreadPoolExecutor(max_workers=10)
But there is still one question remaining: is this a "fire-and-forget," or does the call actually wait on the threads to finish and return? It seems to be the latter: .result() calls self._done_event.wait(MAXINT).
# https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/futures.py#L249
def result(self):
self._done_event.wait(MAXINT)
# Once done waiting, raise an exception if present or return the
# final result.
if self._exception:
raise self._exception
return self._result
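The same waiting behavior is easy to reproduce with just the standard library (a toy sketch, not boto3 code): .submit() is where a fire-and-forget design would stop, while .result() turns it back into a blocking call.

from concurrent import futures
import time

def slow_upload():
    time.sleep(2)  # stand-in for a 100MB PUT

with futures.ThreadPoolExecutor(max_workers=10) as executor:
    future = executor.submit(slow_upload)  # returns immediately
    future.result()                        # blocks until slow_upload() finishes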
Finally, to re-run Victor Val's test, this seems to confirm the above:
>>> import boto3
>>> import time
>>> import io
>>>
>>> buf = io.BytesIO(open('100mb.txt', 'rb').read())
>>>
>>> bucket = boto3.resource('s3').Bucket('test-threads')
>>> start = time.time()
>>> print("starting to upload...")
starting to upload...
>>> bucket.upload_fileobj(buf, '100mb')
>>> print("finished uploading")
finished uploading
>>> end = time.time()
>>> print("time: {}".format(end-start))
time: 2.6030001640319824
(This execution time is probably shorter with this example run on a network-optimized instance. But 2.5 seconds is still a noticeably big chunk of time, and not at all indicative of the threads being kicked off and not waited for.)
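Conversely, if fire-and-forget is what you actually want, one option is to manage the thread yourself. A hypothetical wrapper (my own, not part of boto3), with the caveat that you then own the problem of knowing when, or whether, the upload finished:

import threading

def upload_fileobj_nowait(bucket, buf, key):
    # Kick off the (blocking) upload in our own thread and return at once.
    t = threading.Thread(target=bucket.upload_fileobj, args=(buf, key))
    t.start()
    return t  # the caller may .join() later, or not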
Lastly, here's an example of a Callback for .upload_fileobj(). It follows along with an example from the docs. First, a little helper to get a buffer's size efficiently:
def get_bufsize(buf, chunk=1024) -> int:
    start = buf.tell()
    try:
        size = 0
        while True:
            out = buf.read(chunk)
            if out:
                size += len(out)  # count bytes actually read; the last read may be short
            else:
                break
        return size
    finally:
        buf.seek(start)
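For seekable buffers there's a simpler alternative (my addition, assuming buf supports seek()): seek to the end and use the returned offset.

import os

def get_bufsize_seek(buf) -> int:
    start = buf.tell()
    try:
        return buf.seek(0, os.SEEK_END)  # seek() returns the new absolute position
    finally:
        buf.seek(start)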
The class itself:
import sys
import threading
import time

class ProgressPercentage(object):
    def __init__(self, filename, buf):
        self._filename = filename
        self._size = float(get_bufsize(buf))
        self._seen_so_far = 0
        self._lock = threading.Lock()
        self.start = None

    def __call__(self, bytes_amount):
        with self._lock:
            if not self.start:
                self.start = time.monotonic()
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100
            sys.stdout.write(
                "\r%s %s of %s (%.2f%% done, %.2fs elapsed)\n" % (
                    self._filename, self._seen_so_far, self._size,
                    percentage, time.monotonic() - self.start))
            # Use sys.stdout.flush() (and drop the "\n") to update on one line
            # sys.stdout.flush()
Example:
In [19]: import io
...:
...: from boto3.session import Session
...:
...: s3 = Session().resource("s3")
...: bucket = s3.Bucket("test-threads")
...: buf = io.BytesIO(open('100mb.txt', 'rb').read())
...:
...: bucket.upload_fileobj(buf, 'mykey', Callback=ProgressPercentage("mykey", buf))
mykey 262144 of 104857600.0 (0.25% done, 0.00s elapsed)
mykey 524288 of 104857600.0 (0.50% done, 0.00s elapsed)
mykey 786432 of 104857600.0 (0.75% done, 0.01s elapsed)
mykey 1048576 of 104857600.0 (1.00% done, 0.01s elapsed)
mykey 1310720 of 104857600.0 (1.25% done, 0.01s elapsed)
mykey 1572864 of 104857600.0 (1.50% done, 0.02s elapsed)
upload_fileobj() takes a Config parameter. This is a boto3.s3.transfer.TransferConfig object, which in turn has a parameter called use_threads (default True): if True, threads will be used when performing S3 transfers; if False, no threads will be used and all logic will be run in the main thread.
Hopefully this should work for you.
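For example, a minimal sketch (the bucket name is a placeholder):

import io
import boto3
from boto3.s3.transfer import TransferConfig

bucket = boto3.resource('s3').Bucket('my-bucket-name')
buf = io.BytesIO(b"test")

# With use_threads=False, the upload runs entirely in the calling thread.
bucket.upload_fileobj(buf, 'testobj', Config=TransferConfig(use_threads=False))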