I need to upload many files to S3, and doing it sequentially would take hours. That's exactly the kind of job Kotlin's new coroutines excel at, so I wanted to give them a first try instead of fiddling around again with a thread-based executor service.
Here is my (simplified) code:
```kotlin
fun upload(superTiles: Map<Int, Map<Int, SuperTile>>) = runBlocking {
    val s3 = AmazonS3ClientBuilder.standard().withRegion("eu-west-1").build()
    for ((x, ys) in superTiles) {
        val jobs = mutableListOf<Deferred<Any>>()
        for ((y, superTile) in ys) {
            val job = async(CommonPool) {
                uploadTile(s3, x, y, superTile)
            }
            jobs.add(job)
        }
        jobs.map { it.await() }
    }
}

suspend fun uploadTile(s3: AmazonS3, x: Int, y: Int, superTile: SuperTile) {
    val json: String = "{}"
    val key = "$s3Prefix/x4/$z/$x/$y.json"
    s3.putObject(PutObjectRequest("my_bucket", key, ByteArrayInputStream(json.toByteArray()), metadata))
}
```
The problem: the code is still very slow, and logging reveals that the requests are still executed sequentially: a job finishes before the next one is created. Only rarely (about 1 in 10 cases) do I see jobs running concurrently.
Why does the code not run much faster / concurrently? What can I do about it?
Kotlin coroutines excel when you work with asynchronous APIs, while the `AmazonS3.putObject` API that you are using is an old-school blocking, synchronous API. Each call holds on to its thread until the upload finishes, so you get only as many concurrent uploads as there are threads in the `CommonPool` that you are using, and that pool is small (its size is tied to the number of CPU cores). There is no value in marking your `uploadTile` function with the `suspend` modifier, because it does not call any suspending functions in its body.
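To see why the pool size caps the number of concurrent uploads, here is a minimal self-contained sketch that needs only kotlinx.coroutines on the classpath. `blockingUpload` and `timeBlockingUploads` are hypothetical names: `blockingUpload` just sleeps, standing in for a blocking call such as `putObject`, and a fixed pool of `threads` threads stands in for `CommonPool`.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for a blocking call such as AmazonS3.putObject:
// it holds on to its thread for the whole duration of the "upload".
fun blockingUpload(millis: Long) = Thread.sleep(millis)

// Runs `tasks` blocking uploads as coroutines on a fixed pool of `threads`
// threads (standing in for CommonPool) and returns the elapsed wall-clock ms.
fun timeBlockingUploads(tasks: Int, threads: Int, millis: Long = 200): Long {
    val pool = Executors.newFixedThreadPool(threads).asCoroutineDispatcher()
    try {
        return measureTimeMillis {
            runBlocking {
                // async does not make the call non-blocking: each coroutine
                // occupies a pool thread until Thread.sleep returns
                List(tasks) { async(pool) { blockingUpload(millis) } }.awaitAll()
            }
        }
    } finally {
        pool.close() // also shuts down the underlying executor
    }
}
```

With 8 tasks on 2 threads the sleeps can only run two at a time, so `timeBlockingUploads(8, 2)` should take at least about 800 ms, while `timeBlockingUploads(8, 8)` should finish in roughly 200 ms plus scheduling overhead. Wrapping a blocking call in `async` does not make it non-blocking; only more threads or a truly asynchronous API add concurrency.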
The first step in getting more throughput in your upload task is to start using an asynchronous API. I'd suggest looking at Amazon S3 `TransferManager` for that purpose. See if that solves your problem first.
Kotlin coroutines are designed to help you combine your async APIs into easy-to-use logical workflows. For example, it is straightforward to adapt the asynchronous API of `TransferManager` for use with coroutines by writing the following extension function:
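```kotlin
import com.amazonaws.services.s3.transfer.Upload
import com.amazonaws.services.s3.transfer.model.UploadResult
import kotlinx.coroutines.suspendCancellableCoroutine
import java.util.concurrent.atomic.AtomicBoolean

suspend fun Upload.await(): UploadResult = suspendCancellableCoroutine { cont ->
    val resumed = AtomicBoolean(false)
    fun resumeIfDone() {
        // isDone may be observed by several progress events; resume only once
        if (isDone && resumed.compareAndSet(false, true)) {
            // we know it should not actually wait when done
            cont.resumeWith(runCatching { waitForUploadResult() })
        }
    }
    cont.invokeOnCancellation { abort() } // cancelling the coroutine aborts the upload
    addProgressListener { resumeIfDone() }
    resumeIfDone() // the upload may already be done before the listener was added
}
```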
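The same technique can be tried without AWS. The sketch below applies it to `java.util.concurrent.CompletableFuture`; `awaitResult` is a hypothetical name chosen for illustration (kotlinx.coroutines already ships a ready-made `await()` for `CompletableFuture` in its `kotlinx.coroutines.future` package, which is what you would use in real code).

```kotlin
import kotlinx.coroutines.suspendCancellableCoroutine
import java.util.concurrent.CompletableFuture
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Same pattern as Upload.await(): register a completion callback and resume
// the suspended coroutine from it. awaitResult is a hypothetical name.
suspend fun <T> CompletableFuture<T>.awaitResult(): T = suspendCancellableCoroutine { cont ->
    // Counterpart of abort(): cancelling the coroutine cancels the future
    cont.invokeOnCancellation { cancel(false) }
    // whenComplete fires once the future is done (immediately if it already is)
    whenComplete { value, error ->
        if (error == null) cont.resume(value) else cont.resumeWithException(error)
    }
}
```

Inside a coroutine, `val value = future.awaitResult()` suspends until the future completes without blocking the calling thread, and rethrows the future's exception if it failed.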
The `Upload.await()` extension enables you to write very fluent code that works with `TransferManager`. You can rewrite your `uploadTile` function to work with `TransferManager` instead of the blocking `AmazonS3` interface:
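```kotlin
import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.TransferManager
import java.io.ByteArrayInputStream

// s3Prefix, z and metadata come from the surrounding (simplified) code
suspend fun uploadTile(tm: TransferManager, x: Int, y: Int, superTile: SuperTile) {
    val json: String = "{}"
    val key = "$s3Prefix/x4/$z/$x/$y.json"
    tm.upload(PutObjectRequest("my_bucket", key, ByteArrayInputStream(json.toByteArray()), metadata))
        .await()
}
```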
Notice how this new version of `uploadTile` uses the suspending function `await` that was defined above.
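Once `uploadTile` really suspends, the coroutines no longer need a thread each, so all uploads can be started at once. The sketch below is self-contained and uses hypothetical names: `fakeUploadTile` replaces the real upload with `delay`, which suspends the way `Upload.await()` does, and `uploadAll` takes the tiles as a map from `x` to a list of `y` values.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical suspending upload: delay() suspends without blocking a thread,
// much like Upload.await() suspends while TransferManager does the I/O.
suspend fun fakeUploadTile(x: Int, y: Int): String {
    delay(200)
    return "x4/$x/$y.json"
}

// Starts one coroutine per tile, then waits for all of them. Everything runs
// on the single runBlocking thread, yet the uploads overlap because they suspend.
fun uploadAll(tiles: Map<Int, List<Int>>): List<String> = runBlocking {
    tiles.flatMap { (x, ys) -> ys.map { y -> async { fakeUploadTile(x, y) } } }
        .awaitAll()
}
```

Unlike the loop in the question, which awaits each `x` row before starting the next one, this starts every tile's coroutine before awaiting any of them. 100 tiles at 200 ms each should complete in roughly 200 ms on the single `runBlocking` thread rather than about 20 seconds.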