Guide to kotlinx.coroutines by example

This is a short guide on core features of kotlinx.coroutines with a series of examples.

Introduction and setup

Kotlin, as a language, provides only minimal low-level APIs in its standard library to enable various other libraries to utilize coroutines. Unlike many other languages with similar capabilities, async and await are not keywords in Kotlin and are not even part of its standard library.

kotlinx.coroutines is one such rich library. It contains a number of high-level coroutine-enabled primitives that this guide covers, including async and await. You need to add a dependency on the kotlinx-coroutines-core module as explained here to use primitives from this guide in your projects.

Coroutine basics

This section covers basic coroutine concepts.

Your first coroutine

Run the following code:

fun main(args: Array<String>) {
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main function continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}

You can get full code here

It produces the following output:

Hello,
World!

Essentially, coroutines are light-weight threads. They are launched with the launch coroutine builder. You can achieve the same result by replacing launch(CommonPool) { ... } with thread { ... } and delay(...) with Thread.sleep(...). Try it.

If you start by replacing only launch(CommonPool) with thread, the compiler produces the following error:

Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function

That is because delay is a special suspending function that does not block a thread, but suspends the coroutine, and it can only be used from a coroutine.
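For comparison, the fully converted thread-based variant might look like the sketch below. It relies only on the thread function from kotlin.concurrent and on Thread.sleep. The helper name helloWorldWithThread and the recording of printed lines are assumptions made for illustration and are not part of the guide's examples.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.concurrent.thread

// Hypothetical helper: the thread-based counterpart of the first example.
// It prints each line immediately and also returns the lines in printing order.
fun helloWorldWithThread(): List<String> {
    val printed = CopyOnWriteArrayList<String>()
    fun say(line: String) {
        println(line)
        printed.add(line)
    }
    val worker = thread { // start a new thread instead of a coroutine
        Thread.sleep(1000L) // blocking sleep for 1 second
        say("World!") // print after the sleep
    }
    say("Hello,") // the calling thread continues while the worker sleeps
    worker.join() // block until the worker thread is done
    return printed
}

fun main(args: Array<String>) {
    helloWorldWithThread()
}
```

Unlike delay, Thread.sleep keeps the worker thread occupied for the whole second.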

Bridging blocking and non-blocking worlds

The first example mixes non-blocking delay(...) and blocking Thread.sleep(...) in the same code of main function. It is easy to get lost. Let's cleanly separate blocking and non-blocking worlds by using runBlocking:

fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L)
        println("World!")
    }
    println("Hello,") // main coroutine continues while child is delayed
    delay(2000L) // non-blocking delay for 2 seconds to keep JVM alive
}

You can get full code here

The result is the same, but this code uses only non-blocking delay.

runBlocking { ... } works as an adaptor that is used here to start the top-level main coroutine. The regular code outside of runBlocking blocks until the coroutine inside runBlocking completes.

This is also a way to write unit-tests for suspending functions:

class MyTest {
    @Test
    fun testMySuspendingFunction() = runBlocking<Unit> {
        // here we can use suspending functions using any assertion style that we like
    }
}

Waiting for a job

Delaying for a time while another coroutine is working is not a good approach. Let's explicitly wait (in a non-blocking way) until the background Job that we have launched is complete:

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { // create new coroutine and keep a reference to its Job
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join() // wait until child coroutine completes
}

You can get full code here

Now the result is still the same, but the code of the main coroutine is not tied to the duration of the background job in any way. Much better.

Extract function refactoring

Let's extract the block of code inside launch(CommonPool) { ... } into a separate function. When you perform the "Extract function" refactoring on this code, you get a new function with the suspend modifier. That is your first suspending function. Suspending functions can be used inside coroutines just like regular functions, but their additional feature is that they can, in turn, use other suspending functions, like delay in this example, to suspend execution of a coroutine.

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { doWorld() }
    println("Hello,")
    job.join()
}

// this is your first suspending function
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}

You can get full code here

Coroutines ARE light-weight

Run the following code:

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = List(100_000) { // create a lot of coroutines and list their jobs
        launch(CommonPool) {
            delay(1000L)
            print(".")
        }
    }
    jobs.forEach { it.join() } // wait for all jobs to complete
}

You can get full code here

It starts 100K coroutines and, after a second, each coroutine prints a dot. Now, try that with threads. What would happen? (Most likely your code will produce some sort of out-of-memory error)
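A thread-based version of this experiment could be sketched as follows. The helper dotsWithThreads and its parameters are hypothetical names chosen for this illustration. Whether the run with 100,000 threads completes or fails depends on the JVM and operating system limits.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical helper: starts `count` threads that each sleep and then print a dot.
// Returns the number of threads that ran to completion.
fun dotsWithThreads(count: Int, sleepMillis: Long = 1000L): Int {
    val finished = AtomicInteger()
    val threads = List(count) { // create a lot of threads and keep references to them
        thread {
            Thread.sleep(sleepMillis)
            print(".")
            finished.incrementAndGet()
        }
    }
    threads.forEach { it.join() } // wait for all threads to complete
    return finished.get()
}

fun main(args: Array<String>) {
    // 100_000 mirrors the coroutine example above; on many systems this
    // is expected to fail with an OutOfMemoryError before all threads start
    val finished = dotsWithThreads(100_000)
    println()
    println("Finished threads: $finished")
}
```

Calling the helper with a small count first, for example dotsWithThreads(100), is a safer way to check that the code works before trying the full size.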

Coroutines are like daemon threads

The following code launches a long-running coroutine that prints "I'm sleeping" twice a second and then returns from the main function after some delay:

fun main(args: Array<String>) = runBlocking<Unit> {
    launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // just quit after delay
}

You can get full code here

You can run and see that it prints three lines and terminates:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...

Active coroutines do not keep the process alive. They are like daemon threads.
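To make the analogy concrete, here is a minimal sketch of the same program written with an actual daemon thread. It uses the isDaemon parameter of the thread function from kotlin.concurrent; the helper name startSleeper is an assumption made for this illustration.

```kotlin
import kotlin.concurrent.thread

// Hypothetical helper: starts a daemon thread that prints twice a second
fun startSleeper(): Thread = thread(isDaemon = true) {
    repeat(1000) { i ->
        println("I'm sleeping $i ...")
        Thread.sleep(500L)
    }
}

fun main(args: Array<String>) {
    startSleeper()
    Thread.sleep(1300L) // just quit after the sleep
    // main returns here; a daemon thread does not keep the JVM alive
}
```

With isDaemon = false, which is the default, the JVM would instead keep running until the thread finished all 1000 iterations.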

Cancellation and timeouts

This section covers coroutine cancellation and timeouts.

Cancelling coroutine execution

In a small application, returning from the main method might sound like a good way to get all coroutines implicitly terminated. In a larger, long-running application, you need finer-grained control. The launch function returns a Job that can be used to cancel a running coroutine:

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}

You can get full code here

It produces the following output:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

As soon as main invokes job.cancel, we don't see any output from the other coroutine because it was cancelled.

Cancellation is cooperative

Coroutine cancellation is cooperative. Coroutine code has to cooperate to be cancellable. All the suspending functions in kotlinx.coroutines are cancellable. They check for cancellation of the coroutine and throw CancellationException when cancelled. However, if a coroutine is working in a computation and does not check for cancellation, then it cannot be cancelled, as the following example shows:

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (i < 10) { // computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}

You can get full code here

Run it to see that it continues to print "I'm sleeping" even after cancellation.

Making computation code cancellable

There are two approaches to making computation code cancellable. The first one is to periodically invoke a suspending function. There is a yield function that is a good choice for that purpose. The other one is to explicitly check the cancellation status. Let us try the latter approach.

Replace while (i < 10) in the previous example with while (isActive) and rerun it.

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (isActive) { // cancellable computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}

You can get full code here

As you can see, this loop can now be cancelled. isActive is a property that is available inside the code of coroutines via the CoroutineScope object.

Closing resources with finally

Cancellable suspending functions throw CancellationException on cancellation, which can be handled in all the usual ways. For example, try {...} finally {...} and Kotlin's use function execute their finalization actions normally when a coroutine is cancelled:

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("I'm running finally")
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}

You can get full code here

The example above produces the following output:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.
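The use function follows the same rule as finally. The sketch below shows this with the standard library only: it throws a CancellationException by hand inside use, which stands in for what a cancelled suspending function would do. DemoResource and closedAfterCancellation are hypothetical names introduced for this illustration.

```kotlin
import java.io.Closeable
import java.util.concurrent.CancellationException

// Hypothetical resource used only for this illustration
class DemoResource : Closeable {
    var closed = false
        private set

    override fun close() {
        closed = true
        println("Resource closed")
    }
}

// Throws CancellationException inside `use`, standing in for a cancelled
// suspending function, and reports whether the resource was closed.
fun closedAfterCancellation(): Boolean {
    val resource = DemoResource()
    try {
        resource.use {
            println("Working with the resource")
            throw CancellationException("cancelled for the demo")
        }
    } catch (e: CancellationException) {
        // `use` has already closed the resource by the time we get here
        println("Caught: ${e.message}")
    }
    return resource.closed
}

fun main(args: Array<String>) {
    println("Closed: ${closedAfterCancellation()}")
}
```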

Run non-cancellable block

Any attempt to use a suspending function in the finally block of the previous example will cause CancellationException, because the coroutine running this code is cancelled. Usually, this is not a problem, since all well-behaving closing operations (closing a file, cancelling a job, or closing any kind of a communication channel) are non-blocking and do not involve any suspending functions. However, in the rare case when you need to suspend in the cancelled coroutine, you can wrap the corresponding code in run(NonCancellable) {...} using the run function and the NonCancellable context, as the following example shows:

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            run(NonCancellable) {
                println("I'm running finally")
                delay(1000L)
                println("And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}

You can get full code here

Timeout

The most obvious reason to cancel coroutine execution in practice is that its execution time has exceeded some timeout. While you can manually track the reference to the corresponding Job and launch a separate coroutine to cancel the tracked one after a delay, there is a ready-to-use withTimeout function that does it. Look at the following example:

fun main(args: Array<String>) = runBlocking<Unit> {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}

You can get full code here

It produces the following output:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" java.util.concurrent.CancellationException: Timed out waiting for 1300 MILLISECONDS

We have not seen the CancellationException stack trace printed on the console before. That is because inside a cancelled coroutine CancellationException is considered to be a normal reason for coroutine completion. However, in this example we have used withTimeout right inside the main function.

Because cancellation is just an exception, all the resources will be closed in a usual way. You can wrap the code with timeout in try {...} catch (e: CancellationException) {...} block if you need to do some additional action specifically on timeout.

Composing suspending functions

This section covers various approaches to composition of suspending functions.

Sequential by default

Assume that we have two suspending functions defined elsewhere that do something useful like some kind of remote service call or computation. We just pretend they are useful, but actually each one just delays for a second for the purpose of this example:

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

What do we do if we need to invoke them sequentially -- first doSomethingUsefulOne and then doSomethingUsefulTwo -- and compute the sum of their results? In practice we do this if we use the result of the first function to decide whether we need to invoke the second one or how to invoke it.

We just use a normal sequential invocation, because the code in the coroutine, just like in the regular code, is sequential by default. The following example demonstrates it by measuring the total time it takes to execute both suspending functions:

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")
}

You can get full code here

It produces something like this:

The answer is 42
Completed in 2017 ms

Concurrent using async

What if there are no dependencies between invocation of doSomethingUsefulOne and doSomethingUsefulTwo and we want to get the answer faster, by doing both concurrently? This is where async comes to help.

Conceptually, async is just like launch. It starts a separate coroutine which is a light-weight thread that works concurrently with all the other coroutines. The difference is that launch returns a Job and does not carry any resulting value, while async returns a Deferred -- a light-weight non-blocking future that represents a promise to provide a result later. You can use .await() on a deferred value to get its eventual result, but Deferred is also a Job, so you can cancel it if needed.

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool) { doSomethingUsefulOne() }
        val two = async(CommonPool) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

You can get full code here

It produces something like this:

The answer is 42
Completed in 1017 ms

This is twice as fast, because we have concurrent execution of two coroutines. Note that concurrency with coroutines is always explicit.
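For readers who know JDK futures, the same composition can be sketched with blocking functions and CompletableFuture. This is only a comparison written under assumptions: the function names below are hypothetical blocking stand-ins for the suspending functions of this section, and waiting with get() blocks the calling thread, whereas await only suspends a coroutine.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlin.system.measureTimeMillis

// Hypothetical blocking stand-ins for doSomethingUsefulOne and doSomethingUsefulTwo
fun blockingUsefulOne(): Int {
    Thread.sleep(1000L) // pretend we are doing something useful here
    return 13
}

fun blockingUsefulTwo(): Int {
    Thread.sleep(1000L) // pretend we are doing something useful here, too
    return 29
}

// Starts both computations concurrently and blocks until both results are available
fun sumWithFutures(): Int {
    val one = CompletableFuture.supplyAsync { blockingUsefulOne() }
    val two = CompletableFuture.supplyAsync { blockingUsefulTwo() }
    return one.get() + two.get() // get() blocks the calling thread
}

fun main(args: Array<String>) {
    val time = measureTimeMillis {
        println("The answer is ${sumWithFutures()}")
    }
    println("Completed in $time ms")
}
```

Each pending future here occupies a thread while it sleeps, which is the cost that coroutines avoid.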

Lazily started async

There is a laziness option to async with start = false parameter. It starts coroutine only when its result is needed by some await or if a start function is invoked. Run the following example that differs from the previous one only by this option:

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool, start = false) { doSomethingUsefulOne() }
        val two = async(CommonPool, start = false) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

You can get full code here

It produces something like this:

The answer is 42
Completed in 2017 ms

So, we are back to sequential execution, because we first start and await one, and then start and await two. This is not the intended use-case for laziness. It is designed as a replacement for the standard lazy function in cases when computation of the value involves suspending functions.
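As a reminder of what the standard lazy function does, here is a minimal sketch that uses only the Kotlin standard library. The names computations and answer are hypothetical and exist only for this illustration. The initializer runs on first access and its result is cached, but it cannot call suspending functions, which is the gap that lazily started async is meant to fill.

```kotlin
// Counts how many times the initializer ran (for demonstration only)
var computations = 0

// The initializer runs on first access; later reads reuse the cached value
val answer: Int by lazy {
    computations++
    println("Computing the answer")
    13 + 29
}

fun main(args: Array<String>) {
    println("Before first access: computations = $computations")
    println("The answer is $answer")
    println("The answer is still $answer, computations = $computations")
}
```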

Async-style functions

We can define async-style functions that invoke doSomethingUsefulOne and doSomethingUsefulTwo asynchronously using the async coroutine builder. It is good style to name such functions with either an "async" prefix or an "Async" suffix to highlight the fact that they only start an asynchronous computation and one needs to use the resulting deferred value to get the result.

// The result type of asyncSomethingUsefulOne is Deferred<Int>
fun asyncSomethingUsefulOne() = async(CommonPool) {
    doSomethingUsefulOne()
}

// The result type of asyncSomethingUsefulTwo is Deferred<Int>
fun asyncSomethingUsefulTwo() = async(CommonPool)  {
    doSomethingUsefulTwo()
}

Note that these asyncXXX functions are not suspending functions. They can be used from anywhere. However, their use always implies asynchronous (here meaning concurrent) execution of their action with the invoking code.

The following example shows their use outside of coroutine:

// note, that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
    val time = measureTimeMillis {
        // we can initiate async actions outside of a coroutine
        val one = asyncSomethingUsefulOne()
        val two = asyncSomethingUsefulTwo()
        // but waiting for a result must involve either suspending or blocking.
        // here we use `runBlocking { ... }` to block the main thread while waiting for the result
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}

You can get full code here

Coroutine context and dispatchers

We've already seen launch(CommonPool) {...}, async(CommonPool) {...}, run(NonCancellable) {...}, etc. In these code snippets CommonPool and NonCancellable are coroutine contexts. This section covers other available choices.

Dispatchers and threads

Coroutine context includes a coroutine dispatcher which determines what thread or threads the corresponding coroutine uses for its execution. Coroutine dispatcher can confine coroutine execution to a specific thread, dispatch it to a thread pool, or let it run unconfined. Try the following example:

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println("    'context': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
        println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println("     'newSTC': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}

You can get full code here

It produces the following output (maybe in different order):

 'Unconfined': I'm working in thread main
 'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
     'newSTC': I'm working in thread MyOwnThread
    'context': I'm working in thread main

The difference between parent context and Unconfined context will be shown later.

Unconfined vs confined dispatcher

The Unconfined coroutine dispatcher starts coroutine in the caller thread, but only until the first suspension point. After suspension it resumes in the thread that is fully determined by the suspending function that was invoked. Unconfined dispatcher is appropriate when coroutine does not consume CPU time nor updates any shared data (like UI) that is confined to a specific thread.

On the other hand, the context property that is available inside the block of any coroutine via the CoroutineScope interface is a reference to the context of this particular coroutine. This way, a parent context can be inherited. The default context of runBlocking, in particular, is confined to the invoker thread, so inheriting it has the effect of confining execution to this thread with predictable FIFO scheduling.

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println("    'context': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("    'context': After delay in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}

You can get full code here

It produces the following output:

 'Unconfined': I'm working in thread main
    'context': I'm working in thread main
 'Unconfined': After delay in thread kotlinx.coroutines.ScheduledExecutor
    'context': After delay in thread main

So, the coroutine that had inherited context of runBlocking {...} continues to execute in the main thread, while the unconfined one had resumed in the scheduler thread that delay function is using.

Debugging coroutines and threads

Coroutines can suspend on one thread and resume on another thread with Unconfined dispatcher or with a multi-threaded dispatcher like CommonPool. Even with a single-threaded dispatcher it might be hard to figure out what coroutine was doing what, where, and when. The common approach to debugging applications with threads is to print the thread name in the log file on each log statement. This feature is universally supported by logging frameworks. When using coroutines, the thread name alone does not give much of a context, so kotlinx.coroutines includes debugging facilities to make it easier.

Run the following code with -Dkotlinx.coroutines.debug JVM option:

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking<Unit> {
    val a = async(context) {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async(context) {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
}

You can get full code here

There are three coroutines. The main coroutine (#1) -- runBlocking one, and two coroutines computing deferred values a (#2) and b (#3). They are all executing in the context of runBlocking and are confined to the main thread. The output of this code is:

[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42

The log function prints the name of the thread in square brackets. You can see that it is the main thread, with the identifier of the currently executing coroutine appended to it. This identifier is consecutively assigned to all created coroutines when debugging mode is turned on.

You can read more about debugging facilities in the documentation for newCoroutineContext function.

Jumping between threads

Run the following code with -Dkotlinx.coroutines.debug JVM option:

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) {
    val ctx1 = newSingleThreadContext("Ctx1")
    val ctx2 = newSingleThreadContext("Ctx2")
    runBlocking(ctx1) {
        log("Started in ctx1")
        run(ctx2) {
            log("Working in ctx2")
        }
        log("Back to ctx1")
    }
}

You can get full code here

It demonstrates two new techniques. One is using runBlocking with an explicitly specified context, and the second one is using run function to change a context of a coroutine while still staying in the same coroutine as you can see in the output below:

[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1

Job in the context

The coroutine Job is part of its context. The coroutine can retrieve it from its own context using context[Job] expression:

fun main(args: Array<String>) = runBlocking<Unit> {
    println("My job is ${context[Job]}")
}

You can get full code here

It produces something like this:

My job is BlockingCoroutine{Active}@65ae6ba4

So, isActive in CoroutineScope is just a convenient shortcut for context[Job]!!.isActive.

Children of a coroutine

When context of a coroutine is used to launch another coroutine, the Job of the new coroutine becomes a child of the parent coroutine's job. When the parent coroutine is cancelled, all its children are recursively cancelled, too.

fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(CommonPool) {
        // it spawns two other jobs, one with its separate context
        val job1 = launch(CommonPool) {
            println("job1: I have my own context and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // and the other inherits the parent context
        val job2 = launch(context) {
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
        // request completes when both its sub-jobs complete:
        job1.join()
        job2.join()
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}

You can get full code here

The output of this code is:

job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?

Combining contexts

Coroutine contexts can be combined using the + operator. The context on the right-hand side replaces relevant entries of the context on the left-hand side. For example, a Job of the parent coroutine can be inherited, while its dispatcher is replaced:

fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(context) { // use the context of `runBlocking`
        // spawns CPU-intensive child job in CommonPool !!! 
        val job = launch(context + CommonPool) {
            println("job: I am a child of the request coroutine, but with a different dispatcher")
            delay(1000)
            println("job: I will not execute this line if my parent request is cancelled")
        }
        job.join() // request completes when its sub-job completes
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}

You can get full code here

The expected outcome of this code is:

job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?
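The replacement rule of the + operator can also be observed without starting any coroutine. The sketch below assumes Kotlin 1.3 or later, where CoroutineContext and AbstractCoroutineContextElement live in the kotlin.coroutines package of the standard library. RequestId and Label are hypothetical context elements defined only for this illustration.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical context elements, each identified by its own key
class RequestId(val id: Int) : AbstractCoroutineContextElement(RequestId) {
    companion object Key : CoroutineContext.Key<RequestId>
}

class Label(val text: String) : AbstractCoroutineContextElement(Label) {
    companion object Key : CoroutineContext.Key<Label>
}

fun main(args: Array<String>) {
    val parent = RequestId(1) + Label("parent")
    // Label on the right-hand side replaces the Label entry of parent,
    // while the RequestId entry is inherited unchanged
    val child = parent + Label("child")
    println("request id: ${child[RequestId]?.id}")
    println("label: ${child[Label]?.text}")
}
```

Entries are looked up by key, so an element on the right-hand side only replaces an entry that has the same key.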

Naming coroutines for debugging

Automatically assigned ids are good when coroutines log often and you just need to correlate log records coming from the same coroutine. However, when a coroutine is tied to the processing of a specific request or is doing some specific background task, it is better to name it explicitly for debugging purposes. CoroutineName serves the same function as a thread name. It is displayed in the name of the thread that is executing this coroutine when debugging mode is turned on.

The following example demonstrates this concept:

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
    log("Started main coroutine")
    // run two background value computations
    val v1 = async(CommonPool + CoroutineName("v1coroutine")) {
        log("Computing v1")
        delay(500)
        252
    }
    val v2 = async(CommonPool + CoroutineName("v2coroutine")) {
        log("Computing v2")
        delay(1000)
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}

You can get full code here

The output it produces with -Dkotlinx.coroutines.debug JVM option is similar to:

[main @main#1] Started main coroutine
[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42

Cancellation via explicit job

Let us put our knowledge about contexts, children and jobs together. Assume that our application has an object with a lifecycle, but that object is not a coroutine. For example, we are writing an Android application and launch various coroutines in the context of an Android activity to perform asynchronous operations to fetch and update data, do animations, etc. All of these coroutines must be cancelled when activity is destroyed to avoid memory leaks.

We can manage a lifecycle of our coroutines by creating an instance of Job that is tied to the lifecycle of our activity. A job instance is created using Job() factory function as the following example shows. We need to make sure that all the coroutines are started with this job in their context and then a single invocation of Job.cancel terminates them all.

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = Job() // create a job object to manage our lifecycle
    // now launch ten coroutines for a demo, each working for a different time
    val coroutines = List(10) { i ->
        // they are all children of our job object
        launch(context + job) { // we use the context of main runBlocking thread, but with our own job object 
            delay(i * 200L) // variable delay 0ms, 200ms, 400ms, ... etc
            println("Coroutine $i is done")
        }
    }
    println("Launched ${coroutines.size} coroutines")
    delay(500L) // delay for half a second
    println("Cancelling job!")
    job.cancel() // cancel our job.. !!!
    delay(1000L) // delay for more to see if our coroutines are still working
}

You can get full code here

The output of this example is:

Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Cancelling job!

As you can see, only the first three coroutines printed a message and the others were cancelled by a single invocation of job.cancel(). So all we need to do in our hypothetical Android application is to create a parent job object when the activity is created, use it for child coroutines, and cancel it when the activity is destroyed.

Channels

Deferred values provide a convenient way to transfer a single value between coroutines. Channels provide a way to transfer a stream of values.

Channel basics

A Channel is conceptually very similar to BlockingQueue. One key difference is that instead of a blocking put operation it has a suspending send, and instead of a blocking take operation it has a suspending receive.

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}

You can get full code here

The output of this code is:

1
4
9
16
25
Done!
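For comparison, the blocking counterpart of this example can be sketched with plain threads and java.util.concurrent.SynchronousQueue, which, like an unbuffered channel, hands over each element only when a producer and a consumer meet. The function name squaresViaBlockingQueue is hypothetical and used only for illustration. Here put and take block their threads instead of suspending a coroutine:

```kotlin
import java.util.concurrent.SynchronousQueue
import kotlin.concurrent.thread

// Hypothetical helper: a blocking analogue of the channel example above
fun squaresViaBlockingQueue(): List<Int> {
    val queue = SynchronousQueue<Int>() // no buffer: put waits for a matching take
    thread {
        for (x in 1..5) queue.put(x * x) // blocks the producer thread until taken
    }
    // take blocks the calling thread until an element is handed over
    return List(5) { queue.take() }
}
```

Calling squaresViaBlockingQueue() returns the five squares in sending order, but each waiting side occupies a whole thread while it waits.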

Closing and iteration over channels

Unlike a queue, a channel can be closed to indicate that no more elements are coming. On the receiver side it is convenient to use a regular for loop to receive elements from the channel.

Conceptually, a close is like sending a special close token to the channel. The iteration stops as soon as this close token is received, so there is a guarantee that all elements sent before the close are received:

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}

You can get full code here
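The close-token idea can be made explicit with an ordinary blocking queue. The following is only a sketch of the concept, not of how channels are implemented; CloseToken and receiveUntilClosed are hypothetical names introduced for illustration:

```kotlin
import java.util.concurrent.LinkedBlockingQueue
import kotlin.concurrent.thread

// Hypothetical marker object that plays the role of the "close token"
private object CloseToken

fun receiveUntilClosed(): List<Int> {
    val queue = LinkedBlockingQueue<Any>()
    thread {
        for (x in 1..5) queue.put(x * x)
        queue.put(CloseToken) // "close": nothing is sent after this point
    }
    val received = mutableListOf<Int>()
    while (true) {
        val item = queue.take() // blocks until the next item arrives
        if (item === CloseToken) break // stop iterating once the token is seen
        received += item as Int
    }
    return received
}
```

Because the queue is first-in first-out, everything put before the token is taken before the loop stops, which mirrors the guarantee described above.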

Building channel producers

The pattern where a coroutine is producing a sequence of elements is quite common. This is a part of the producer-consumer pattern that is often found in concurrent code. You could abstract such a producer into a function that takes a channel as its parameter, but this goes contrary to the common-sense expectation that results are returned from functions.

There is a convenience coroutine builder named produce that makes it easy to do it right:

fun produceSquares() = produce<Int>(CommonPool) {
    for (x in 1..5) send(x * x)
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val squares = produceSquares()
    for (y in squares) println(y)
    println("Done!")
}

You can get full code here

Pipelines

A pipeline is a pattern where one coroutine produces a possibly infinite stream of values:

fun produceNumbers() = produce<Int>(CommonPool) {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results. In the example below the numbers are just squared:

fun square(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
    for (x in numbers) send(x * x)
}

The main code starts and connects the whole pipeline:

fun main(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    squares.cancel() // need to cancel these coroutines in a larger app
    numbers.cancel()
}

You can get full code here

We don't have to cancel these coroutines in this example app, because coroutines are like daemon threads, but in a larger app we'll need to stop our pipeline if we don't need it anymore. Alternatively, we could have run pipeline coroutines as children of a coroutine.

Prime numbers with pipeline

Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline of coroutines. We start with an infinite sequence of numbers. This time we introduce an explicit context parameter, so that the caller can control where our coroutines run:

fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

The following pipeline stage filters an incoming stream of numbers, removing all the numbers that are divisible by the given prime number:

fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
    for (x in numbers) if (x % prime != 0) send(x)
}

Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel, and launching a new pipeline stage for each prime number found:

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ... 

The following example prints the first ten prime numbers, running the whole pipeline in the context of the main thread:

fun main(args: Array<String>) = runBlocking<Unit> {
    var cur = numbersFrom(context, 2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(context, cur, prime)
    }
}

You can get full code here

The output of this code is:

2
3
5
7
11
13
17
19
23
29

Note that you can build the same pipeline using the buildIterator coroutine builder from the standard library. Replace produce with buildIterator, send with yield, receive with next, ReceiveChannel with Iterator, and get rid of the context. You will not need runBlocking either. However, the benefit of a pipeline that uses channels as shown above is that it can actually use multiple CPU cores if you run it in CommonPool context.

Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be built using buildSequence/buildIterator, because they do not allow arbitrary suspension, unlike produce which is fully asynchronous.
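The buildIterator variant described above can be sketched as follows. This sketch assumes Kotlin 1.3 or later, where the standard library builder is named iterator rather than buildIterator. The helper firstPrimes is a hypothetical wrapper that collects the results instead of printing them:

```kotlin
// Infinite lazy stream of integers from start; nothing runs until next() is called
fun numbersFrom(start: Int): Iterator<Int> = iterator {
    var x = start
    while (true) yield(x++)
}

// Pipeline stage that drops every number divisible by the given prime
fun filter(numbers: Iterator<Int>, prime: Int): Iterator<Int> = iterator {
    for (x in numbers) if (x % prime != 0) yield(x)
}

// Hypothetical wrapper: builds the pipeline and collects the first `count` primes
fun firstPrimes(count: Int): List<Int> {
    val primes = mutableListOf<Int>()
    var cur = numbersFrom(2)
    repeat(count) {
        val prime = cur.next() // the head of the current stage is always prime
        primes += prime
        cur = filter(cur, prime) // add one more filtering stage
    }
    return primes
}
```

Calling firstPrimes(10) yields the same ten primes as the channel-based version, but everything runs synchronously in the calling thread.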

Fan-out

Multiple coroutines may receive from the same channel, distributing work between themselves. Let us start with a producer coroutine that is periodically producing integers (ten numbers per second):

fun produceNumbers() = produce<Int>(CommonPool) {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

Then we can have several processor coroutines. In this example, they just print their id and received number:

fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
    for (x in channel) {
        println("Processor #$id received $x")
    }    
}

Now let us launch five processors and let them work for a second. See what happens:

fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(1000)
    producer.cancel() // cancel producer coroutine and thus kill them all
}

You can get full code here

The output will be similar to the following one, although the processor ids that receive each specific integer may be different:

Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10

Note that cancelling a producer coroutine closes its channel, thus eventually terminating the iteration over the channel that the processor coroutines are doing.

Fan-in

Multiple coroutines may send to the same channel. For example, let us have a channel of strings, and a suspending function that repeatedly sends a specified string to this channel with a specified delay:

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}

Now, let us see what happens if we launch a couple of coroutines sending strings (in this example we launch them in the context of the main thread):

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>()
    launch(context) { sendString(channel, "foo", 200L) }
    launch(context) { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
}

You can get full code here

The output is:

foo
foo
BAR!
foo
foo
BAR!

Buffered channels

The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked; if receive is invoked first, it is suspended until send is invoked.

Both the Channel() factory function and the produce builder take an optional capacity parameter to specify buffer size. A buffer allows senders to send multiple elements before suspending, similar to a BlockingQueue with a specified capacity, which blocks when the buffer is full.

Take a look at the behavior of the following code:

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>(4) // create buffered channel
    launch(context) { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
}

You can get full code here

It prints "sending" five times using a buffered channel with capacity of four:

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4

The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
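The BlockingQueue analogy can be checked with a small sketch. It uses the non-blocking offer instead of put, so that it returns a count rather than blocking forever on a full queue; the function name acceptedWithoutReceiver is hypothetical:

```kotlin
import java.util.concurrent.ArrayBlockingQueue

// Counts how many elements a bounded queue accepts when nobody is receiving
fun acceptedWithoutReceiver(capacity: Int, attempts: Int): Int {
    val queue = ArrayBlockingQueue<Int>(capacity)
    var accepted = 0
    for (i in 0 until attempts) {
        // offer returns false instead of blocking once the buffer is full
        if (queue.offer(i)) accepted++ else break
    }
    return accepted
}
```

With a capacity of four and ten attempts, four elements are accepted. The channel example prints "Sending" a fifth time only because it prints before the send that suspends.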

Channels are fair

Send and receive operations to channels are fair with respect to the order of their invocation from multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke receive gets the element. In the following example two coroutines "ping" and "pong" are receiving the "ball" object from the shared "table" channel.

data class Ball(var hits: Int)

fun main(args: Array<String>) = runBlocking<Unit> {
    val table = Channel<Ball>() // a shared table
    launch(context) { player("ping", table) }
    launch(context) { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    table.receive() // game over, grab the ball
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(200) // wait a bit
        table.send(ball) // send the ball back
    }
}

You can get full code here

The "ping" coroutine is started first, so it is the first one to receive the ball. Even though "ping" coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets received by the "pong" coroutine, because it was already waiting for it:

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
ping Ball(hits=5)
pong Ball(hits=6)

Shared mutable state and concurrency

Coroutines can be executed concurrently using a multi-threaded dispatcher like CommonPool. It presents all the usual concurrency problems. The main problem is synchronization of access to shared mutable state. Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world, but others are unique.

The problem

Let us launch a thousand coroutines all doing the same action a thousand times (for a total of a million executions). We'll also measure their completion time for further comparisons:

suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
    val n = 1000 // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch(context) {
                repeat(k) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${n * k} actions in $time ms")    
}

We start with a very simple action that increments a shared mutable variable using multi-threaded CommonPool context.

var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}

You can get full code here

What does it print at the end? It is highly unlikely to ever print "Counter = 1000000", because a thousand coroutines increment the counter concurrently from multiple threads without any synchronization.

Volatiles are of no help

There is a common misconception that making a variable volatile solves the concurrency problem. Let us try it:

@Volatile // in Kotlin `volatile` is an annotation 
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}

You can get full code here

This code works slower, but we still don't get "Counter = 1000000" at the end, because volatile variables guarantee linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but do not provide atomicity of larger actions (increment in our case).
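To see why, it may help to spell out what an increment consists of: a read followed by a separate write. The sketch below replays, in a single thread, one possible interleaving of two concurrent increments. The names sharedCounter and simulateLostUpdate are hypothetical, and a real interleaving depends on thread scheduling:

```kotlin
@Volatile
var sharedCounter = 0 // hypothetical stand-in for the counter above

// Replays one unlucky interleaving of two increments, step by step
fun simulateLostUpdate(): Int {
    sharedCounter = 0
    val readByA = sharedCounter // coroutine A reads 0
    val readByB = sharedCounter // coroutine B also reads 0, before A writes
    sharedCounter = readByA + 1 // A writes 1
    sharedCounter = readByB + 1 // B writes 1 again, so A's update is lost
    return sharedCounter
}
```

Each individual read and write is atomic, yet two increments leave the counter at 1 instead of 2.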

Thread-safe data structures

The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized, linearizable, or atomic) data structure that provides all the necessary synchronization for the corresponding operations that need to be performed on a shared state. In the case of a simple counter we can use the AtomicInteger class, which has an atomic incrementAndGet operation:

var counter = AtomicInteger()

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter.incrementAndGet()
    }
    println("Counter = ${counter.get()}")
}

You can get full code here

This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other standard data structures and basic operations on them. However, it does not easily scale to complex state or to complex operations that do not have ready-to-use thread-safe implementations.

Thread confinement fine-grained

Thread confinement is an approach to the problem of shared mutable state where all access to the particular shared state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to the single event-dispatch/application thread. It is easy to apply with coroutines by using a single-threaded context:

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) { // run each coroutine in CommonPool
        run(counterContext) { // but confine each increment to the single-threaded context
            counter++
        }
    }
    println("Counter = $counter")
}

You can get full code here

This code works very slowly, because it does fine-grained thread-confinement. Each individual increment switches from multi-threaded CommonPool context to the single-threaded context using run block.
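The same fine-grained confinement can be sketched without coroutines, using a single-threaded executor from java.util.concurrent as the confining thread. This is an illustrative analogue only; countWithConfinement is a hypothetical name:

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import kotlin.concurrent.thread

fun countWithConfinement(threads: Int, perThread: Int): Int {
    val confined = Executors.newSingleThreadExecutor()
    var counter = 0 // only ever touched by tasks running on the confined thread
    val workers = List(threads) {
        thread {
            // every single increment is handed over to the confined thread
            repeat(perThread) { confined.execute { counter++ } }
        }
    }
    workers.forEach { it.join() } // all increments have been submitted by now
    // read the state on the confined thread too; tasks run in submission order
    val result = confined.submit(Callable { counter }).get()
    confined.shutdown()
    return result
}
```

The result is always exact, but every increment pays for a hand-off between threads, which is the same cost the run block incurs above.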

Thread confinement coarse-grained

In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic are confined to the single thread. The following example does it like that, running each coroutine in the single-threaded context to start with.

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(counterContext) { // run each coroutine in the single-threaded context
        counter++
    }
    println("Counter = $counter")
}

You can get full code here

This now works much faster and produces the correct result.

Mutual exclusion

The mutual exclusion solution to the problem is to protect all modifications of the shared state with a critical section that is never executed concurrently. In a blocking world you'd typically use synchronized or ReentrantLock for that. The coroutine alternative is called Mutex. It has lock and unlock functions to delimit a critical section. The key difference is that Mutex.lock is a suspending function. It does not block a thread.

val mutex = Mutex()
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        mutex.lock()
        try { counter++ }
        finally { mutex.unlock() }
    }
    println("Counter = $counter")
}

You can get full code here

The locking in this example is fine-grained, so it pays the price. However, it is a good choice for some situations where you absolutely must modify some shared state periodically, but there is no natural thread that this state is confined to.
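For contrast, the blocking-world version mentioned above might look like the sketch below, which uses ReentrantLock with plain threads. The function name countWithReentrantLock is hypothetical. Unlike Mutex.lock, a contended ReentrantLock blocks the calling thread:

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread
import kotlin.concurrent.withLock

fun countWithReentrantLock(threads: Int, perThread: Int): Int {
    val lock = ReentrantLock()
    var counter = 0 // shared state, modified only while holding the lock
    val workers = List(threads) {
        thread {
            repeat(perThread) {
                // withLock acquires the lock and releases it in a finally block
                lock.withLock { counter++ }
            }
        }
    }
    workers.forEach { it.join() } // wait for all threads before reading
    return counter
}
```

The withLock helper is the standard library shorthand for the same lock/try/finally/unlock shape that the Mutex example writes out by hand.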

Actors

An actor is a combination of a coroutine, the state that is confined and is encapsulated into this coroutine, and a channel to communicate with other coroutines. A simple actor can be written as a function, but an actor with a complex state is better suited for a class.

There is an actor coroutine builder that conveniently brings the actor's mailbox channel into its scope, to receive messages from, and combines the send channel with the resulting job object, so that a single reference to the actor can be carried around as its handle.

// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: SendChannel<Int>) : CounterMsg() // a request with reply

// This function launches a new counter actor
fun counterActor() = actor<CounterMsg>(CommonPool) {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.send(counter)
        }
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    massiveRun(CommonPool) {
        counter.send(IncCounter)
    }
    val response = Channel<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.receive()}")
    counter.close() // shutdown the actor
}

You can get full code here

It does not matter (for correctness) what context the actor itself is executed in. An actor is a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine works as a solution to the problem of shared mutable state.

An actor is more efficient than locking under load, because in this case it always has work to do and it does not have to switch to a different context at all.

Note that the actor coroutine builder is a dual of the produce coroutine builder. An actor is associated with the channel that it receives messages from, while a producer is associated with the channel that it sends elements to.

Select expression

The select expression makes it possible to await multiple suspending functions simultaneously and select the first one that becomes available.

Selecting from channels

Let us have two producers of strings: fizz and buzz. The fizz produces "Fizz" string every 300 ms:

fun fizz(context: CoroutineContext) = produce<String>(context) {
    while (true) { // sends "Fizz" every 300 ms
        delay(300)
        send("Fizz")
    }
}

And the buzz produces "Buzz!" string every 500 ms:

fun buzz(context: CoroutineContext) = produce<String>(context) {
    while (true) { // sends "Buzz!" every 500 ms
        delay(500)
        send("Buzz!")
    }
}

Using the receive suspending function we can receive either from one channel or the other. But the select expression allows us to receive from both simultaneously using its onReceive clauses:

suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> means that this select expression does not produce any result 
        fizz.onReceive { value ->  // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value ->  // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}

Let us run it all seven times:

fun main(args: Array<String>) = runBlocking<Unit> {
    val fizz = fizz(context)
    val buzz = buzz(context)
    repeat(7) {
        selectFizzBuzz(fizz, buzz)
    }
}

You can get full code here

The result of this code is:

fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'

Selecting on close

The onReceive clause in select fails when the channel is closed and the corresponding select throws an exception. We can use onReceiveOrNull clause to perform a specific action when the channel is closed. The following example also shows that select is an expression that returns the result of its selected clause:

suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveOrNull { value -> 
            if (value == null) 
                "Channel 'a' is closed" 
            else 
                "a -> '$value'"
        }
        b.onReceiveOrNull { value -> 
            if (value == null) 
                "Channel 'b' is closed"
            else    
                "b -> '$value'"
        }
    }

Let's use it with channel a that produces "Hello" string four times and channel b that produces "World" four times:

fun main(args: Array<String>) = runBlocking<Unit> {
    // we are using the context of the main thread in this example for predictability ... 
    val a = produce<String>(context) { 
        repeat(4) { send("Hello $it") }
    }
    val b = produce<String>(context) { 
        repeat(4) { send("World $it") }
    }
    repeat(8) { // print first eight results
        println(selectAorB(a, b))
    }
}

You can get full code here

The result of this code is quite interesting, so we'll analyze it in more detail:

a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed

There are a couple of observations to make out of it.

First of all, select is biased to the first clause. When several clauses are selectable at the same time, the first one among them gets selected. Here, both channels are constantly producing strings, so the a channel, being the first clause in select, wins. However, because we are using unbuffered channels, a gets suspended from time to time on its send invocation and gives b a chance to send, too.

The second observation is that onReceiveOrNull gets immediately selected when the channel is already closed.

Selecting to send

The select expression has an onSend clause that can be used to great effect in combination with the biased nature of selection.

Let us write an example of a producer of integers that sends its values to a side channel when the consumers on its primary channel cannot keep up with it:

fun produceNumbers(side: SendChannel<Int>) = produce<Int>(CommonPool) {
    for (num in 1..10) { // produce 10 numbers from 1 to 10
        delay(100) // every 100 ms
        select<Unit> {
            onSend(num) {} // Send to the primary channel
            side.onSend(num) {} // or to the side channel     
        }
    }
}

The consumer is going to be quite slow, taking 250 ms to process each number:

fun main(args: Array<String>) = runBlocking<Unit> {
    val side = Channel<Int>() // allocate side channel
    launch(context) { // this is a very fast consumer for the side channel
        for (num in side) println("Side channel has $num")
    }
    for (num in produceNumbers(side)) {
        println("Consuming $num")
        delay(250) // let us digest the consumed number properly, do not hurry
    }
    println("Done consuming")
}

You can get full code here

So let us see what happens:

Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming

Selecting deferred values

Deferred values can be selected using the onAwait clause. Let us start with an async function that returns a deferred string value after a random delay:

fun asyncString(time: Int) = async(CommonPool) {
    delay(time.toLong())
    "Waited for $time ms"
}

Let us start a dozen of them with a random delay.

fun asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return List(12) { asyncString(random.nextInt(1000)) }
}

Now the main function awaits the first of them to complete and counts the number of deferred values that are still active. Note that we rely here on the fact that the select expression is a Kotlin DSL, so we can provide clauses for it using arbitrary code. In this case we iterate over a list of deferred values to provide an onAwait clause for each deferred value.

fun main(args: Array<String>) = runBlocking<Unit> {
    val list = asyncStringsList()
    val result = select<String> {
        list.withIndex().forEach { (index, deferred) ->
            deferred.onAwait { answer ->
                "Deferred $index produced answer '$answer'"
            }
        }
    }
    println(result)
    val countActive = list.count { it.isActive }
    println("$countActive coroutines are still active")
}

You can get full code here

The output is:

Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active

Switch over a channel of deferred values

Let us write a channel producer function that consumes a channel of deferred string values, waits for each received deferred value, but only until the next deferred value comes over or the channel is closed. This example puts together onReceiveOrNull and onAwait clauses in the same select:

fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String>(CommonPool) {
    var current = input.receive() // start with first received deferred value
    while (isActive) { // loop while not cancelled/closed
        val next = select<Deferred<String>?> { // return next deferred value from this select or null
            input.onReceiveOrNull { update ->
                update // replaces next value to wait
            }
            current.onAwait { value ->  
                send(value) // send value that current deferred has produced
                input.receiveOrNull() // and use the next deferred from the input channel
            }
        }
        if (next == null) {
            println("Channel was closed")
            break // out of loop
        } else {
            current = next
        }
    }
}

To test it, we'll use a simple async function that resolves to a specified string after a specified time:

fun asyncString(str: String, time: Long) = async(CommonPool) {
    delay(time)
    str
}

The main function just launches a coroutine to print results of switchMapDeferreds and sends some test data to it:

fun main(args: Array<String>) = runBlocking<Unit> {
    val chan = Channel<Deferred<String>>() // the channel for test
    launch(context) { // launch printing coroutine
        for (s in switchMapDeferreds(chan)) 
            println(s) // print each received string
    }
    chan.send(asyncString("BEGIN", 100))
    delay(200) // enough time for "BEGIN" to be produced
    chan.send(asyncString("Slow", 500))
    delay(100) // not enough time to produce slow
    chan.send(asyncString("Replace", 100))
    delay(500) // give it time before the last one
    chan.send(asyncString("END", 500))
    delay(1000) // give it time to process
    chan.close() // close the channel ... 
    delay(500) // and wait some time to let it finish
}

You can get full code here

The result of this code:

BEGIN
Replace
END
Channel was closed

Further reading