I am studying continuation-passing style (CPS) and I am wondering how it works.
Object createPost(
    Token token,
    Item item,
    Continuation<Post> cont) {...}
interface Continuation<in T> {
val context: CoroutineContext
fun resume(value: T)
fun resumeWithException(exception: Throwable)
}
People say CPS is just callbacks and nothing more than that.
But what does <in T> do in the Continuation interface?
End-user perspective
For the end user the situation is relatively simple: a continuation represents an execution flow that was suspended. It lets us resume the execution by invoking resume() or resumeWithException().
For example, assume we want to suspend for a second and then resume execution. We ask the coroutine machinery to suspend, it provides a continuation object, we store it and at a later time we invoke resume() on it. The continuation object "knows" how to resume the execution:
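import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

suspend fun foo() {
    println("foo:1")
    // Suspend here; the lambda receives the continuation to resume later
    val result = suspendCoroutine<String> { cont ->
        thread {
            Thread.sleep(1000)
            cont.resume("OK")
        }
    }
    println("foo:2:$result")
}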
suspendCoroutine() is one of the possible ways to suspend and acquire a continuation to resume later. thread() and Thread.sleep() are just for demo purposes - usually, we should use delay() instead.
Very often we suspend to acquire some kind of data. This is why continuations support resuming with a result value. In the above example we can see that the result of suspendCoroutine() is stored as result and we resume the continuation by passing "OK". This way, after resuming, result holds "OK". That explains <in T>: the continuation only consumes values of type T.
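To make the two ways of resuming concrete, here is a minimal sketch that uses only the Kotlin standard library. The names fetch() and runNow() are hypothetical helpers invented for this illustration, and runNow() only works because fetch() resumes its continuation immediately. The last lines of main() also show what the `in` variance permits.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical helper: suspends, then is resumed right away with a value or an exception.
suspend fun fetch(fail: Boolean): String = suspendCoroutine { cont ->
    if (fail) cont.resumeWithException(IllegalStateException("boom"))
    else cont.resume("OK")
}

// Hypothetical helper: starts a suspending block and returns its Result.
// It relies on the block finishing before startCoroutine() returns, which holds
// here only because fetch() resumes its continuation synchronously.
fun <T> runNow(block: suspend () -> T): Result<T> {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return outcome ?: error("block did not complete synchronously")
}

fun main() {
    // resume(value): the value becomes the result of the suspension point.
    println(runNow { fetch(fail = false) }.getOrNull())               // OK
    // resumeWithException(e): the exception is thrown at the suspension point.
    println(runNow { fetch(fail = true) }.exceptionOrNull()?.message) // boom

    // `in T`: a continuation that accepts Any? can stand in for one that accepts String.
    val sink: Continuation<Any?> =
        Continuation(EmptyCoroutineContext) { println("got ${it.getOrNull()}") }
    val stringSink: Continuation<String> = sink
    stringSink.resume("hello")                                        // got hello
}
```

Because Continuation only consumes values of type T, assigning a Continuation<Any?> to a Continuation<String> is type-safe, and that is what the `in` modifier declares.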
Internals
This is much more complicated. Kotlin is executed in runtimes that don't support coroutines or suspending. For example, the JVM can't wait inside a function without blocking a thread. This is simply not possible (I intentionally ignore Project Loom here). To make it possible, the Kotlin compiler has to manipulate the bytecode, and continuations play an important part in this process.
As you noticed, every suspend function receives an additional parameter of type Continuation. This object is used to control the process of resuming, it helps with returning to the function's caller, and it holds the current coroutine context. Additionally, suspend functions return Any/Object so that they can signal their state to the caller.
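The claim that the hidden continuation carries the coroutine context can be observed with the standard library alone. In this sketch, Tag, currentTag() and tagSeenBy() are hypothetical names made up for the demonstration. The suspend function takes no explicit parameter, yet it can read a context element that was attached to the completion continuation.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical context element, used only to make the context visible.
class Tag(val name: String) : AbstractCoroutineContextElement(Tag) {
    companion object Key : CoroutineContext.Key<Tag>
}

// No explicit parameter: the context is read from the hidden continuation.
suspend fun currentTag(): String? = coroutineContext[Tag]?.name

// Hypothetical helper: starts currentTag() with a completion continuation
// whose context contains Tag(tag), and returns what the function saw.
fun tagSeenBy(tag: String): String? {
    var seen: String? = null
    val block: suspend () -> String? = { currentTag() }
    block.startCoroutine(Continuation(Tag(tag)) { seen = it.getOrNull() })
    return seen
}

fun main() {
    println(tagSeenBy("demo")) // demo
}
```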
Assume we have another function calling the first one:
suspend fun bar() {
println("bar:1")
foo()
println("bar:2")
}
Then we invoke bar(). The bytecode of both foo() and bar() is much more complicated than you would expect from looking at the above source code. This is what happens:
1. bar() is invoked with a continuation of its caller (let's ignore for now what that means).
2. bar() checks if it "owns" the passed continuation. It does not, so it assumes this is a continuation of its caller and that this is the initial execution of bar().
3. bar() creates its own continuation object and stores the caller's continuation inside it.
4. bar() starts executing as normal and gets to the foo() point.
5. bar() invokes foo(), passing its continuation.
6. foo() checks if it owns the passed continuation. It doesn't, the continuation is owned by bar(), so foo() creates its own continuation, stores bar()'s continuation in it and starts a normal execution.
7. Execution reaches suspendCoroutine() and the local state is stored inside foo()'s continuation.
8. The continuation of foo() is provided to the end user inside the lambda passed to suspendCoroutine().
9. foo() wants to suspend its execution, so it... returns... Yes, as said earlier, waiting without blocking the thread is not possible, so the only way to free the thread is by returning from the function.
10. foo() returns with a special value that says: "execution was suspended".
11. bar() reads this special value and also suspends, so it also returns immediately.
12. Later, we invoke cont.resume().
13. The continuation of foo() knows how to resume the execution from the suspendCoroutine() point.
14. The continuation invokes the foo() function, passing itself as a parameter.
15. foo() checks if it owns the passed continuation - this time it does, so it assumes this is not an initial call to foo(), but a request to resume execution. It reads the stored state from the continuation, loads the local variables and jumps to the proper code offset.
16. Execution eventually needs to return from foo() to bar().
17. foo() knows that this time it was not invoked by bar(), so simply returning won't work. But it still keeps a continuation of its caller, that is, bar() suspended at exactly the point where foo() needs to return.
18. foo() returns with a magic value that says: "resume the continuation of my caller".
19. The continuation of bar() is resumed from the point where it executed foo().
As you can see, this is pretty complicated.
Normally, users of coroutines should not need to understand how they work internally.
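The steps above can be observed from the outside with a small experiment. This sketch reuses the foo() and bar() shapes from the answer, but records events in a list instead of printing, and keeps the continuation in a global variable so that it can be resumed by hand. The names events, saved and runDemo() are hypothetical, and only standard-library calls are used.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

val events = mutableListOf<String>()
var saved: Continuation<String>? = null   // the continuation handed out by suspendCoroutine()

suspend fun foo() {
    events.add("foo:1")
    // Suspend and keep the continuation so that it can be resumed later.
    val result = suspendCoroutine<String> { cont -> saved = cont }
    events.add("foo:2:$result")
}

suspend fun bar() {
    events.add("bar:1")
    foo()
    events.add("bar:2")
}

fun runDemo(): List<String> {
    events.clear()
    val block: suspend () -> Unit = { bar() }
    block.startCoroutine(Continuation(EmptyCoroutineContext) { events.add("completed") })
    // Both foo() and bar() have returned by now: the call stack has unwound.
    events.add("startCoroutine returned")
    // Re-enters foo() after the suspension point, then continues in bar().
    saved!!.resume("OK")
    return events.toList()
}

fun main() {
    println(runDemo())
    // [bar:1, foo:1, startCoroutine returned, foo:2:OK, bar:2, completed]
}
```

The "startCoroutine returned" entry appears before "foo:2:OK", which shows that suspension really is implemented by returning from the functions, and that resume() re-enters them later.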
Additional important notes:
If foo() did not suspend, it would return normally to bar() and bar() would continue execution as usual. This decreases the overhead of the whole process when suspending is not needed.
[...] CoroutineContext, so also inside the continuation.
The continuation is an object that stores the state of a coroutine. It must also store the local variables and the place where the coroutine was suspended. We can use this object to resume the coroutine with resume(), resumeWith() or resumeWithException().
suspendCancellableCoroutine is a function from the kotlinx.coroutines library. We could instead use the suspendCoroutine function from the Kotlin standard library, which behaves the same way and also provides a continuation object. I will use suspendCancellableCoroutine later: it suspends the execution of the coroutine and also makes the suspension cancellable.
// code 1
suspend fun getEmployee() {
println("Start main()")
var inc: Int = 0
val id = getID()
inc++
println("End main() $inc $id")
}
In the code above the suspension point is getID(). The continuation stores the local variables of getEmployee() that are used after the suspension point. Here that is inc, because it is read after getID() returns.
There are a few ways in which suspending functions could have been implemented, but the Kotlin team decided on an option called continuation-passing style. This means that continuations are passed from function to function as arguments. By convention, a continuation takes the last parameter position. In our code it looks like this:
suspend fun getStudent(rollno: Int): Student?
suspend fun setStudentRollno(name: String): Unit
suspend fun showData(): Unit
// under hood
fun getStudent(rollno: Int, continuation: Continuation<*>): Any?
fun setStudentRollno(name: String, continuation: Continuation<*>): Any
fun showData(continuation: Continuation<*>): Any
You might have also noticed that the result type under the hood is different from the originally declared one. It has changed to Any or Any?. Why so? The reason is that a suspending function might be suspended, and so it might not return a declared type. In such a case, it returns a special COROUTINE_SUSPENDED marker, which we will later see in practice. For now, just notice that since getStudent might return Student? or COROUTINE_SUSPENDED (which is of type Any), its result type must be the closest supertype of Student? and Any, so it is Any?. We will discuss later in more detail.
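The "declared type or marker" return value can be tried out directly with the standard library intrinsic suspendCoroutineUninterceptedOrReturn, whose block returns Any? for exactly this reason. This is only a sketch: load(), pending and runLoad() are hypothetical names, and application code would normally use suspendCoroutine instead of the intrinsic.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine

var pending: Continuation<String>? = null

// Hypothetical function: returns a cached value directly, or suspends until resumed.
suspend fun load(cached: String?): String =
    suspendCoroutineUninterceptedOrReturn { cont ->
        if (cached != null) {
            cached                   // the declared type: no suspension happens
        } else {
            pending = cont
            COROUTINE_SUSPENDED      // the marker: the real value arrives via resume()
        }
    }

// Hypothetical driver: records whether the coroutine finished before or after
// startCoroutine() returned.
fun runLoad(cached: String?): List<String> {
    val seen = mutableListOf<String>()
    val block: suspend () -> String = { load(cached) }
    block.startCoroutine(Continuation(EmptyCoroutineContext) { seen.add("done:" + it.getOrNull()) })
    seen.add("returned")
    pending?.resume("late")          // only set when load() actually suspended
    pending = null
    return seen
}

fun main() {
    println(runLoad("hit"))  // [done:hit, returned]
    println(runLoad(null))   // [returned, done:late]
}
```

In the first call the block returns a String, so the coroutine completes before startCoroutine() returns. In the second call it returns COROUTINE_SUSPENDED, and the result is delivered only when the stored continuation is resumed.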
// code 1 under hood look like
fun getEmployee(continuation: Continuation<*>): Any
fun getID(continuation: Continuation<*>): Any
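On Kotlin/JVM, the transformed signature can be inspected through Java reflection. The following sketch assumes a JVM target. StudentRepo and compiledSignature() are hypothetical names, and the return type is String? rather than Student? only to keep the example self-contained.

```kotlin
import kotlin.coroutines.Continuation

// Hypothetical class mirroring the getStudent signature from the text.
class StudentRepo {
    suspend fun getStudent(rollno: Int): String? = null
}

// Returns the JVM parameter types and return type of the compiled getStudent method.
fun compiledSignature(): Pair<List<Class<*>>, Class<*>> {
    val method = StudentRepo::class.java.declaredMethods.single { it.name == "getStudent" }
    return method.parameterTypes.toList() to method.returnType
}

fun main() {
    val (params, ret) = compiledSignature()
    println(params.map { it.simpleName }) // [int, Continuation]
    println(ret.simpleName)               // Object
}
```

The extra Continuation parameter is in the last position and the declared return type has become Object, which is how Any? appears on the JVM.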
Next, getEmployee() needs its own continuation in order to remember its state. Let's name it GetEmployeeContinuation. When getEmployee() is called, it first checks whether the continuation passed as a parameter is its own (GetEmployeeContinuation) or its caller's (continuation). If it is the caller's continuation, this is the first call and execution starts at the beginning of the body. If it is its own continuation, this is a request to resume from the suspension point (getID()). We will see later who makes that request.
At the beginning of its body, getEmployee() wraps the continuation of its caller (the parameter) in its own continuation (GetEmployeeContinuation):
val currentContinuation = GetEmployeeContinuation(continuation)
This should be done only if the continuation isn't wrapped already. If it is, this call is part of the resume process and the continuation should be kept unchanged (we will see how this is done in the code):
val currentContinuation =
    if (continuation is GetEmployeeContinuation) continuation
    else GetEmployeeContinuation(continuation)
Now let's look at getID():
suspend fun getID(): Int {
    var text = "hello world"
    println("before")
    // The continuation is resumed with Unit, so the type argument must be Unit
    suspendCancellableCoroutine<Unit> { getIDContinuation ->
        thread {
            Thread.sleep(1000)
            getIDContinuation.resumeWith(Result.success(Unit))
        }
    }
    text += "hh"
    println("after. $text")
    return 1
}
The basic concept behind resume() and resumeWithException():
inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
If the result is successful, the coroutine resumes and the value becomes the return value of the suspension point. Otherwise the exception is rethrown right after the suspension point.
The getEmployee() function can be started from two places: either from the beginning (in the case of a first call) or from the point after the suspension (in the case of resuming from a continuation). To identify the current state, we use a field called label. At the start it is 0, so the function starts from the beginning. It is set to the next state before each suspension point, so that after a resume we start just after that suspension point.
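The label mechanism can be shown in isolation with a hand-written state machine that has a single suspension point. This is a deliberately simplified sketch and not what the compiler emits: StepState, step(), parked and runSteps() are hypothetical names, and error handling is left out.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.resume

val trace = mutableListOf<String>()
var parked: Continuation<Int>? = null   // stands in for whoever resumes us later

// Hypothetical state holder: the label plus the one local that outlives the suspension.
class StepState(val completion: Continuation<Unit>) : Continuation<Int> {
    override val context: CoroutineContext get() = completion.context
    var label = 0
    var counter = 0
    var input: Result<Int>? = null

    override fun resumeWith(result: Result<Int>) {
        input = result
        // Re-enter the function; if it finished, resume the caller.
        if (step(this) != COROUTINE_SUSPENDED) completion.resume(Unit)
    }
}

@Suppress("UNCHECKED_CAST")
fun step(cont: Continuation<*>): Any {
    // Own continuation means "resume"; anything else means "first call".
    val state = cont as? StepState ?: StepState(cont as Continuation<Unit>)
    return when (state.label) {
        0 -> {
            trace.add("start")
            state.counter = 41
            state.label = 1          // where to continue after resuming
            parked = state
            COROUTINE_SUSPENDED
        }
        1 -> {
            state.counter += state.input!!.getOrThrow()
            trace.add("end ${state.counter}")
            Unit
        }
        else -> error("Impossible")
    }
}

fun runSteps(): List<String> {
    trace.clear()
    val first = step(Continuation<Unit>(EmptyCoroutineContext) { trace.add("completed") })
    trace.add("suspended=${first === COROUTINE_SUSPENDED}")
    parked!!.resume(1)               // second entry into step(), now with label == 1
    return trace.toList()
}

fun main() {
    println(runSteps()) // [start, suspended=true, end 42, completed]
}
```

The function body is entered twice. The label decides which part runs, and counter survives between the two entries only because it lives in the continuation object rather than on the stack.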
The actual code that continuations and suspending functions are compiled to is more complicated. What follows is simplified pseudocode.
getEmployee() : pseudo code
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED

class GetEmployeeContinuation(val cont: Continuation<Unit>) : Continuation<Int> {
    override val context: CoroutineContext
        get() = cont.context
    var label: Int = 0               // tracks the current state
    var inc: Int = 0                 // local variable needed after the suspension point
    var result: Result<Any>? = null  // result delivered by the suspension point

    override fun resumeWith(result: Result<Int>) {
        this.result = result
        val res: Result<Unit> =
            try {
                // Request to resume: re-enter getEmployee() with its own continuation
                val r = getEmployee(this)
                if (r == COROUTINE_SUSPENDED) {
                    return
                }
                Result.success(r as Unit)
            } catch (ex: Throwable) {
                Result.failure(ex)
            }
        // getEmployee() has finished, so resume its caller
        cont.resumeWith(res)
    }
}

@Suppress("UNCHECKED_CAST")
fun getEmployee(cont: Continuation<*>): Any {
    val currentContinuation =
        if (cont is GetEmployeeContinuation) cont
        else GetEmployeeContinuation(cont as Continuation<Unit>)
    var result: Result<Any>? = currentContinuation.result
    if (currentContinuation.label == 0) {
        println("Start main()")
        currentContinuation.inc = 0
        // Set the state so that execution can resume where we left off
        currentContinuation.label = 1
        val res = getID(currentContinuation)
        if (res == COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
        // getID() did not suspend, so continue directly with its value
        result = Result.success(res as Int)
    }
    if (currentContinuation.label == 1) {
        val id = result!!.getOrThrow() as Int
        currentContinuation.inc = currentContinuation.inc + 1
        println("End main() ${currentContinuation.inc} $id")
        return Unit
    }
    error("Impossible")
}
getID() : pseudo code
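import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED

@Suppress("UNCHECKED_CAST")
fun getID(cont: Continuation<*>): Any {
    val currentContinuation =
        if (cont is GetIDContinuation) cont
        else GetIDContinuation(cont as Continuation<Int>)
    if (currentContinuation.label == 0) {
        println("before")
        currentContinuation.text = "hello world"
        currentContinuation.label = 1
        // Stand-in for suspendCancellableCoroutine: resume from another
        // thread after one second, and return the marker right away
        thread {
            Thread.sleep(1000)
            currentContinuation.resumeWith(Result.success(Unit))
        }
        return COROUTINE_SUSPENDED
    }
    if (currentContinuation.label == 1) {
        currentContinuation.text += "hh"
        println("after ${currentContinuation.text}")
        return 1
    }
    error("Impossible")
}

class GetIDContinuation(val cont: Continuation<Int>) : Continuation<Unit> {
    override val context: CoroutineContext
        get() = cont.context
    var text: String? = null         // local variable needed after the suspension point
    var result: Result<Any>? = null
    var label: Int = 0               // tracks the current state

    override fun resumeWith(result: Result<Unit>) {
        this.result = result
        val res: Result<Int> =
            try {
                // Request to resume: re-enter getID() with its own continuation
                val r = getID(this)
                if (r == COROUTINE_SUSPENDED) return
                Result.success(r as Int)
            } catch (ex: Throwable) {
                Result.failure(ex)
            }
        // getID() has finished, so resume its caller with the returned Int
        cont.resumeWith(res)
    }
}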
Let's walk through one scenario.
In the code above, getEmployee() is called first. It creates its own continuation object (GetEmployeeContinuation) and starts executing from the beginning. Before calling getID(), it stores its local variable in GetEmployeeContinuation and sets label = 1, so that execution can later resume where it was suspended. It then calls getID(), passing its own continuation. getID() in turn creates its own continuation object (GetIDContinuation) and starts executing from the beginning until it reaches its suspension point (the suspendCancellableCoroutine call that contains Thread.sleep()). Before suspending, getID() also sets its label to 1 and then returns COROUTINE_SUSPENDED. Back in getEmployee(), res = getID(currentContinuation) is therefore COROUTINE_SUSPENDED, so getEmployee() returns COROUTINE_SUSPENDED as well. The whole call stack folds this way: every function returns, so the thread is free to do something else.
After 1 second this line runs: getIDContinuation.resumeWith(Result.success(Unit)). It invokes resumeWith() of GetIDContinuation with Unit as the result, which is why GetIDContinuation implements Continuation<Unit>.
In resumeWith() of GetEmployeeContinuation:
GetEmployeeContinuation implements Continuation<Int>, which means it expects a value of type Int when it is resumed. When a continuation resumes a coroutine, its resumeWith() method is called with a Result parameter, and the continuation stores that parameter in its result field. In this case the value is an Int, and it is stored in a field declared as Result<Any>?, so it is cast back to Int when it is read.
Because getIDContinuation.label was set to 1 before suspending, execution continues from where it stopped: resumeWith() of GetIDContinuation calls getID() again, passing its own continuation object (GetIDContinuation), which signals a resume. getID() then runs to completion and returns 1, so r is 1 and res is Result.success(1). Next, this line runs: cont.resumeWith(res).
Here cont is the GetEmployeeContinuation, so its resumeWith() is called with Result.success(1) and its result field is updated to hold 1. GetEmployeeContinuation.label was set to 1 before getID() was called, so execution continues where it left off: resumeWith() calls getEmployee() again, passing its own continuation object (GetEmployeeContinuation). getEmployee() runs to completion and returns Unit, which becomes the value of res. GetEmployeeContinuation then resumes its caller through cont.resumeWith(res), and the process continues up the entire call stack.
I found a link that helped me answer how coroutines work under the hood.