Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do Kotlin coroutines work internally?

How does Kotlin implement coroutines internally?

Coroutines are said to be a "lighter version" of threads, and I understand that they use threads internally to execute coroutines.

What happens when I start a coroutine using any of the builder functions?

This is my understanding of running this code:

GlobalScope.launch {       <---- (A)     val y = loadData()     <---- (B)  // suspend fun loadData()      println(y)             <---- (C)     delay(1000)            <---- (D)     println("completed")   <---- (E) } 
  1. Kotlin has a pre-defined ThreadPool at the beginning.
  2. At (A), Kotlin starts executing the coroutine in the next available free thread (Say Thread01).
  3. At (B), Kotlin stops executing the current thread, and starts the suspending function loadData() in the next available free thread (Thread02).
  4. When (B) returns after execution, Kotlin continues the coroutine in the next available free thread (Thread03).
  5. (C) executes on Thread03.
  6. At (D), the Thread03 is stopped.
  7. After 1000ms, (E) is executed on the next free thread, say Thread01.

Am I understanding this correctly? Or are coroutines implemented in a different way?


Update on 2021: Here's an excellent article by Manuel Vivo that complements all the answers below.

like image 434
Vishnu Haridas Avatar asked Nov 28 '18 19:11

Vishnu Haridas


People also ask

How coroutine works under the hood?

Coroutines are like light-weight threads. They need less resources that regular threads, so you can create more of them. Coroutines use thread pool behind the scenes. Instead of blocking an entire thread, coroutine suspends.

Is coroutines run on main thread?

We can for example schedule coroutines on a Java Executor or on Android main looper. However, we can't schedule coroutines on just any thread, it has to cooperate.

Do Kotlin coroutines use threads?

A coroutine can start executing in one thread, suspend execution, and resume on a different thread. Coroutines are not managed by the operating system, but by the Kotlin Runtime. When you are sleeping a thread it is blocked for a particular period of time.

Are coroutines part of Kotlin?

Coroutines were added to Kotlin in version 1.3 and are based on established concepts from other languages. On Android, coroutines help to manage long-running tasks that might otherwise block the main thread and cause your app to become unresponsive.


2 Answers

Coroutines are a completely separate thing from any scheduling policy that you describe. A coroutine is basically a call chain of suspend funs. Suspension is totally under your control: you just have to call suspendCoroutine. You'll get a callback object so you can call its resume method and get back to where you suspended.

Here's some code where you can see that suspension is a very direct and trasparent mechanism, fully under your control:

import kotlin.coroutines.* import kotlinx.coroutines.*  var continuation: Continuation<String>? = null  fun main(args: Array<String>) {     val job = GlobalScope.launch(Dispatchers.Unconfined) {         while (true) {             println(suspendHere())         }     }     continuation!!.resume("Resumed first time")     continuation!!.resume("Resumed second time") }  suspend fun suspendHere() = suspendCancellableCoroutine<String> {     continuation = it } 

All the code above executes on the same, main thread. There is no multithreading at all going on.

The coroutine you launch suspends itself each time it calls suspendHere(). It writes the continuation callback to the continuation property, and then you explicitly use that continuation to resume the coroutine.

The code uses the Unconfined coroutine dispatcher which does no dispatching to threads at all, it just runs the coroutine code right there where you invoke continuation.resume().


With that in mind, let's revisit your diagram:

GlobalScope.launch {       <---- (A)     val y = loadData()     <---- (B)  // suspend fun loadData()      println(y)             <---- (C)     delay(1000)            <---- (D)     println("completed")   <---- (E) } 
  1. Kotlin has a pre-defined ThreadPool at the beginning.

It may or may not have a thread pool. A UI dispatcher works with a single thread.

The prerequisite for a thread to be the target of a coroutine dispatcher is that there is a concurrent queue associated with it and the thread runs a top-level loop that takes Runnable objects from this queue and executes them. A coroutine dispatcher simply puts the continuation on that queue.

  1. At (A), Kotlin starts executing the coroutine in the next available free thread (Say Thread01).

It can also be the same thread where you called launch.

  1. At (B), Kotlin stops executing the current thread, and starts the suspending function loadData() in the next available free thread (Thread02).

Kotlin has no need to stop any threads in order to suspend a coroutine. In fact, the main point of coroutines is that threads don't get started or stopped. The thread's top-level loop will go on and pick another runnable to run.

Furthermore, the mere fact that you're calling a suspend fun has no significance. The coroutine will only suspend itself when it explicitly calls suspendCoroutine. The function may also simply return without suspension.

But let's assume it did call suspendCoroutine. In that case the coroutine is no longer running on any thread. It is suspended and can't continue until some code, somewhere, calls continuation.resume(). That code could be running on any thread, any time in the future.

  1. When (B) returns after execution, Kotlin continues the coroutine in the next available free thread (Thread03).

B doesn't "return after execution", the coroutine resumes while still inside its body. It may suspend and resume any number of times before returning.

  1. (C) executes on Thread03.
  2. At (D), the Thread03 is stopped.
  3. After 1000ms, (E) is executed on the next free thread, say Thread01.

Again, no threads are being stopped. The coroutine gets suspended and a mechanism, usually specific to the dispatcher, is used to schedule its resumption after 1000 ms. At that point it will be added to the run queue associated with the dispatcher.


For specificity, let's see some examples of what kind of code it takes to dispatch a coroutine.

Swing UI dispatcher:

EventQueue.invokeLater { continuation.resume(value) } 

Android UI dispatcher:

mainHandler.post { continuation.resume(value) } 

ExecutorService dispatcher:

executor.submit { continuation.resume(value) }  
like image 60
Marko Topolnik Avatar answered Oct 26 '22 22:10

Marko Topolnik


Coroutines work by creating a switch over possible resume points:

class MyClass$Coroutine extends CoroutineImpl {     public Object doResume(Object o, Throwable t) {         switch(super.state) {         default:                 throw new IllegalStateException("call to \"resume\" before \"invoke\" with coroutine");         case 0:  {              // code before first suspension              state = 1; // or something else depending on your branching              break;         }         case 1: {             ...         }         }         return null;     } } 

The resulting code executing this coroutine is then creating that instance and calls the doResume() function everytime it needs to resume execution, how that is handled depends on the scheduler used for execution.

Here is an example compilation for a simple coroutine:

launch {     println("Before")     delay(1000)     println("After") } 

Which compiles to this bytecode

private kotlinx.coroutines.experimental.CoroutineScope p$;  public final java.lang.Object doResume(java.lang.Object, java.lang.Throwable); Code:    0: invokestatic  #18                 // Method kotlin/coroutines/experimental/intrinsics/IntrinsicsKt.getCOROUTINE_SUSPENDED:()Ljava/lang/Object;    3: astore        5    5: aload_0    6: getfield      #22                 // Field kotlin/coroutines/experimental/jvm/internal/CoroutineImpl.label:I    9: tableswitch   { // 0 to 1                  0: 32                  1: 77            default: 102       }   32: aload_2   33: dup   34: ifnull        38   37: athrow   38: pop   39: aload_0   40: getfield      #24                 // Field p$:Lkotlinx/coroutines/experimental/CoroutineScope;   43: astore_3   44: ldc           #26                 // String Before   46: astore        4   48: getstatic     #32                 // Field java/lang/System.out:Ljava/io/PrintStream;   51: aload         4   53: invokevirtual #38                 // Method java/io/PrintStream.println:(Ljava/lang/Object;)V   56: sipush        1000   59: aload_0   60: aload_0   61: iconst_1   62: putfield      #22                 // Field kotlin/coroutines/experimental/jvm/internal/CoroutineImpl.label:I   65: invokestatic  #44                 // Method kotlinx/coroutines/experimental/DelayKt.delay:(ILkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;   68: dup   69: aload         5   71: if_acmpne     85   74: aload         5   76: areturn   77: aload_2   78: dup   79: ifnull        83   82: athrow   83: pop   84: aload_1   85: pop   86: ldc           #46                 // String After   88: astore        4   90: getstatic     #32                 // Field java/lang/System.out:Ljava/io/PrintStream;   93: aload         4   95: invokevirtual #38                 // Method java/io/PrintStream.println:(Ljava/lang/Object;)V   98: getstatic     #52                 // Field kotlin/Unit.INSTANCE:Lkotlin/Unit;  101: areturn  102: new           #54                 // class java/lang/IllegalStateException  105: dup  106: ldc           #56                 // String call to \'resume\' before \'invoke\' with coroutine  108: invokespecial #60                 // Method java/lang/IllegalStateException."<init>":(Ljava/lang/String;)V  111: athrow 

I compiled this with kotlinc 1.2.41

From 32 to 76 is the code for printing Before and calling delay(1000) which suspends.

From 77 to 101 is the code for printing After.

From 102 to 111 is error handling for illegal resume states, as denoted by the default label in the switch table.

So as a summary, the coroutines in kotlin are simply state-machines that are controlled by some scheduler.

like image 33
Minn Avatar answered Oct 26 '22 22:10

Minn