How does Kotlin implement coroutines internally?
Coroutines are said to be a "lighter version" of threads, and I understand that they use threads internally to execute coroutines.
What happens when I start a coroutine using any of the builder functions?
This is my understanding of running this code:
    GlobalScope.launch {         // <---- (A)
        val y = loadData()       // <---- (B)  suspend fun loadData()
        println(y)               // <---- (C)
        delay(1000)              // <---- (D)
        println("completed")     // <---- (E)
    }
- Kotlin has a pre-defined ThreadPool at the beginning.
- At (A), Kotlin starts executing the coroutine in the next available free thread (say Thread01).
- At (B), Kotlin stops executing the current thread, and starts the suspending function loadData() in the next available free thread (Thread02).
- When (B) returns after execution, Kotlin continues the coroutine in the next available free thread (Thread03).
- (C) executes on Thread03.
- At (D), the Thread03 is stopped.
- After 1000ms, (E) is executed on the next free thread, say Thread01.
Am I understanding this correctly? Or are coroutines implemented in a different way?
Update on 2021: Here's an excellent article by Manuel Vivo that complements all the answers below.
Coroutines are like lightweight threads. They need fewer resources than regular threads, so you can create many more of them. Coroutines often run on a thread pool behind the scenes. Instead of blocking an entire thread, a coroutine suspends.
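As a rough illustration of the "lightweight" claim, the sketch below parks 100,000 coroutines on a single thread. It uses only the low-level `kotlin.coroutines` package from the standard library, not kotlinx.coroutines, and the name `parkMany` is made up for this example. Each suspended coroutine here is just a continuation object sitting in a list.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical helper: starts `count` coroutines, lets each one suspend,
// then resumes them all. Everything happens on the calling thread.
fun parkMany(count: Int): Int {
    val parked = ArrayList<Continuation<Unit>>(count)
    var finished = 0
    repeat(count) {
        val body: suspend () -> Unit = {
            // Suspend by storing the continuation instead of resuming it.
            suspendCoroutine<Unit> { cont -> parked.add(cont) }
        }
        // The completion callback runs when the coroutine body returns.
        body.startCoroutine(Continuation(EmptyCoroutineContext) { finished++ })
    }
    check(finished == 0) // every coroutine is suspended, none has completed
    parked.forEach { it.resume(Unit) }
    return finished
}

fun main() {
    println(parkMany(100_000)) // prints 100000
}
```

Doing the same with 100,000 blocked threads would need a stack per thread, which is the practical difference the paragraph above refers to.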
We can, for example, schedule coroutines on a Java Executor or on the Android main looper. However, we can't schedule coroutines on just any thread: the thread has to cooperate.
A coroutine can start executing in one thread, suspend execution, and resume on a different thread. Coroutines are not managed by the operating system, but by the Kotlin runtime. When you put a thread to sleep, it is blocked for that period of time; a suspended coroutine, by contrast, does not hold on to a thread.
Coroutines became stable in Kotlin 1.3 (they were experimental before that) and are based on established concepts from other languages. On Android, coroutines help to manage long-running tasks that might otherwise block the main thread and cause your app to become unresponsive.
Coroutines are a completely separate thing from any scheduling policy that you describe. A coroutine is basically a call chain of suspend funs. Suspension is totally under your control: you just have to call suspendCoroutine. You'll get a callback object so you can call its resume method and get back to where you suspended.
Here's some code where you can see that suspension is a very direct and transparent mechanism, fully under your control:
    import kotlin.coroutines.*
    import kotlinx.coroutines.*

    var continuation: Continuation<String>? = null

    fun main(args: Array<String>) {
        val job = GlobalScope.launch(Dispatchers.Unconfined) {
            while (true) {
                println(suspendHere())
            }
        }
        continuation!!.resume("Resumed first time")
        continuation!!.resume("Resumed second time")
    }

    suspend fun suspendHere() = suspendCancellableCoroutine<String> {
        continuation = it
    }
All the code above executes on the same, main thread. There is no multithreading at all going on.
The coroutine you launch suspends itself each time it calls suspendHere(). It writes the continuation callback to the continuation property, and then you explicitly use that continuation to resume the coroutine.
The code uses the Unconfined coroutine dispatcher, which does no dispatching to threads at all; it just runs the coroutine code right there where you invoke continuation.resume().
With that in mind, let's revisit your diagram:
    GlobalScope.launch {         // <---- (A)
        val y = loadData()       // <---- (B)  suspend fun loadData()
        println(y)               // <---- (C)
        delay(1000)              // <---- (D)
        println("completed")     // <---- (E)
    }
- Kotlin has a pre-defined ThreadPool at the beginning.
It may or may not have a thread pool. A UI dispatcher works with a single thread.
The prerequisite for a thread to be the target of a coroutine dispatcher is that there is a concurrent queue associated with it and the thread runs a top-level loop that takes Runnable objects from this queue and executes them. A coroutine dispatcher simply puts the continuation on that queue.
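The queue-plus-loop arrangement just described can be sketched with the standard library alone. This is a minimal illustration, not how any real dispatcher is implemented: `QueueDispatcher` and `runLoopDemo` are hypothetical names, and the "top-level loop" is simply the calling thread draining a `LinkedBlockingQueue`.

```kotlin
import java.util.concurrent.LinkedBlockingQueue
import kotlin.concurrent.thread
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical dispatcher: every resumption is wrapped in a Runnable and
// put on the queue instead of being run by whoever called resume().
class QueueDispatcher(private val queue: LinkedBlockingQueue<Runnable>) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        Continuation(continuation.context) { result ->
            queue.put(Runnable { continuation.resumeWith(result) })
        }
}

fun runLoopDemo(): List<String> {
    val queue = LinkedBlockingQueue<Runnable>()
    val log = mutableListOf<String>()
    var finished = false
    val loopThread = Thread.currentThread()

    val body: suspend () -> Unit = {
        log.add("step 1 on loop thread: ${Thread.currentThread() === loopThread}")
        val value = suspendCoroutine<String> { cont ->
            // Another thread resumes the coroutine; that only enqueues work.
            thread { cont.resume("hello") }
        }
        log.add("step 2 got $value on loop thread: ${Thread.currentThread() === loopThread}")
    }
    body.startCoroutine(Continuation(QueueDispatcher(queue)) { finished = true })
    log.add("startCoroutine returned before the body ran")

    // The top-level loop: take Runnables from the queue and execute them.
    while (!finished) queue.take().run()
    return log
}

fun main() {
    runLoopDemo().forEach(::println)
}
```

Although `resume` is called from a different thread, both steps of the coroutine body run on the thread that owns the loop, because the dispatcher only ever enqueues the continuation.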
- At (A), Kotlin starts executing the coroutine in the next available free thread (say Thread01).
It can also be the same thread where you called launch.
- At (B), Kotlin stops executing the current thread, and starts the suspending function loadData() in the next available free thread (Thread02).
Kotlin has no need to stop any threads in order to suspend a coroutine. In fact, the main point of coroutines is that threads don't get started or stopped. The thread's top-level loop will go on and pick another runnable to run.
Furthermore, the mere fact that you're calling a suspend fun has no significance. The coroutine will only suspend itself when it explicitly calls suspendCoroutine. The function may also simply return without suspension.
But let's assume it did call suspendCoroutine. In that case the coroutine is no longer running on any thread. It is suspended and can't continue until some code, somewhere, calls continuation.resume(). That code could be running on any thread, any time in the future.
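To make the distinction concrete, here is a small sketch using only the standard library's `kotlin.coroutines` package. The functions `cached` and `parked` and the `saved` variable are invented for the example: one suspend function returns immediately, the other really suspends by keeping its continuation without resuming it.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

var saved: Continuation<Int>? = null

// Declared `suspend`, but never suspends: it just returns a value.
suspend fun cached(): Int = 1

// Really suspends: stores the continuation and does not resume it.
suspend fun parked(): Int = suspendCoroutine<Int> { cont -> saved = cont }

fun runSuspendDemo(): List<String> {
    val events = mutableListOf<String>()
    val body: suspend () -> Unit = {
        events.add("cached() returned ${cached()}")
        events.add("parked() returned ${parked()}")
    }
    // With no dispatcher in the context, the body starts right here.
    body.startCoroutine(Continuation(EmptyCoroutineContext) { events.add("coroutine finished") })
    events.add("startCoroutine returned")
    // "Some code, somewhere" resumes the coroutine; here it is this same thread.
    saved!!.resume(42)
    return events
}

fun main() {
    runSuspendDemo().forEach(::println)
}
```

The call to `cached()` never gives control back to the caller, while `parked()` does: `startCoroutine` returns in the middle of the body, and the rest only runs once `resume(42)` is called.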
- When (B) returns after execution, Kotlin continues the coroutine in the next available free thread (Thread03).
B doesn't "return after execution"; the coroutine resumes while still inside its body. It may suspend and resume any number of times before returning.
- (C) executes on Thread03.
- At (D), the Thread03 is stopped.
- After 1000ms, (E) is executed on the next free thread, say Thread01.
Again, no threads are being stopped. The coroutine gets suspended and a mechanism, usually specific to the dispatcher, is used to schedule its resumption after 1000 ms. At that point it will be added to the run queue associated with the dispatcher.
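A delay of this kind can be approximated in a few lines. The sketch below is an assumption-laden stand-in, not the kotlinx.coroutines implementation: `sketchDelay` and `runDelayDemo` are hypothetical names, and a `ScheduledExecutorService` plays the role of the scheduling mechanism. Because no dispatcher is installed, the coroutine continues on the timer thread; a real dispatcher would instead put the continuation on its own run queue at that point.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical stand-in for delay(): no thread sleeps while the coroutine
// waits. The timer merely remembers to call resume() later.
suspend fun sketchDelay(timer: ScheduledExecutorService, millis: Long): Unit =
    suspendCoroutine<Unit> { cont ->
        timer.schedule(Runnable { cont.resume(Unit) }, millis, TimeUnit.MILLISECONDS)
    }

fun runDelayDemo(): List<String> {
    val timer = Executors.newSingleThreadScheduledExecutor()
    val log = CopyOnWriteArrayList<String>()
    val done = CountDownLatch(1)
    val caller = Thread.currentThread()

    val body: suspend () -> Unit = {
        log.add("before delay, on caller thread: ${Thread.currentThread() === caller}")
        sketchDelay(timer, 200)
        log.add("after delay, on caller thread: ${Thread.currentThread() === caller}")
    }
    body.startCoroutine(Continuation(EmptyCoroutineContext) { done.countDown() })
    // startCoroutine has already returned: the caller was not blocked.
    log.add("caller thread is free while the coroutine is suspended")

    done.await()      // only so the demo can collect the full log
    timer.shutdown()
    return log.toList()
}

fun main() {
    runDelayDemo().forEach(::println)
}
```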
For specificity, let's see some examples of what kind of code it takes to dispatch a coroutine.
Swing UI dispatcher:
EventQueue.invokeLater { continuation.resume(value) }
Android UI dispatcher:
mainHandler.post { continuation.resume(value) }
ExecutorService dispatcher:
executor.submit { continuation.resume(value) }
Coroutines work by creating a switch over possible resume points:
    class MyClass$Coroutine extends CoroutineImpl {
        public Object doResume(Object o, Throwable t) {
            switch (super.state) {
                default:
                    throw new IllegalStateException("call to \"resume\" before \"invoke\" with coroutine");
                case 0: {
                    // code before first suspension
                    state = 1; // or something else depending on your branching
                    break;
                }
                case 1: {
                    ...
                }
            }
            return null;
        }
    }
The code that executes this coroutine creates an instance of that class and calls its doResume() function every time it needs to resume execution. How that is handled depends on the scheduler used for execution.
Here is an example compilation for a simple coroutine:
    launch {
        println("Before")
        delay(1000)
        println("After")
    }
This compiles to the following bytecode:
    private kotlinx.coroutines.experimental.CoroutineScope p$;

    public final java.lang.Object doResume(java.lang.Object, java.lang.Throwable);
      Code:
         0: invokestatic  #18 // Method kotlin/coroutines/experimental/intrinsics/IntrinsicsKt.getCOROUTINE_SUSPENDED:()Ljava/lang/Object;
         3: astore        5
         5: aload_0
         6: getfield      #22 // Field kotlin/coroutines/experimental/jvm/internal/CoroutineImpl.label:I
         9: tableswitch   { // 0 to 1
                       0: 32
                       1: 77
                 default: 102
            }
        32: aload_2
        33: dup
        34: ifnull        38
        37: athrow
        38: pop
        39: aload_0
        40: getfield      #24 // Field p$:Lkotlinx/coroutines/experimental/CoroutineScope;
        43: astore_3
        44: ldc           #26 // String Before
        46: astore        4
        48: getstatic     #32 // Field java/lang/System.out:Ljava/io/PrintStream;
        51: aload         4
        53: invokevirtual #38 // Method java/io/PrintStream.println:(Ljava/lang/Object;)V
        56: sipush        1000
        59: aload_0
        60: aload_0
        61: iconst_1
        62: putfield      #22 // Field kotlin/coroutines/experimental/jvm/internal/CoroutineImpl.label:I
        65: invokestatic  #44 // Method kotlinx/coroutines/experimental/DelayKt.delay:(ILkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
        68: dup
        69: aload         5
        71: if_acmpne     85
        74: aload         5
        76: areturn
        77: aload_2
        78: dup
        79: ifnull        83
        82: athrow
        83: pop
        84: aload_1
        85: pop
        86: ldc           #46 // String After
        88: astore        4
        90: getstatic     #32 // Field java/lang/System.out:Ljava/io/PrintStream;
        93: aload         4
        95: invokevirtual #38 // Method java/io/PrintStream.println:(Ljava/lang/Object;)V
        98: getstatic     #52 // Field kotlin/Unit.INSTANCE:Lkotlin/Unit;
       101: areturn
       102: new           #54 // class java/lang/IllegalStateException
       105: dup
       106: ldc           #56 // String call to \'resume\' before \'invoke\' with coroutine
       108: invokespecial #60 // Method java/lang/IllegalStateException."<init>":(Ljava/lang/String;)V
       111: athrow
I compiled this with kotlinc 1.2.41.
From 32 to 76 is the code for printing Before and calling delay(1000), which suspends.
From 77 to 101 is the code for printing After.
From 102 to 111 is error handling for illegal resume states, as denoted by the default label in the switch table.
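The same control flow can be written out by hand in Kotlin to make the state machine easier to see. This is an illustrative analogue, not compiler output: `BeforeAfterMachine` and `runMachineDemo` are made-up names, the output goes to a list instead of stdout, and the actual delay is left to whoever calls `doResume()` a second time.

```kotlin
// Hand-written analogue of the state machine for:
//     println("Before"); delay(1000); println("After")
class BeforeAfterMachine(private val out: MutableList<String>) {
    private var label = 0 // which resume point to jump to next

    // Returns true if the body suspended, false if it ran to completion.
    fun doResume(): Boolean = when (label) {
        0 -> {
            out.add("Before")
            label = 1 // remember where to continue after the suspension
            true      // "suspend": hand control back to the caller
        }
        1 -> {
            out.add("After")
            label = 2
            false
        }
        else -> throw IllegalStateException("resumed after completion")
    }
}

// Plays the role of the scheduler: it decides when to call doResume() again.
fun runMachineDemo(): List<String> {
    val out = mutableListOf<String>()
    val machine = BeforeAfterMachine(out)
    val suspended = machine.doResume() // runs up to the suspension point
    out.add("suspended=$suspended")
    machine.doResume()                 // continues from label 1
    return out
}

fun main() {
    println(runMachineDemo()) // prints [Before, suspended=true, After]
}
```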
So, in summary, coroutines in Kotlin are simply state machines that are controlled by some scheduler.