
Concurrent S3 File Upload via Kotlin Coroutines

I need to upload many files to S3, and doing that sequentially would take hours. That's exactly what Kotlin's new coroutines excel at, so I wanted to give them a first try instead of fiddling around again with some thread-based executor service.

Here is my (simplified) code:

fun upload(superTiles: Map<Int, Map<Int, SuperTile>>) = runBlocking {
    val s3 = AmazonS3ClientBuilder.standard().withRegion("eu-west-1").build()
    for ((x, ys) in superTiles) {
        val jobs = mutableListOf<Deferred<Any>>()
        for ((y, superTile) in ys) {
            val job = async(CommonPool) {
                uploadTile(s3, x, y, superTile)
            }
            jobs.add(job)
        }
        jobs.map { it.await() }
    }
}

suspend fun uploadTile(s3: AmazonS3, x: Int, y: Int, superTile: SuperTile) {
    val json: String = "{}"
    val key = "$s3Prefix/x4/$z/$x/$y.json"
    s3.putObject(PutObjectRequest("my_bucket", key, ByteArrayInputStream(json.toByteArray()), metadata))
}

The problem: the code is still very slow, and logging reveals that requests are still executed sequentially: one job finishes before the next one is created. Only rarely (about 1 case in 10) do I see jobs running concurrently.

Why does the code not run much faster / concurrently? What can I do about it?

asked Jun 22 '17 16:06 by linqu


1 Answer

Kotlin coroutines excel when you work with an asynchronous API, but the AmazonS3.putObject API that you are using is an old-school blocking, synchronous API, so you get only as many concurrent uploads as there are threads in the CommonPool that you are using. There is no value in marking your uploadTile function with the suspend modifier, because it does not call any suspending functions in its body.
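To make the thread-count limit concrete, here is a minimal sketch that does not touch S3 at all. It assumes the kotlinx.coroutines library is on the classpath; `elapsedMillis` is a hypothetical helper, and `Thread.sleep` / `delay` merely stand in for a blocking and a suspending upload call. A two-thread pool plays the role of the shared pool.

```kotlin
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical helper: runs `tasks` coroutines on a pool of `threads` threads
// and returns the wall-clock time until all of them have finished.
fun elapsedMillis(tasks: Int, threads: Int, work: suspend () -> Unit): Long =
    Executors.newFixedThreadPool(threads).asCoroutineDispatcher().use { dispatcher ->
        measureTimeMillis {
            runBlocking {
                (1..tasks).map { async(dispatcher) { work() } }.awaitAll()
            }
        }
    }

fun main() {
    // Blocking work holds a pool thread for the whole call,
    // so at most 2 of the 8 tasks make progress at any moment.
    val blocking = elapsedMillis(tasks = 8, threads = 2) { Thread.sleep(200) }

    // Suspending work releases the thread while it waits,
    // so all 8 tasks can be in flight on the same 2 threads.
    val suspending = elapsedMillis(tasks = 8, threads = 2) { delay(200) }

    println("blocking: $blocking ms, suspending: $suspending ms")
}
```

On a typical machine the blocking variant should take roughly four times as long as the suspending one, since eight 200 ms tasks have to queue up behind two threads.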

The first step toward more throughput in your upload task is to start using an asynchronous API for it. I'd suggest looking at the Amazon S3 TransferManager for that purpose. See if that solves your problem first.

Kotlin coroutines are designed to help you combine your async APIs into easy-to-use logical workflows. For example, it is straightforward to adapt the asynchronous API of TransferManager for use with coroutines by writing the following extension function:

suspend fun Upload.await(): UploadResult = suspendCancellableCoroutine { cont ->
    addProgressListener {
        if (isDone) {
            // we know it should not actually wait when done
            try { cont.resume(waitForUploadResult()) }
            catch (e: Throwable) { cont.resumeWithException(e) }
        }
    }
    // abort the upload if the waiting coroutine is cancelled
    cont.invokeOnCancellation { abort() }
}
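The same bridging pattern can be tried without an AWS account. The sketch below applies it to the JDK's `CompletableFuture`; it assumes kotlinx.coroutines 1.x is on the classpath, and `awaitResult` is a hypothetical name chosen for illustration (the library ships its own `await` for futures).

```kotlin
import java.util.concurrent.CompletableFuture
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine

// Hypothetical extension: suspends until the future completes instead of blocking a thread.
suspend fun <T> CompletableFuture<T>.awaitResult(): T = suspendCancellableCoroutine { cont ->
    // The callback runs once, when the future completes normally or exceptionally.
    whenComplete { value, error ->
        if (error != null) cont.resumeWithException(error) else cont.resume(value)
    }
    // If the waiting coroutine is cancelled, cancel the underlying future as well.
    cont.invokeOnCancellation { cancel(true) }
}

fun main() = runBlocking {
    val future = CompletableFuture.supplyAsync { 21 * 2 }
    println(future.awaitResult()) // prints 42
}
```

The structure is the same as in Upload.await: register a completion callback that resumes the continuation, and wire cancellation back to the underlying operation.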

With this extension you can write fluent code against TransferManager, and you can rewrite your uploadTile function to use TransferManager instead of the blocking AmazonS3 interface:

suspend fun uploadTile(tm: TransferManager, x: Int, y: Int, superTile: SuperTile) {
    val json: String = "{}"
    val key = "$s3Prefix/x4/$z/$x/$y.json"
    tm.upload(PutObjectRequest("my_bucket", key, ByteArrayInputStream(json.toByteArray()), metadata))
        .await()
}

Notice how this new version of uploadTile uses the suspending function await that was defined above.
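Once uploadTile really suspends, the calling loop can start every upload first and wait for all of them afterwards. The following is only a sketch of that driver: `uploadAll` is a hypothetical name, the upload itself is passed in as a lambda so the example runs without S3, and `delay` stands in for the real suspending upload. It assumes kotlinx.coroutines 1.x.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical driver: starts one coroutine per tile, then waits for all of them.
// Returns the number of tiles that were uploaded.
suspend fun <T> uploadAll(
    tiles: Map<Int, Map<Int, T>>,
    uploadTile: suspend (x: Int, y: Int, tile: T) -> Unit
): Int = coroutineScope {
    tiles.flatMap { (x, ys) ->
        ys.map { (y, tile) -> async { uploadTile(x, y, tile) } }
    }.awaitAll().size
}

fun main() = runBlocking {
    val tiles = mapOf(0 to mapOf(0 to "a", 1 to "b"), 1 to mapOf(0 to "c"))
    // delay() is a stand-in for a suspending upload such as tm.upload(...).await()
    val count = uploadAll(tiles) { _, _, _ -> delay(100) }
    println("uploaded $count tiles") // prints "uploaded 3 tiles"
}
```

Because the uploads suspend rather than block, no dedicated thread pool is needed here. For very large tile sets it may still be worth capping the number of uploads in flight.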

answered Oct 05 '22 23:10 by Roman Elizarov