The Internals of Concurrency in Kotlin With Example

Greetings!
We have recently published 100+ articles on android tutorials with kotlin and java. If you need, you may visit Android Tutorial for beginners page. You can also check Kotlin Tutorial for beginners. Also, if you are interested in content writing, you can mail us at tutorialwing@gmail.com.

Learn the internals of concurrency in Kotlin in this article by Miguel Angel Castiblanco Torres, a software engineer and an early adopter of Kotlin who has written about Kotlin’s concurrency primitives from the first beta release of coroutines.

When you work with Kotlin, it’s important that you have an idea of how suspending computations actually work. In this article, you’ll analyze Continuation Passing Style (CPS) and how it’s related to suspending computations.

Continuation Passing Style

The actual implementation of suspending computations is done using CPS. This paradigm is based on the premise of sending a continuation to a function that is invoked so that upon completion, the function will invoke the continuation. You can think of continuations as callbacks: whenever a suspending computation invokes another, it will pass a continuation that should be called upon completion or error.
All the heavy lifting is done by the compiler, which transforms all the suspending computations so that they send and receive said continuations. This means that the actual signatures of suspending functions are not the same as what you define. On top of this, the suspending computations are transformed into state machines that can save and restore their state and execute one portion of their code at a time, so whenever they are resumed, they restore their state and continue execution where they left off.
Coupling CPS with state machines, the compiler creates computations that can be suspended while waiting for other computations to complete.
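To make the idea concrete, here is a minimal sketch of continuation passing style in plain Kotlin, with no coroutines involved. The names used (SimpleContinuation, greetingCps, shoutedGreetingCps) are hypothetical and exist only for this illustration; this is not what the compiler generates, only the same shape written by hand.

```kotlin
// A hand-rolled continuation: one callback for success, one for failure.
// (Hypothetical type for illustration; not the standard library interface.)
interface SimpleContinuation<in T> {
    fun onResult(value: T)
    fun onError(error: Throwable)
}

// Direct style: the result comes back through the return value.
fun greetingDirect(name: String): String = "Hello, $name"

// CPS: the result is handed to the continuation instead of being returned.
fun greetingCps(name: String, cont: SimpleContinuation<String>) {
    if (name.isBlank()) {
        cont.onError(IllegalArgumentException("name must not be blank"))
    } else {
        cont.onResult(greetingDirect(name))
    }
}

// A CPS function that calls another CPS function passes a continuation down
// the chain; here it wraps the caller's continuation to post-process the result.
fun shoutedGreetingCps(name: String, cont: SimpleContinuation<String>) {
    greetingCps(name, object : SimpleContinuation<String> {
        override fun onResult(value: String) = cont.onResult(value.uppercase() + "!")
        override fun onError(error: Throwable) = cont.onError(error)
    })
}

fun main() {
    shoutedGreetingCps("Kotlin", object : SimpleContinuation<String> {
        override fun onResult(value: String) = println(value)
        override fun onError(error: Throwable) = println("failed: ${error.message}")
    })
}
```

Nothing is returned by the CPS functions; the only way a result or an error leaves them is through the continuation they were given.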

Continuations

It all starts with continuations, which can be considered the building blocks of suspending computations. After all, they are even in the name of the paradigm. The reason continuations are so important is that they are what allows a coroutine to be resumed. To make this clearer, here is the definition of the Continuation interface as it appeared in the experimental coroutines API (kotlin.coroutines.experimental), the version this article describes:

public interface Continuation<in T> {
   public val context: CoroutineContext
   public fun resume(value: T)
   public fun resumeWithException(exception: Throwable)
}

This interface is rather simple. Here’s a quick overview of what it defines:

  • The CoroutineContext that is to be used with this Continuation.
  • A resume() function that takes a value T as a parameter. This value is the result of the operation that caused the suspension. So, if the function was suspended to call a function that returns an Int, the value will be that integer.
  • A resumeWithException() function that allows for the propagation of an exception.

So, a continuation is pretty much an extended callback, one that also contains information about the context in which it should be called. This context is an important part of the design because it’s what allows for each continuation to be executed in a specific thread or pool of threads—or with different configurations like exception handlers—but still remain sequential.
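In the stable kotlin.coroutines package (Kotlin 1.3 and later), the same idea is expressed through a single resumeWith(result) member, with resume() and resumeWithException() available as extension functions. The following is a small sketch, under that assumption, of a continuation used as the completion callback of a coroutine. The functions answer() and runAndDescribe() are hypothetical helpers written for this example.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical suspending computation used only for this sketch.
suspend fun answer(): Int = 42

// Starts `block` and describes what its completion continuation received.
fun <T> runAndDescribe(block: suspend () -> T): String {
    var description = "not completed"
    val completion = object : Continuation<T> {
        // The context that travels with the continuation; empty here.
        override val context: CoroutineContext = EmptyCoroutineContext

        // Success and failure both arrive through this one function.
        override fun resumeWith(result: Result<T>) {
            description = result.fold(
                onSuccess = { "resumed with $it" },
                onFailure = { "resumed with exception: ${it.message}" }
            )
        }
    }
    block.startCoroutine(completion)
    return description
}

fun main() {
    println(runAndDescribe { answer() })
    println(runAndDescribe<Int> { throw IllegalStateException("boom") })
}
```

Because nothing in this sketch suspends, both coroutines complete before startCoroutine() returns, so the description is already filled in when it is read.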

The suspend modifier

One specific goal of the Kotlin team was to make as few language changes as possible in order to support concurrency. Instead, the impact of supporting coroutines and concurrency was to be taken by the compiler, the standard library, and the coroutines library. So, the only relevant change from a language perspective is the addition of the suspend modifier.
This modifier indicates to the compiler that the code in the given scope—function or lambda—will work using continuations. So whenever a suspending computation is compiled, its bytecode will be a big continuation. For example, consider this suspending function:

suspend fun getUserSummary(id: Int): UserSummary {
    logger.log("fetching summary of $id")
    val profile = fetchProfile(id) // suspending function
    val age = calculateAge(profile.dateOfBirth)
    val terms = validateTerms(profile.country, age) // suspending function
    return UserSummary(profile, age, terms)
}

Here, the suspend modifier tells the compiler that the execution of getUserSummary() will happen through a Continuation, which the compiler uses to control how the function runs. In this case, the function suspends twice: first when fetchProfile() is invoked and later for the execution of validateTerms(). You can always count on IntelliJ IDEA and Android Studio to show you the suspension points of a function:

[Figure: Example 1, the suspension points of getUserSummary() as shown in the IDE]

This means that your function’s execution will happen in three steps.

  • First, the function will be started and the log will be printed, then the invocation of fetchProfile() will cause the execution to suspend.
  • Once fetchProfile() has ended, your function will calculate the age of the user, and then the execution will suspend again for validateTerms() to be executed.
  • The last step will occur once the terms are validated when the function resumes one last time, and all the data from the previous steps are used to create the summary of the user.
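The three steps can be observed in a small runnable sketch. It assumes the stable kotlin.coroutines API, and everything else is a placeholder invented for the example: the Profile and UserSummary classes, the fixed year 2024 used instead of calculateAge(), and the two "pending" variables, which park each continuation so that main code can resume it by hand.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical stand-ins for the article's types.
data class Profile(val country: String, val yearOfBirth: Int)
data class UserSummary(val profile: Profile, val age: Int, val termsAccepted: Boolean)

val events = mutableListOf<String>()
var pendingProfile: Continuation<Profile>? = null
var pendingTerms: Continuation<Boolean>? = null

// Each function suspends and parks its continuation until someone resumes it.
suspend fun fetchProfile(id: Int): Profile =
    suspendCoroutine { cont -> pendingProfile = cont }

suspend fun validateTerms(country: String, age: Int): Boolean =
    suspendCoroutine { cont -> pendingTerms = cont }

suspend fun getUserSummary(id: Int): UserSummary {
    events.add("step 1: fetching summary of $id")
    val profile = fetchProfile(id)                  // first suspension point
    val age = 2024 - profile.yearOfBirth            // placeholder for calculateAge()
    events.add("step 2: age is $age")
    val terms = validateTerms(profile.country, age) // second suspension point
    events.add("step 3: building summary")
    return UserSummary(profile, age, terms)
}

fun runDemo(): List<String> {
    events.clear()
    val completion = Continuation<UserSummary>(EmptyCoroutineContext) { result ->
        events.add("done: ${result.getOrThrow()}")
    }
    suspend { getUserSummary(7) }.startCoroutine(completion)
    events.add("-- suspended in fetchProfile --")
    pendingProfile!!.resume(Profile("CO", 1990))
    events.add("-- suspended in validateTerms --")
    pendingTerms!!.resume(true)
    return events.toList()
}

fun main() {
    runDemo().forEach { println(it) }
}
```

Each call to resume() runs exactly one more portion of getUserSummary() and then returns to the caller, which is why the marker lines interleave with the step lines.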

State machine

Once the compiler has analyzed the code, it will transform the suspending function into a state machine. The idea is that the suspending function can behave as a continuation by executing different portions of its code each time it’s resumed, based on its current state.

Labels

To better understand this, include a label where the execution starts and also on each of the places where the execution can be resumed:

suspend fun getUserSummary(id: Int): UserSummary {

   // label 0 -> first execution
   logger.log("fetching summary of $id")
   val profile = fetchProfile(id)

   // label 1 -> resuming
   val age = calculateAge(profile.dateOfBirth)
   val terms = validateTerms(profile.country, age)

   // label 2 -> resuming
   return UserSummary(profile, age, terms)
}

Now, pretend that you can somehow receive the label that indicates which part of the code to execute. Then, you could write a when statement to separate the code to be executed:

when(label) {
  0 -> { // label 0 -> first execution
      logger.log("fetching summary of $id")
      fetchProfile(id)
      return
  }
  1 -> { // label 1 -> resuming
      calculateAge(profile.dateOfBirth)
      validateTerms(profile.country, age)
      return
  }
  2 -> // label 2 -> resuming and terminating
      UserSummary(profile, age, terms)
}

Note that this snippet is a simplified representation of the generated bytecode, and parts of it are pseudo-Kotlin. The intention is not to translate the bytecode generated by the compiler into valid Kotlin, but rather to give you a fairly accurate idea of how it works.

Continuations

So, now that you have a bare function that can resume execution at a different point, you need to find a way to indicate the label of the function. Here is where Continuation takes the spotlight. Say that you implement a continuation at the very start of your function, one that simply redirects any invocation of the callback back to the same function.
To be able to resume, you need to have at least the label. So, create an implementation of CoroutineImpl, which is an abstract implementation of Continuation that already includes a label property. The only abstract function in CoroutineImpl is doResume(), so that’s the only thing you need to implement at this point:

suspend fun getUserSummary(id: Int): UserSummary {
   val sm = object : CoroutineImpl {
      override fun doResume(data: Any?, exception: Throwable?) {
          // TODO: Call getUserSummary to resume it
      }
   }
}

Then, you just need to receive Continuation as a parameter. That way, you can have doResume() forward the callback to getUserSummary(), as shown here:

suspend fun getUserSummary(id: Int, cont: Continuation<Any?>): UserSummary {

   val sm = object : CoroutineImpl {
       override fun doResume(data: Any?, exception: Throwable?) {
           getUserSummary(id, this)
       }
   }

   val state = sm as CoroutineImpl
   when(state.label) {
       ...
   }
}

Note that you aren’t receiving CoroutineImpl directly in getUserSummary(). This is because you want to invoke cont at the completion of getUserSummary() so that its caller gets resumed as well. So, for compatibility, it makes more sense to receive a Continuation in case the caller didn’t use CoroutineImpl.
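The extra Continuation parameter is not just a teaching device; on the JVM it can be seen with plain Java reflection. The following is a small sketch that assumes the code is compiled for the JVM. UserApi and getUserName() are hypothetical names chosen for the example.

```kotlin
// Hypothetical holder object so the compiled method is easy to look up.
object UserApi {
    suspend fun getUserName(id: Int): String = "user-$id"
}

// Describes the JVM signature the compiler generated for getUserName().
fun compiledSignature(): String {
    val method = UserApi::class.java.declaredMethods.first { it.name == "getUserName" }
    val params = method.parameterTypes.joinToString(", ") { it.simpleName }
    return "${method.returnType.simpleName} getUserName($params)"
}

fun main() {
    println(compiledSignature())
}
```

On the JVM this is expected to print Object getUserName(int, Continuation): one more parameter than the source declares, and a return type of Object (Any? in Kotlin terms) instead of String.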

Callbacks

Now that you have the ability to resume at a given point using the label, you need to modify the other suspending functions that are called from getUserSummary() so that they also receive a continuation. Assume that you, as the compiler, have modified the fetchProfile() and validateTerms() functions so that they receive a Continuation, just like you did for getUserSummary(), and resume it upon the completion of their execution, which ends up invoking doResume(). Then, you could invoke them like this:

when(state.label) {
   0 -> { // Label 0 -> first execution
      logger.log("fetching summary of $id")
      fetchProfile(id, sm)
      return
   }
   1 -> { // label 1 -> resuming
      calculateAge(profile.dateOfBirth)
      validateTerms(profile.country, age, sm)
      return
   }
   2 -> // label 2 -> resuming and terminating
     UserSummary(profile, age, terms)
}

By having both fetchProfile() and validateTerms() call the continuation that they receive whenever they finish their execution, you are having them call the continuation that you implemented in getUserSummary(), thus resuming its execution.

Incrementing the Label

However, currently, you aren’t actually incrementing the label, so the function will loop on label zero. That should be done right before the call to the other suspending functions:

when(state.label) {
   0 -> { // Label 0 -> first execution
     logger.log("fetching summary of $id")
     sm.label = 1
     fetchProfile(id, sm)
     return
   }
   1 -> { // label 1 -> resuming
     calculateAge(profile.dateOfBirth)
     sm.label = 2
     validateTerms(profile.country, age, sm)
     return
   }
   2 -> // label 2 -> resuming and terminating
     UserSummary(profile, age, terms)
}

Note that, by default, the label is zero in CoroutineImpl. Also, note that you set the value for the next piece of code, not for the current one.
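Why the label has to be updated before the call, and not after it, shows up in a small runnable sketch. LabeledTask and callOther() are hypothetical and written in plain Kotlin; callOther() stands in for a transformed suspending function and, in this sketch, invokes its callback immediately.

```kotlin
// Hypothetical resumable task written by hand; not compiler output.
class LabeledTask {
    private var label = 0                  // starts at zero
    val trace = mutableListOf<String>()

    // Stand-in for a transformed suspending call. It calls back right away,
    // so resume() is re-entered before callOther() has even returned.
    private fun callOther(name: String, callback: () -> Unit) {
        trace.add("calling $name")
        callback()
    }

    fun resume() {
        when (label) {
            0 -> {
                trace.add("label 0")
                label = 1                  // label of the NEXT portion, set before the call
                callOther("fetchProfile") { resume() }
                return
            }
            1 -> {
                trace.add("label 1")
                label = 2
                callOther("validateTerms") { resume() }
                return
            }
            2 -> trace.add("label 2")
            else -> error("unexpected label $label")
        }
    }
}

fun main() {
    val task = LabeledTask()
    task.resume()
    println(task.trace.joinToString(" -> "))
}
```

If the assignments to label were moved after the calls, the immediate callback would re-enter resume() while the label was still zero, and the task would keep repeating its first portion.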

Storing the Result From the Other Operations

Currently, you aren’t storing the result of the other suspending functions. To do so, you need to create a more complete implementation of the state machine, which can be easy if you use CoroutineImpl as the base. In this case, you could create this private class outside of the function:

private class GetUserSummarySM: CoroutineImpl {

    var value: Any? = null
    var exception: Throwable? = null
    var cont: Continuation<Any?>? = null
    var id: Int? = null
    var profile: Profile? = null
    var age: Int? = null
    var terms: Terms? = null

    override fun doResume(data: Any?, exception: Throwable?) {
        this.value = data
        this.exception = exception
        getUserSummary(id!!, this)
    }
}

Here you have done the following:

  • Mapped all the different variables that exist in the function to the class (id, profile, age, terms)
  • Added one property to store the result that is passed to doResume() when the state machine is resumed
  • Added one property to store the exception that can be sent in doResume()
  • Added one property to store the initial continuation that is sent when the execution of getUserSummary() is first started

So, now you can update the function itself to both use this class and set the properties as they become available:

val sm = cont as? GetUserSummarySM ?: GetUserSummarySM()

when(sm.label) {
   0 -> { // label 0 -> first execution
       sm.cont = cont
       logger.log("fetching summary of $id")
       sm.label = 1
       fetchProfile(id, sm)
       return
   }
   1 -> { // label 1 -> resuming
       sm.profile = sm.value as Profile
       sm.age = calculateAge(sm.profile!!.dateOfBirth)
       sm.label = 2
       validateTerms(sm.profile!!.country, sm.age!!, sm)
       return
   }
   2 -> { // label 2 -> resuming and terminating
       sm.terms = sm.value as Terms
       UserSummary(sm.profile!!, sm.age!!, sm.terms!!)
   }
}

There are many important things in this code:

  • You check whether cont is an instance of GetUserSummarySM. If that’s the case, you use it as the state. If not, that means that it’s the initial execution of the function, so a new one is created.
  • As a part of the first label, you store the current cont in the state machine. This will be used later to resume the caller of getUserSummary().
  • The second and third labels start by casting sm.value into the result of the last operation and storing it in the correct variable of the state machine.
  • You use all the variables directly from the state machine.

Returning the Result of the Suspending Computation

Now, your state machine is able to do almost everything it needs to do. The big missing part is to actually return the result of the operation somehow. Because this uses CPS, it isn’t actually going to return the value in the classical sense; instead, it will use the first continuation it received as a callback, where it will send the result. This would be a complete implementation—minus error handling:

suspend fun getUserSummary(id: Int, cont: Continuation<Any?>) {

   val sm = cont as? GetUserSummarySM ?: GetUserSummarySM()

   when(sm.label) {
       0 -> { // label 0 -> first execution
           sm.cont = cont
           logger.log("fetching summary of $id")
           sm.label = 1
           fetchProfile(id, sm)
           return
       }
       1 -> { // label 1 -> resuming
           sm.profile = sm.value as Profile
           sm.age = calculateAge(sm.profile!!.dateOfBirth)
           sm.label = 2
           validateTerms(sm.profile!!.country, sm.age!!, sm)
           return
       }
       2 -> { // label 2 -> resuming and terminating
           sm.terms = sm.value as Terms
           sm.cont!!.resume(UserSummary(sm.profile!!, sm.age!!, sm.terms!!))
       }
   }
}
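The pseudo-Kotlin above can be approximated by something that actually runs. The following is a minimal sketch, not the compiler's output. It assumes the stable kotlin.coroutines.Continuation interface in place of CoroutineImpl, so resumeWith() plays the role of doResume(). The data classes, the hard-coded results, the fixed year 2024, and the choice to pass id and the caller's continuation through the constructor are all placeholders invented for the example. Like the version above, it leaves out error handling.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume

// Hypothetical stand-ins for the article's types.
data class Profile(val country: String, val yearOfBirth: Int)
data class Terms(val accepted: Boolean)
data class UserSummary(val profile: Profile, val age: Int, val terms: Terms)

// Already-transformed "suspending" functions: they report through the continuation.
fun fetchProfile(id: Int, cont: Continuation<Any?>) {
    cont.resume(Profile("CO", 1990))
}

fun validateTerms(country: String, age: Int, cont: Continuation<Any?>) {
    cont.resume(Terms(accepted = country == "CO" && age >= 18))
}

// The state machine: holds the label, the locals and the caller's continuation.
class GetUserSummarySM(val id: Int, val caller: Continuation<Any?>) : Continuation<Any?> {
    override val context: CoroutineContext get() = caller.context
    var label = 0
    var value: Any? = null
    var profile: Profile? = null
    var age: Int? = null

    // Plays the role of doResume(): store the result and re-enter the function.
    override fun resumeWith(result: Result<Any?>) {
        value = result.getOrThrow()
        getUserSummary(id, this)
    }
}

fun getUserSummary(id: Int, cont: Continuation<Any?>) {
    // First call: cont is the caller's continuation, so create the state machine.
    // Later calls: cont is the state machine itself, so reuse it.
    val sm = cont as? GetUserSummarySM ?: GetUserSummarySM(id, cont)
    when (sm.label) {
        0 -> {
            sm.label = 1
            fetchProfile(id, sm)
        }
        1 -> {
            sm.profile = sm.value as Profile
            sm.age = 2024 - sm.profile!!.yearOfBirth   // placeholder for calculateAge()
            sm.label = 2
            validateTerms(sm.profile!!.country, sm.age!!, sm)
        }
        2 -> {
            val terms = sm.value as Terms
            // "Return" by resuming the continuation received on the first call.
            sm.caller.resume(UserSummary(sm.profile!!, sm.age!!, terms))
        }
    }
}

fun main() {
    getUserSummary(7, Continuation<Any?>(EmptyCoroutineContext) { result ->
        println(result.getOrThrow())
    })
}
```

The caller never sees a return value; the UserSummary only ever reaches it through the continuation it passed in.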

In the actual bytecode, getUserSummary() will only suspend if the suspending functions it calls return COROUTINE_SUSPENDED; otherwise, it will cast the result of the function to the expected type (Profile and Terms in this case) and continue executing the next label. This guarantees that no unnecessary suspensions occur.

Note that not only are you now using sm.cont as a callback to return the result, but you have also removed the return type from the function. In reality, the compiled signature of a suspending function indicates that it returns Any?. This is because a suspending computation can either return the value COROUTINE_SUSPENDED to indicate that the suspension did happen, or directly return its result if it didn’t suspend. For example, imagine a suspending function that will only suspend under a given condition; if that condition doesn’t occur, the function doesn’t need to suspend and can instead return the result directly.
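A function that suspends only under a given condition can be sketched with the suspendCoroutineUninterceptedOrReturn intrinsic from the stable kotlin.coroutines.intrinsics package, which exposes this "result or COROUTINE_SUSPENDED" choice directly. The cache, the pending variable and loadName() are hypothetical names used only for this example.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine

// Hypothetical cache and parking spot used only for this sketch.
val cache = mutableMapOf<Int, String>()
var pending: Continuation<String>? = null

// Suspends only on a cache miss; on a hit the value is returned directly.
suspend fun loadName(id: Int): String = suspendCoroutineUninterceptedOrReturn { cont ->
    val cached = cache[id]
    if (cached != null) {
        cached                 // no suspension: hand back the result right away
    } else {
        pending = cont         // park the continuation for later
        COROUTINE_SUSPENDED    // signal that the suspension did happen
    }
}

fun runDemo(): List<String> {
    val events = mutableListOf<String>()
    cache.clear()
    cache[1] = "cached-user"
    val done = Continuation<String>(EmptyCoroutineContext) { result ->
        events.add("result: ${result.getOrThrow()}")
    }

    suspend { loadName(1) }.startCoroutine(done)   // cache hit: completes immediately
    suspend { loadName(2) }.startCoroutine(done)   // cache miss: suspends
    events.add("loadName(2) is suspended")
    pending!!.resume("fetched-user")               // resume the parked continuation
    return events
}

fun main() {
    runDemo().forEach { println(it) }
}
```

The first call finishes without ever suspending, while the second only produces its result once the parked continuation is resumed.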

If you found this article interesting, you can explore Learning Concurrency in Kotlin to take advantage of Kotlin’s concurrency primitives to write efficient multithreaded applications. Learning Concurrency in Kotlin will help you build an Android application – an RSS reader – designed and implemented according to the different topics covered in the book.

That’s the end of the tutorial on concurrency in Kotlin.
