blob: 308fd84efb427114c31e196a64036b134136768a [file] [log] [blame] [view]
Roman Elizarov43e90112017-05-10 11:25:20 +03001<!--- INCLUDE .*/example-([a-z]+)-([0-9a-z]+)\.kt
Roman Elizarova5e653f2017-02-13 13:49:55 +03002/*
3 * Copyright 2016-2017 JetBrains s.r.o.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
Roman Elizarovf16fd272017-02-07 11:26:00 +030017
Roman Elizarova5e653f2017-02-13 13:49:55 +030018// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
19package guide.$$1.example$$2
Roman Elizarovf16fd272017-02-07 11:26:00 +030020
Roman Elizarova5e653f2017-02-13 13:49:55 +030021import kotlinx.coroutines.experimental.*
Roman Elizarovf16fd272017-02-07 11:26:00 +030022-->
Roman Elizarove8d79342017-08-29 15:21:21 +030023<!--- KNIT core/kotlinx-coroutines-core/src/test/kotlin/guide/.*\.kt -->
24<!--- TEST_OUT core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
Roman Elizarov731f0ad2017-02-22 20:48:45 +030025// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
26package guide.test
27
28import org.junit.Test
29
30class GuideTest {
31-->
Roman Elizarovf16fd272017-02-07 11:26:00 +030032
Roman Elizarov7deefb82017-01-31 10:33:17 +030033# Guide to kotlinx.coroutines by example
34
35This is a short guide on core features of `kotlinx.coroutines` with a series of examples.
36
Roman Elizarov2a638922017-03-04 10:22:43 +030037## Introduction and setup
38
39Kotlin, as a language, provides only minimal low-level APIs in its standard library to enable various other
40libraries to utilize coroutines. Unlike many other languages with similar capabilities, `async` and `await`
41are not keywords in Kotlin and are not even part of its standard library.
42
Robert Hencke497d3432017-04-11 00:14:29 -040043`kotlinx.coroutines` is one such rich library. It contains a number of high-level
Roman Elizarov2a638922017-03-04 10:22:43 +030044coroutine-enabled primitives that this guide covers, including `async` and `await`.
45You need to add a dependency on `kotlinx-coroutines-core` module as explained
46[here](README.md#using-in-your-projects) to use primitives from this guide in your projects.
47
Roman Elizarov1293ccd2017-02-01 18:49:54 +030048## Table of contents
49
Roman Elizarovfa7723e2017-02-06 11:17:51 +030050<!--- TOC -->
51
Roman Elizarov1293ccd2017-02-01 18:49:54 +030052* [Coroutine basics](#coroutine-basics)
53 * [Your first coroutine](#your-first-coroutine)
54 * [Bridging blocking and non-blocking worlds](#bridging-blocking-and-non-blocking-worlds)
55 * [Waiting for a job](#waiting-for-a-job)
56 * [Extract function refactoring](#extract-function-refactoring)
57 * [Coroutines ARE light-weight](#coroutines-are-light-weight)
58 * [Coroutines are like daemon threads](#coroutines-are-like-daemon-threads)
59* [Cancellation and timeouts](#cancellation-and-timeouts)
60 * [Cancelling coroutine execution](#cancelling-coroutine-execution)
61 * [Cancellation is cooperative](#cancellation-is-cooperative)
62 * [Making computation code cancellable](#making-computation-code-cancellable)
63 * [Closing resources with finally](#closing-resources-with-finally)
64 * [Run non-cancellable block](#run-non-cancellable-block)
65 * [Timeout](#timeout)
66* [Composing suspending functions](#composing-suspending-functions)
67 * [Sequential by default](#sequential-by-default)
Roman Elizarov32d95322017-02-09 15:57:31 +030068 * [Concurrent using async](#concurrent-using-async)
69 * [Lazily started async](#lazily-started-async)
70 * [Async-style functions](#async-style-functions)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +030071* [Coroutine context and dispatchers](#coroutine-context-and-dispatchers)
Roman Elizarovfa7723e2017-02-06 11:17:51 +030072 * [Dispatchers and threads](#dispatchers-and-threads)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +030073 * [Unconfined vs confined dispatcher](#unconfined-vs-confined-dispatcher)
74 * [Debugging coroutines and threads](#debugging-coroutines-and-threads)
75 * [Jumping between threads](#jumping-between-threads)
76 * [Job in the context](#job-in-the-context)
77 * [Children of a coroutine](#children-of-a-coroutine)
78 * [Combining contexts](#combining-contexts)
Roman Elizarov8b38fa22017-09-27 17:44:31 +030079 * [Parental responsibilities](#parental-responsibilities)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +030080 * [Naming coroutines for debugging](#naming-coroutines-for-debugging)
Roman Elizarov2fd7cb32017-02-11 23:18:59 +030081 * [Cancellation via explicit job](#cancellation-via-explicit-job)
Roman Elizarovb7721cf2017-02-03 19:23:08 +030082* [Channels](#channels)
83 * [Channel basics](#channel-basics)
84 * [Closing and iteration over channels](#closing-and-iteration-over-channels)
85 * [Building channel producers](#building-channel-producers)
86 * [Pipelines](#pipelines)
87 * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
88 * [Fan-out](#fan-out)
89 * [Fan-in](#fan-in)
90 * [Buffered channels](#buffered-channels)
Roman Elizarovb0517ba2017-02-27 14:03:14 +030091 * [Channels are fair](#channels-are-fair)
Roman Elizarovf5bc0472017-02-22 11:38:13 +030092* [Shared mutable state and concurrency](#shared-mutable-state-and-concurrency)
93 * [The problem](#the-problem)
Roman Elizarov1e459602017-02-27 11:05:17 +030094 * [Volatiles are of no help](#volatiles-are-of-no-help)
Roman Elizarovf5bc0472017-02-22 11:38:13 +030095 * [Thread-safe data structures](#thread-safe-data-structures)
Roman Elizarov1e459602017-02-27 11:05:17 +030096 * [Thread confinement fine-grained](#thread-confinement-fine-grained)
97 * [Thread confinement coarse-grained](#thread-confinement-coarse-grained)
Roman Elizarovf5bc0472017-02-22 11:38:13 +030098 * [Mutual exclusion](#mutual-exclusion)
99 * [Actors](#actors)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +0300100* [Select expression](#select-expression)
101 * [Selecting from channels](#selecting-from-channels)
102 * [Selecting on close](#selecting-on-close)
103 * [Selecting to send](#selecting-to-send)
104 * [Selecting deferred values](#selecting-deferred-values)
105 * [Switch over a channel of deferred values](#switch-over-a-channel-of-deferred-values)
Roman Elizarov8db17332017-03-09 12:40:45 +0300106* [Further reading](#further-reading)
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300107
Roman Elizarova5e653f2017-02-13 13:49:55 +0300108<!--- END_TOC -->
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300109
110## Coroutine basics
111
112This section covers basic coroutine concepts.
113
114### Your first coroutine
Roman Elizarov7deefb82017-01-31 10:33:17 +0300115
116Run the following code:
117
118```kotlin
119fun main(args: Array<String>) {
120 launch(CommonPool) { // create new coroutine in common thread pool
121 delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
122 println("World!") // print after delay
123 }
124 println("Hello,") // main function continues while coroutine is delayed
125 Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
126}
127```
128
Roman Elizarove8d79342017-08-29 15:21:21 +0300129> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-01.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300130
131Run this code:
132
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300133```text
Roman Elizarov7deefb82017-01-31 10:33:17 +0300134Hello,
135World!
136```
137
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300138<!--- TEST -->
139
Roman Elizarov419a6c82017-02-09 18:36:22 +0300140Essentially, coroutines are light-weight threads.
141They are launched with [launch] _coroutine builder_.
142You can achieve the same result replacing
Roman Elizarov7deefb82017-01-31 10:33:17 +0300143`launch(CommonPool) { ... }` with `thread { ... }` and `delay(...)` with `Thread.sleep(...)`. Try it.
144
145If you start by replacing `launch(CommonPool)` by `thread`, the compiler produces the following error:
146
147```
148Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
149```
150
Roman Elizarov419a6c82017-02-09 18:36:22 +0300151That is because [delay] is a special _suspending function_ that does not block a thread, but _suspends_
Roman Elizarov7deefb82017-01-31 10:33:17 +0300152coroutine and it can be only used from a coroutine.
153
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300154### Bridging blocking and non-blocking worlds
Roman Elizarov7deefb82017-01-31 10:33:17 +0300155
156The first example mixes _non-blocking_ `delay(...)` and _blocking_ `Thread.sleep(...)` in the same
157code of `main` function. It is easy to get lost. Let's cleanly separate blocking and non-blocking
Roman Elizarov419a6c82017-02-09 18:36:22 +0300158worlds by using [runBlocking]:
Roman Elizarov7deefb82017-01-31 10:33:17 +0300159
160```kotlin
161fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
162 launch(CommonPool) { // create new coroutine in common thread pool
163 delay(1000L)
164 println("World!")
165 }
166 println("Hello,") // main coroutine continues while child is delayed
167 delay(2000L) // non-blocking delay for 2 seconds to keep JVM alive
168}
169```
170
Roman Elizarove8d79342017-08-29 15:21:21 +0300171> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-02.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300172
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300173<!--- TEST
174Hello,
175World!
176-->
177
Roman Elizarov419a6c82017-02-09 18:36:22 +0300178The result is the same, but this code uses only non-blocking [delay].
Roman Elizarov7deefb82017-01-31 10:33:17 +0300179
180`runBlocking { ... }` works as an adaptor that is used here to start the top-level main coroutine.
181The regular code outside of `runBlocking` _blocks_, until the coroutine inside `runBlocking` is active.
182
183This is also a way to write unit-tests for suspending functions:
184
185```kotlin
186class MyTest {
187 @Test
188 fun testMySuspendingFunction() = runBlocking<Unit> {
189 // here we can use suspending functions using any assertion style that we like
190 }
191}
192```
Roman Elizarovb3d55a52017-02-03 12:47:21 +0300193
194<!--- CLEAR -->
Roman Elizarov7deefb82017-01-31 10:33:17 +0300195
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300196### Waiting for a job
Roman Elizarov7deefb82017-01-31 10:33:17 +0300197
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300198Delaying for a time while another coroutine is working is not a good approach. Let's explicitly
Roman Elizarov419a6c82017-02-09 18:36:22 +0300199wait (in a non-blocking way) until the background [Job] that we have launched is complete:
Roman Elizarov7deefb82017-01-31 10:33:17 +0300200
201```kotlin
202fun main(args: Array<String>) = runBlocking<Unit> {
203 val job = launch(CommonPool) { // create new coroutine and keep a reference to its Job
204 delay(1000L)
205 println("World!")
206 }
207 println("Hello,")
208 job.join() // wait until child coroutine completes
209}
210```
211
Roman Elizarove8d79342017-08-29 15:21:21 +0300212> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-03.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300213
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300214<!--- TEST
215Hello,
216World!
217-->
218
Roman Elizarov7deefb82017-01-31 10:33:17 +0300219Now the result is still the same, but the code of the main coroutine is not tied to the duration of
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300220the background job in any way. Much better.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300221
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300222### Extract function refactoring
Roman Elizarov7deefb82017-01-31 10:33:17 +0300223
Kafji48fd6282017-05-28 08:25:10 +0700224Let's extract the block of code inside `launch(CommonPool) { ... }` into a separate function. When you
Roman Elizarov7deefb82017-01-31 10:33:17 +0300225perform "Extract function" refactoring on this code you get a new function with `suspend` modifier.
226That is your first _suspending function_. Suspending functions can be used inside coroutines
227just like regular functions, but their additional feature is that they can, in turn,
228use other suspending functions, like `delay` in this example, to _suspend_ execution of a coroutine.
229
230```kotlin
231fun main(args: Array<String>) = runBlocking<Unit> {
232 val job = launch(CommonPool) { doWorld() }
233 println("Hello,")
234 job.join()
235}
236
237// this is your first suspending function
238suspend fun doWorld() {
239 delay(1000L)
240 println("World!")
241}
242```
243
Roman Elizarove8d79342017-08-29 15:21:21 +0300244> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-04.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300245
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300246<!--- TEST
247Hello,
248World!
249-->
250
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300251### Coroutines ARE light-weight
Roman Elizarov7deefb82017-01-31 10:33:17 +0300252
253Run the following code:
254
255```kotlin
256fun main(args: Array<String>) = runBlocking<Unit> {
257 val jobs = List(100_000) { // create a lot of coroutines and list their jobs
258 launch(CommonPool) {
259 delay(1000L)
260 print(".")
261 }
262 }
263 jobs.forEach { it.join() } // wait for all jobs to complete
264}
265```
266
Roman Elizarove8d79342017-08-29 15:21:21 +0300267> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-05.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300268
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300269<!--- TEST lines.size == 1 && lines[0] == ".".repeat(100_000) -->
270
Roman Elizarov7deefb82017-01-31 10:33:17 +0300271It starts 100K coroutines and, after a second, each coroutine prints a dot.
272Now, try that with threads. What would happen? (Most likely your code will produce some sort of out-of-memory error)
273
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300274### Coroutines are like daemon threads
Roman Elizarov7deefb82017-01-31 10:33:17 +0300275
276The following code launches a long-running coroutine that prints "I'm sleeping" twice a second and then
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300277returns from the main function after some delay:
Roman Elizarov7deefb82017-01-31 10:33:17 +0300278
279```kotlin
280fun main(args: Array<String>) = runBlocking<Unit> {
281 launch(CommonPool) {
282 repeat(1000) { i ->
283 println("I'm sleeping $i ...")
284 delay(500L)
285 }
286 }
287 delay(1300L) // just quit after delay
288}
289```
290
Roman Elizarove8d79342017-08-29 15:21:21 +0300291> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-06.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300292
293You can run and see that it prints three lines and terminates:
294
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300295```text
Roman Elizarov7deefb82017-01-31 10:33:17 +0300296I'm sleeping 0 ...
297I'm sleeping 1 ...
298I'm sleeping 2 ...
299```
300
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300301<!--- TEST -->
302
Roman Elizarov7deefb82017-01-31 10:33:17 +0300303Active coroutines do not keep the process alive. They are like daemon threads.
304
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300305## Cancellation and timeouts
306
307This section covers coroutine cancellation and timeouts.
308
309### Cancelling coroutine execution
Roman Elizarov7deefb82017-01-31 10:33:17 +0300310
311In small application the return from "main" method might sound like a good idea to get all coroutines
312implicitly terminated. In a larger, long-running application, you need finer-grained control.
Roman Elizarov419a6c82017-02-09 18:36:22 +0300313The [launch] function returns a [Job] that can be used to cancel running coroutine:
Roman Elizarov7deefb82017-01-31 10:33:17 +0300314
315```kotlin
316fun main(args: Array<String>) = runBlocking<Unit> {
317 val job = launch(CommonPool) {
318 repeat(1000) { i ->
319 println("I'm sleeping $i ...")
320 delay(500L)
321 }
322 }
323 delay(1300L) // delay a bit
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300324 println("main: I'm tired of waiting!")
Roman Elizarov7deefb82017-01-31 10:33:17 +0300325 job.cancel() // cancels the job
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300326 job.join() // waits for job's completion
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300327 println("main: Now I can quit.")
Roman Elizarov7deefb82017-01-31 10:33:17 +0300328}
329```
330
Roman Elizarove8d79342017-08-29 15:21:21 +0300331> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-01.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300332
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300333It produces the following output:
334
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300335```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300336I'm sleeping 0 ...
337I'm sleeping 1 ...
338I'm sleeping 2 ...
339main: I'm tired of waiting!
340main: Now I can quit.
341```
342
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300343<!--- TEST -->
344
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300345As soon as main invokes `job.cancel`, we don't see any output from the other coroutine because it was cancelled.
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300346There is also an extension function [cancelAndJoin] that combines [cancel] and [join] invocations.
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300347
348### Cancellation is cooperative
Roman Elizarov7deefb82017-01-31 10:33:17 +0300349
Tair Rzayevaf734622017-02-01 22:30:16 +0200350Coroutine cancellation is _cooperative_. A coroutine code has to cooperate to be cancellable.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300351All the suspending functions in `kotlinx.coroutines` are _cancellable_. They check for cancellation of
Roman Elizarov419a6c82017-02-09 18:36:22 +0300352coroutine and throw [CancellationException] when cancelled. However, if a coroutine is working in
Roman Elizarov7deefb82017-01-31 10:33:17 +0300353a computation and does not check for cancellation, then it cannot be cancelled, like the following
354example shows:
355
356```kotlin
357fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov24cd6542017-08-03 21:20:04 -0700358 val startTime = System.currentTimeMillis()
Roman Elizarov7deefb82017-01-31 10:33:17 +0300359 val job = launch(CommonPool) {
Roman Elizarov24cd6542017-08-03 21:20:04 -0700360 var nextPrintTime = startTime
Roman Elizarov7deefb82017-01-31 10:33:17 +0300361 var i = 0
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300362 while (i < 5) { // computation loop, just wastes CPU
Roman Elizarov24cd6542017-08-03 21:20:04 -0700363 // print a message twice a second
364 if (System.currentTimeMillis() >= nextPrintTime) {
Roman Elizarov7deefb82017-01-31 10:33:17 +0300365 println("I'm sleeping ${i++} ...")
Roman Elizarov35d2c342017-07-20 14:54:39 +0300366 nextPrintTime += 500L
Roman Elizarov7deefb82017-01-31 10:33:17 +0300367 }
368 }
369 }
370 delay(1300L) // delay a bit
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300371 println("main: I'm tired of waiting!")
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300372 job.cancelAndJoin() // cancels the job and waits for its completion
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300373 println("main: Now I can quit.")
Roman Elizarov7deefb82017-01-31 10:33:17 +0300374}
375```
376
Roman Elizarove8d79342017-08-29 15:21:21 +0300377> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-02.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300378
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300379Run it to see that it continues to print "I'm sleeping" even after cancellation
380until the job completes by itself after five iterations.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300381
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300382<!--- TEST
383I'm sleeping 0 ...
384I'm sleeping 1 ...
385I'm sleeping 2 ...
386main: I'm tired of waiting!
387I'm sleeping 3 ...
388I'm sleeping 4 ...
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300389main: Now I can quit.
390-->
391
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300392### Making computation code cancellable
Roman Elizarov7deefb82017-01-31 10:33:17 +0300393
394There are two approaches to making computation code cancellable. The first one is to periodically
Roman Elizarov419a6c82017-02-09 18:36:22 +0300395invoke a suspending function. There is a [yield] function that is a good choice for that purpose.
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300396The other one is to explicitly check the cancellation status. Let us try the later approach.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300397
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300398Replace `while (i < 5)` in the previous example with `while (isActive)` and rerun it.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300399
Roman Elizarovb3d55a52017-02-03 12:47:21 +0300400```kotlin
401fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov24cd6542017-08-03 21:20:04 -0700402 val startTime = System.currentTimeMillis()
Roman Elizarovb3d55a52017-02-03 12:47:21 +0300403 val job = launch(CommonPool) {
Roman Elizarov24cd6542017-08-03 21:20:04 -0700404 var nextPrintTime = startTime
Roman Elizarovb3d55a52017-02-03 12:47:21 +0300405 var i = 0
406 while (isActive) { // cancellable computation loop
Roman Elizarov24cd6542017-08-03 21:20:04 -0700407 // print a message twice a second
408 if (System.currentTimeMillis() >= nextPrintTime) {
Roman Elizarovb3d55a52017-02-03 12:47:21 +0300409 println("I'm sleeping ${i++} ...")
Roman Elizarov24cd6542017-08-03 21:20:04 -0700410 nextPrintTime += 500L
Roman Elizarovb3d55a52017-02-03 12:47:21 +0300411 }
412 }
413 }
414 delay(1300L) // delay a bit
415 println("main: I'm tired of waiting!")
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300416 job.cancelAndJoin() // cancels the job and waits for its completion
Roman Elizarovb3d55a52017-02-03 12:47:21 +0300417 println("main: Now I can quit.")
418}
419```
420
Roman Elizarove8d79342017-08-29 15:21:21 +0300421> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-03.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300422
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300423As you can see, now this loop is cancelled. [isActive][CoroutineScope.isActive] is a property that is available inside
Roman Elizarov419a6c82017-02-09 18:36:22 +0300424the code of coroutines via [CoroutineScope] object.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300425
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300426<!--- TEST
427I'm sleeping 0 ...
428I'm sleeping 1 ...
429I'm sleeping 2 ...
430main: I'm tired of waiting!
431main: Now I can quit.
432-->
433
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300434### Closing resources with finally
435
Roman Elizarov419a6c82017-02-09 18:36:22 +0300436Cancellable suspending functions throw [CancellationException] on cancellation which can be handled in
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300437all the usual way. For example, the `try {...} finally {...}` and Kotlin `use` function execute their
438finalization actions normally when coroutine is cancelled:
439
440```kotlin
441fun main(args: Array<String>) = runBlocking<Unit> {
442 val job = launch(CommonPool) {
443 try {
444 repeat(1000) { i ->
445 println("I'm sleeping $i ...")
446 delay(500L)
447 }
448 } finally {
449 println("I'm running finally")
450 }
451 }
452 delay(1300L) // delay a bit
453 println("main: I'm tired of waiting!")
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300454 job.cancelAndJoin() // cancels the job and waits for its completion
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300455 println("main: Now I can quit.")
456}
457```
458
Roman Elizarove8d79342017-08-29 15:21:21 +0300459> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-04.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300460
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300461Both [join] and [cancelAndJoin] wait for all the finalization actions to complete, so the example above
462produces the following output:
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300463
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300464```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300465I'm sleeping 0 ...
466I'm sleeping 1 ...
467I'm sleeping 2 ...
468main: I'm tired of waiting!
469I'm running finally
470main: Now I can quit.
471```
472
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300473<!--- TEST -->
474
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300475### Run non-cancellable block
476
477Any attempt to use a suspending function in the `finally` block of the previous example will cause
Roman Elizarov419a6c82017-02-09 18:36:22 +0300478[CancellationException], because the coroutine running this code is cancelled. Usually, this is not a
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300479problem, since all well-behaving closing operations (closing a file, cancelling a job, or closing any kind of a
480communication channel) are usually non-blocking and do not involve any suspending functions. However, in the
481rare case when you need to suspend in the cancelled coroutine you can wrap the corresponding code in
Roman Elizarov419a6c82017-02-09 18:36:22 +0300482`run(NonCancellable) {...}` using [run] function and [NonCancellable] context as the following example shows:
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300483
484```kotlin
485fun main(args: Array<String>) = runBlocking<Unit> {
486 val job = launch(CommonPool) {
487 try {
488 repeat(1000) { i ->
489 println("I'm sleeping $i ...")
490 delay(500L)
491 }
492 } finally {
493 run(NonCancellable) {
494 println("I'm running finally")
495 delay(1000L)
496 println("And I've just delayed for 1 sec because I'm non-cancellable")
497 }
498 }
499 }
500 delay(1300L) // delay a bit
501 println("main: I'm tired of waiting!")
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300502 job.cancelAndJoin() // cancels the job and waits for its completion
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300503 println("main: Now I can quit.")
504}
505```
506
Roman Elizarove8d79342017-08-29 15:21:21 +0300507> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-05.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300508
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300509<!--- TEST
510I'm sleeping 0 ...
511I'm sleeping 1 ...
512I'm sleeping 2 ...
513main: I'm tired of waiting!
514I'm running finally
515And I've just delayed for 1 sec because I'm non-cancellable
516main: Now I can quit.
517-->
518
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300519### Timeout
520
521The most obvious reason to cancel coroutine execution in practice,
522is because its execution time has exceeded some timeout.
Roman Elizarov419a6c82017-02-09 18:36:22 +0300523While you can manually track the reference to the corresponding [Job] and launch a separate coroutine to cancel
524the tracked one after delay, there is a ready to use [withTimeout] function that does it.
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300525Look at the following example:
526
527```kotlin
528fun main(args: Array<String>) = runBlocking<Unit> {
529 withTimeout(1300L) {
530 repeat(1000) { i ->
531 println("I'm sleeping $i ...")
532 delay(500L)
533 }
534 }
535}
536```
537
Roman Elizarove8d79342017-08-29 15:21:21 +0300538> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-06.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300539
540It produces the following output:
541
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300542```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300543I'm sleeping 0 ...
544I'm sleeping 1 ...
545I'm sleeping 2 ...
Roman Elizarov63f6ea22017-09-06 18:42:34 +0300546Exception in thread "main" kotlinx.coroutines.experimental.TimeoutCancellationException: Timed out waiting for 1300 MILLISECONDS
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300547```
548
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300549<!--- TEST STARTS_WITH -->
550
Roman Elizarov63f6ea22017-09-06 18:42:34 +0300551The `TimeoutCancellationException` that is thrown by [withTimeout] is a subclass of [CancellationException].
Roman Elizarovca9d5be2017-04-20 19:23:18 +0300552We have not seen its stack trace printed on the console before. That is because
Roman Elizarov7c864d82017-02-27 10:17:50 +0300553inside a cancelled coroutine `CancellationException` is considered to be a normal reason for coroutine completion.
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300554However, in this example we have used `withTimeout` right inside the `main` function.
555
556Because cancellation is just an exception, all the resources will be closed in a usual way.
Roman Elizarov63f6ea22017-09-06 18:42:34 +0300557You can wrap the code with timeout in `try {...} catch (e: TimeoutCancellationException) {...}` block if
558you need to do some additional action specifically on any kind of timeout or use [withTimeoutOrNull] function
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300559that is similar to [withTimeout], but returns `null` on timeout instead of throwing an exception:
560
561```kotlin
562fun main(args: Array<String>) = runBlocking<Unit> {
563 val result = withTimeoutOrNull(1300L) {
564 repeat(1000) { i ->
565 println("I'm sleeping $i ...")
566 delay(500L)
567 }
568 "Done" // will get cancelled before it produces this result
569 }
570 println("Result is $result")
571}
572```
573
574> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-07.kt)
575
576There is no longer an exception when running this code:
577
578```text
579I'm sleeping 0 ...
580I'm sleeping 1 ...
581I'm sleeping 2 ...
582Result is null
583```
584
585<!--- TEST -->
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300586
587## Composing suspending functions
588
589This section covers various approaches to composition of suspending functions.
590
591### Sequential by default
592
593Assume that we have two suspending functions defined elsewhere that do something useful like some kind of
Roman Elizarovb7721cf2017-02-03 19:23:08 +0300594remote service call or computation. We just pretend they are useful, but actually each one just
595delays for a second for the purpose of this example:
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300596
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300597<!--- INCLUDE .*/example-compose-([0-9]+).kt
598import kotlin.system.measureTimeMillis
599-->
600
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300601```kotlin
602suspend fun doSomethingUsefulOne(): Int {
603 delay(1000L) // pretend we are doing something useful here
604 return 13
605}
606
607suspend fun doSomethingUsefulTwo(): Int {
608 delay(1000L) // pretend we are doing something useful here, too
609 return 29
610}
611```
612
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300613<!--- INCLUDE .*/example-compose-([0-9]+).kt -->
614
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300615What do we do if need to invoke them _sequentially_ -- first `doSomethingUsefulOne` _and then_
616`doSomethingUsefulTwo` and compute the sum of their results?
617In practise we do this if we use the results of the first function to make a decision on whether we need
618to invoke the second one or to decide on how to invoke it.
619
620We just use a normal sequential invocation, because the code in the coroutine, just like in the regular
Roman Elizarov32d95322017-02-09 15:57:31 +0300621code, is _sequential_ by default. The following example demonstrates it by measuring the total
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300622time it takes to execute both suspending functions:
623
624```kotlin
625fun main(args: Array<String>) = runBlocking<Unit> {
626 val time = measureTimeMillis {
627 val one = doSomethingUsefulOne()
628 val two = doSomethingUsefulTwo()
629 println("The answer is ${one + two}")
630 }
631 println("Completed in $time ms")
632}
633```
634
Roman Elizarove8d79342017-08-29 15:21:21 +0300635> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-01.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300636
637It produces something like this:
638
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300639```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300640The answer is 42
641Completed in 2017 ms
642```
643
Roman Elizarov35d2c342017-07-20 14:54:39 +0300644<!--- TEST ARBITRARY_TIME -->
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300645
Roman Elizarov32d95322017-02-09 15:57:31 +0300646### Concurrent using async
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300647
648What if there are no dependencies between invocation of `doSomethingUsefulOne` and `doSomethingUsefulTwo` and
Roman Elizarov419a6c82017-02-09 18:36:22 +0300649we want to get the answer faster, by doing both _concurrently_? This is where [async] comes to help.
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300650
Roman Elizarov419a6c82017-02-09 18:36:22 +0300651Conceptually, [async] is just like [launch]. It starts a separate coroutine which is a light-weight thread
652that works concurrently with all the other coroutines. The difference is that `launch` returns a [Job] and
653does not carry any resulting value, while `async` returns a [Deferred] -- a light-weight non-blocking future
Roman Elizarov32d95322017-02-09 15:57:31 +0300654that represents a promise to provide a result later. You can use `.await()` on a deferred value to get its eventual result,
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300655but `Deferred` is also a `Job`, so you can cancel it if needed.
656
657```kotlin
658fun main(args: Array<String>) = runBlocking<Unit> {
659 val time = measureTimeMillis {
Roman Elizarov32d95322017-02-09 15:57:31 +0300660 val one = async(CommonPool) { doSomethingUsefulOne() }
661 val two = async(CommonPool) { doSomethingUsefulTwo() }
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300662 println("The answer is ${one.await() + two.await()}")
663 }
664 println("Completed in $time ms")
665}
666```
667
Roman Elizarove8d79342017-08-29 15:21:21 +0300668> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-02.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300669
670It produces something like this:
671
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300672```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300673The answer is 42
674Completed in 1017 ms
675```
676
Roman Elizarov35d2c342017-07-20 14:54:39 +0300677<!--- TEST ARBITRARY_TIME -->
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300678
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300679This is twice as fast, because we have concurrent execution of two coroutines.
680Note, that concurrency with coroutines is always explicit.
681
Roman Elizarov32d95322017-02-09 15:57:31 +0300682### Lazily started async
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300683
Roman Elizarovecda27f2017-04-06 23:06:26 +0300684There is a laziness option to [async] with [CoroutineStart.LAZY] parameter.
Roman Elizarov419a6c82017-02-09 18:36:22 +0300685It starts coroutine only when its result is needed by some
686[await][Deferred.await] or if a [start][Job.start] function
Roman Elizarov32d95322017-02-09 15:57:31 +0300687is invoked. Run the following example that differs from the previous one only by this option:
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300688
689```kotlin
690fun main(args: Array<String>) = runBlocking<Unit> {
691 val time = measureTimeMillis {
Roman Elizarovecda27f2017-04-06 23:06:26 +0300692 val one = async(CommonPool, CoroutineStart.LAZY) { doSomethingUsefulOne() }
693 val two = async(CommonPool, CoroutineStart.LAZY) { doSomethingUsefulTwo() }
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300694 println("The answer is ${one.await() + two.await()}")
695 }
696 println("Completed in $time ms")
697}
698```
699
Roman Elizarove8d79342017-08-29 15:21:21 +0300700> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-03.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300701
702It produces something like this:
703
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300704```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300705The answer is 42
706Completed in 2017 ms
707```
708
Roman Elizarov35d2c342017-07-20 14:54:39 +0300709<!--- TEST ARBITRARY_TIME -->
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300710
Roman Elizarov32d95322017-02-09 15:57:31 +0300711So, we are back to sequential execution, because we _first_ start and await for `one`, _and then_ start and await
712for `two`. It is not the intended use-case for laziness. It is designed as a replacement for
713the standard `lazy` function in cases when computation of the value involves suspending functions.
714
715### Async-style functions
716
717We can define async-style functions that invoke `doSomethingUsefulOne` and `doSomethingUsefulTwo`
Roman Elizarov419a6c82017-02-09 18:36:22 +0300718_asynchronously_ using [async] coroutine builder. It is a good style to name such functions with
Roman Elizarov32d95322017-02-09 15:57:31 +0300719either "async" prefix of "Async" suffix to highlight the fact that they only start asynchronous
720computation and one needs to use the resulting deferred value to get the result.
721
722```kotlin
723// The result type of asyncSomethingUsefulOne is Deferred<Int>
724fun asyncSomethingUsefulOne() = async(CommonPool) {
725 doSomethingUsefulOne()
726}
727
728// The result type of asyncSomethingUsefulTwo is Deferred<Int>
729fun asyncSomethingUsefulTwo() = async(CommonPool) {
730 doSomethingUsefulTwo()
731}
732```
733
734Note, that these `asyncXXX` function are **not** _suspending_ functions. They can be used from anywhere.
735However, their use always implies asynchronous (here meaning _concurrent_) execution of their action
736with the invoking code.
737
738The following example shows their use outside of coroutine:
739
740```kotlin
741// note, that we don't have `runBlocking` to the right of `main` in this example
742fun main(args: Array<String>) {
743 val time = measureTimeMillis {
744 // we can initiate async actions outside of a coroutine
745 val one = asyncSomethingUsefulOne()
746 val two = asyncSomethingUsefulTwo()
747 // but waiting for a result must involve either suspending or blocking.
748 // here we use `runBlocking { ... }` to block the main thread while waiting for the result
749 runBlocking {
750 println("The answer is ${one.await() + two.await()}")
751 }
752 }
753 println("Completed in $time ms")
754}
755```
756
Roman Elizarove8d79342017-08-29 15:21:21 +0300757> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-04.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300758
Roman Elizarov35d2c342017-07-20 14:54:39 +0300759<!--- TEST ARBITRARY_TIME
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300760The answer is 42
761Completed in 1085 ms
762-->
763
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300764## Coroutine context and dispatchers
765
Roman Elizarov32d95322017-02-09 15:57:31 +0300766We've already seen `launch(CommonPool) {...}`, `async(CommonPool) {...}`, `run(NonCancellable) {...}`, etc.
Roman Elizarov419a6c82017-02-09 18:36:22 +0300767In these code snippets [CommonPool] and [NonCancellable] are _coroutine contexts_.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300768This section covers other available choices.
769
770### Dispatchers and threads
771
Roman Elizarov419a6c82017-02-09 18:36:22 +0300772Coroutine context includes a [_coroutine dispatcher_][CoroutineDispatcher] which determines what thread or threads
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300773the corresponding coroutine uses for its execution. Coroutine dispatcher can confine coroutine execution
774to a specific thread, dispatch it to a thread pool, or let it run unconfined. Try the following example:
775
776```kotlin
777fun main(args: Array<String>) = runBlocking<Unit> {
778 val jobs = arrayListOf<Job>()
779 jobs += launch(Unconfined) { // not confined -- will work with main thread
Roman Elizarov43e3af72017-07-21 16:01:31 +0300780 println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300781 }
Roman Elizarov43e3af72017-07-21 16:01:31 +0300782 jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
783 println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300784 }
785 jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
Roman Elizarov43e3af72017-07-21 16:01:31 +0300786 println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300787 }
788 jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
Roman Elizarov43e3af72017-07-21 16:01:31 +0300789 println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300790 }
791 jobs.forEach { it.join() }
792}
793```
794
Roman Elizarove8d79342017-08-29 15:21:21 +0300795> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-01.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300796
797It produces the following output (maybe in different order):
798
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300799```text
Roman Elizarov43e3af72017-07-21 16:01:31 +0300800 'Unconfined': I'm working in thread main
801 'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
802 'newSTC': I'm working in thread MyOwnThread
803'coroutineContext': I'm working in thread main
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300804```
805
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300806<!--- TEST LINES_START_UNORDERED -->
807
Roman Elizarov43e3af72017-07-21 16:01:31 +0300808The difference between parent [coroutineContext][CoroutineScope.coroutineContext] and
809[Unconfined] context will be shown later.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300810
811### Unconfined vs confined dispatcher
812
Roman Elizarov419a6c82017-02-09 18:36:22 +0300813The [Unconfined] coroutine dispatcher starts coroutine in the caller thread, but only until the
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300814first suspension point. After suspension it resumes in the thread that is fully determined by the
815suspending function that was invoked. Unconfined dispatcher is appropriate when coroutine does not
816consume CPU time nor updates any shared data (like UI) that is confined to a specific thread.
817
Roman Elizarov43e3af72017-07-21 16:01:31 +0300818On the other side, [coroutineContext][CoroutineScope.coroutineContext] property that is available inside the block of any coroutine
Roman Elizarov419a6c82017-02-09 18:36:22 +0300819via [CoroutineScope] interface, is a reference to a context of this particular coroutine.
820This way, a parent context can be inherited. The default context of [runBlocking], in particular,
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300821is confined to be invoker thread, so inheriting it has the effect of confining execution to
822this thread with a predictable FIFO scheduling.
823
824```kotlin
825fun main(args: Array<String>) = runBlocking<Unit> {
826 val jobs = arrayListOf<Job>()
827 jobs += launch(Unconfined) { // not confined -- will work with main thread
Roman Elizarov43e3af72017-07-21 16:01:31 +0300828 println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
Roman Elizarovd0021622017-03-10 15:43:38 +0300829 delay(500)
Roman Elizarov43e3af72017-07-21 16:01:31 +0300830 println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300831 }
Roman Elizarov43e3af72017-07-21 16:01:31 +0300832 jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
833 println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300834 delay(1000)
Roman Elizarov43e3af72017-07-21 16:01:31 +0300835 println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300836 }
837 jobs.forEach { it.join() }
838}
839```
840
Roman Elizarove8d79342017-08-29 15:21:21 +0300841> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-02.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300842
843Produces the output:
844
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300845```text
Roman Elizarov43e3af72017-07-21 16:01:31 +0300846 'Unconfined': I'm working in thread main
847'coroutineContext': I'm working in thread main
848 'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor
849'coroutineContext': After delay in thread main
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300850```
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300851
852<!--- TEST LINES_START -->
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300853
Roman Elizarov43e3af72017-07-21 16:01:31 +0300854So, the coroutine that had inherited `coroutineContext` of `runBlocking {...}` continues to execute
855in the `main` thread, while the unconfined one had resumed in the default executor thread that [delay]
856function is using.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300857
858### Debugging coroutines and threads
859
Roman Elizarov419a6c82017-02-09 18:36:22 +0300860Coroutines can suspend on one thread and resume on another thread with [Unconfined] dispatcher or
861with a multi-threaded dispatcher like [CommonPool]. Even with a single-threaded dispatcher it might be hard to
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300862figure out what coroutine was doing what, where, and when. The common approach to debugging applications with
863threads is to print the thread name in the log file on each log statement. This feature is universally supported
864by logging frameworks. When using coroutines, the thread name alone does not give much of a context, so
865`kotlinx.coroutines` includes debugging facilities to make it easier.
866
867Run the following code with `-Dkotlinx.coroutines.debug` JVM option:
868
869```kotlin
870fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
871
872fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov43e3af72017-07-21 16:01:31 +0300873 val a = async(coroutineContext) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300874 log("I'm computing a piece of the answer")
875 6
876 }
Roman Elizarov43e3af72017-07-21 16:01:31 +0300877 val b = async(coroutineContext) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300878 log("I'm computing another piece of the answer")
879 7
880 }
881 log("The answer is ${a.await() * b.await()}")
882}
883```
884
Roman Elizarove8d79342017-08-29 15:21:21 +0300885> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-03.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300886
Roman Elizarovb7721cf2017-02-03 19:23:08 +0300887There are three coroutines. The main coroutine (#1) -- `runBlocking` one,
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300888and two coroutines computing deferred values `a` (#2) and `b` (#3).
889They are all executing in the context of `runBlocking` and are confined to the main thread.
890The output of this code is:
891
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300892```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300893[main @coroutine#2] I'm computing a piece of the answer
894[main @coroutine#3] I'm computing another piece of the answer
895[main @coroutine#1] The answer is 42
896```
897
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300898<!--- TEST -->
899
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300900The `log` function prints the name of the thread in square brackets and you can see, that it is the `main`
901thread, but the identifier of the currently executing coroutine is appended to it. This identifier
902is consecutively assigned to all created coroutines when debugging mode is turned on.
903
Roman Elizarov419a6c82017-02-09 18:36:22 +0300904You can read more about debugging facilities in the documentation for [newCoroutineContext] function.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300905
906### Jumping between threads
907
908Run the following code with `-Dkotlinx.coroutines.debug` JVM option:
909
910```kotlin
911fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
912
913fun main(args: Array<String>) {
914 val ctx1 = newSingleThreadContext("Ctx1")
915 val ctx2 = newSingleThreadContext("Ctx2")
916 runBlocking(ctx1) {
917 log("Started in ctx1")
918 run(ctx2) {
919 log("Working in ctx2")
920 }
921 log("Back to ctx1")
922 }
923}
924```
925
Roman Elizarove8d79342017-08-29 15:21:21 +0300926> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300927
Roman Elizarov419a6c82017-02-09 18:36:22 +0300928It demonstrates two new techniques. One is using [runBlocking] with an explicitly specified context, and
929the second one is using [run] function to change a context of a coroutine while still staying in the
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300930same coroutine as you can see in the output below:
931
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300932```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300933[Ctx1 @coroutine#1] Started in ctx1
934[Ctx2 @coroutine#1] Working in ctx2
935[Ctx1 @coroutine#1] Back to ctx1
936```
937
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300938<!--- TEST -->
939
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300940### Job in the context
941
Roman Elizarov419a6c82017-02-09 18:36:22 +0300942The coroutine [Job] is part of its context. The coroutine can retrieve it from its own context
Roman Elizarov43e3af72017-07-21 16:01:31 +0300943using `coroutineContext[Job]` expression:
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300944
945```kotlin
946fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov43e3af72017-07-21 16:01:31 +0300947 println("My job is ${coroutineContext[Job]}")
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300948}
949```
950
Roman Elizarove8d79342017-08-29 15:21:21 +0300951> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-05.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300952
shifujun81a6f232017-06-18 15:37:59 +0800953It produces something like
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300954
955```
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300956My job is "coroutine#1":BlockingCoroutine{Active}@6d311334
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300957```
958
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300959<!--- TEST lines.size == 1 && lines[0].startsWith("My job is \"coroutine#1\":BlockingCoroutine{Active}@") -->
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300960
Roman Elizarov43e3af72017-07-21 16:01:31 +0300961So, [isActive][CoroutineScope.isActive] in [CoroutineScope] is just a convenient shortcut for
962`coroutineContext[Job]!!.isActive`.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300963
964### Children of a coroutine
965
Roman Elizarov43e3af72017-07-21 16:01:31 +0300966When [coroutineContext][CoroutineScope.coroutineContext] of a coroutine is used to launch another coroutine,
Roman Elizarov419a6c82017-02-09 18:36:22 +0300967the [Job] of the new coroutine becomes
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300968a _child_ of the parent coroutine's job. When the parent coroutine is cancelled, all its children
969are recursively cancelled, too.
970
971```kotlin
972fun main(args: Array<String>) = runBlocking<Unit> {
973 // start a coroutine to process some kind of incoming request
974 val request = launch(CommonPool) {
975 // it spawns two other jobs, one with its separate context
976 val job1 = launch(CommonPool) {
977 println("job1: I have my own context and execute independently!")
978 delay(1000)
979 println("job1: I am not affected by cancellation of the request")
980 }
981 // and the other inherits the parent context
Roman Elizarov43e3af72017-07-21 16:01:31 +0300982 val job2 = launch(coroutineContext) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300983 println("job2: I am a child of the request coroutine")
984 delay(1000)
985 println("job2: I will not execute this line if my parent request is cancelled")
986 }
987 // request completes when both its sub-jobs complete:
988 job1.join()
989 job2.join()
990 }
991 delay(500)
992 request.cancel() // cancel processing of the request
993 delay(1000) // delay a second to see what happens
994 println("main: Who has survived request cancellation?")
995}
996```
997
Roman Elizarove8d79342017-08-29 15:21:21 +0300998> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-06.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300999
1000The output of this code is:
1001
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001002```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001003job1: I have my own context and execute independently!
1004job2: I am a child of the request coroutine
1005job1: I am not affected by cancellation of the request
1006main: Who has survived request cancellation?
1007```
1008
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001009<!--- TEST -->
1010
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001011### Combining contexts
1012
1013Coroutine context can be combined using `+` operator. The context on the right-hand side replaces relevant entries
Roman Elizarov419a6c82017-02-09 18:36:22 +03001014of the context on the left-hand side. For example, a [Job] of the parent coroutine can be inherited, while
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001015its dispatcher replaced:
1016
1017```kotlin
1018fun main(args: Array<String>) = runBlocking<Unit> {
1019 // start a coroutine to process some kind of incoming request
Roman Elizarov43e3af72017-07-21 16:01:31 +03001020 val request = launch(coroutineContext) { // use the context of `runBlocking`
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001021 // spawns CPU-intensive child job in CommonPool !!!
Roman Elizarov43e3af72017-07-21 16:01:31 +03001022 val job = launch(coroutineContext + CommonPool) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001023 println("job: I am a child of the request coroutine, but with a different dispatcher")
1024 delay(1000)
1025 println("job: I will not execute this line if my parent request is cancelled")
1026 }
1027 job.join() // request completes when its sub-job completes
1028 }
1029 delay(500)
1030 request.cancel() // cancel processing of the request
1031 delay(1000) // delay a second to see what happens
1032 println("main: Who has survived request cancellation?")
1033}
1034```
1035
Roman Elizarove8d79342017-08-29 15:21:21 +03001036> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-07.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001037
1038The expected outcome of this code is:
1039
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001040```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001041job: I am a child of the request coroutine, but with a different dispatcher
1042main: Who has survived request cancellation?
1043```
1044
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001045<!--- TEST -->
1046
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001047### Parental responsibilities
1048
1049A parent coroutine always waits for completion of all its children. Parent does not have to explicitly track
1050all the children it launches and it does not have to use [join] to wait for them at the end:
1051
1052```kotlin
1053fun main(args: Array<String>) = runBlocking<Unit> {
1054 // start a coroutine to process some kind of incoming request
1055 val request = launch(CommonPool) {
1056 repeat(3) { i -> // launch a few children jobs
1057 launch(coroutineContext) {
1058 delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
1059 println("Coroutine $i is done")
1060 }
1061 }
1062 println("request: I'm done and I don't explicitly join my children that are still active")
1063 }
1064 request.join() // wait for completion of the request, including all its children
1065 println("Now processing of the request is complete")
1066}
1067```
1068
1069> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt)
1070
1071The result is going to be:
1072
1073```text
1074request: I'm done and I don't explicitly join my children that are still active
1075Coroutine 0 is done
1076Coroutine 1 is done
1077Coroutine 2 is done
1078Now processing of the request is complete
1079```
1080
1081<!--- TEST -->
1082
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001083### Naming coroutines for debugging
1084
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001085Automatically assigned ids are good when coroutines log often and you just need to correlate log records
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001086coming from the same coroutine. However, when coroutine is tied to the processing of a specific request
1087or doing some specific background task, it is better to name it explicitly for debugging purposes.
Roman Elizarov419a6c82017-02-09 18:36:22 +03001088[CoroutineName] serves the same function as a thread name. It'll get displayed in the thread name that
Victor Osolovskiyed5ed492017-06-25 00:03:35 +03001089is executing this coroutine when debugging mode is turned on.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001090
1091The following example demonstrates this concept:
1092
1093```kotlin
1094fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
1095
1096fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
1097 log("Started main coroutine")
1098 // run two background value computations
Roman Elizarov32d95322017-02-09 15:57:31 +03001099 val v1 = async(CommonPool + CoroutineName("v1coroutine")) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001100 log("Computing v1")
1101 delay(500)
1102 252
1103 }
Roman Elizarov32d95322017-02-09 15:57:31 +03001104 val v2 = async(CommonPool + CoroutineName("v2coroutine")) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001105 log("Computing v2")
1106 delay(1000)
1107 6
1108 }
1109 log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
1110}
1111```
1112
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001113> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-09.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001114
1115The output it produces with `-Dkotlinx.coroutines.debug` JVM option is similar to:
1116
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001117```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001118[main @main#1] Started main coroutine
1119[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
1120[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
1121[main @main#1] The answer for v1 / v2 = 42
1122```
Roman Elizarov1293ccd2017-02-01 18:49:54 +03001123
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001124<!--- TEST FLEXIBLE_THREAD -->
1125
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001126### Cancellation via explicit job
1127
1128Let us put our knowledge about contexts, children and jobs together. Assume that our application has
1129an object with a lifecycle, but that object is not a coroutine. For example, we are writing an Android application
1130and launch various coroutines in the context of an Android activity to perform asynchronous operations to fetch
1131and update data, do animations, etc. All of these coroutines must be cancelled when activity is destroyed
1132to avoid memory leaks.
1133
1134We can manage a lifecycle of our coroutines by creating an instance of [Job] that is tied to
Roman Elizarov256812a2017-07-22 01:00:30 +03001135the lifecycle of our activity. A job instance is created using [`Job()`][Job] factory function
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001136as the following example shows. We need to make sure that all the coroutines are started
1137with this job in their context and then a single invocation of [Job.cancel] terminates them all.
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001138Moreover, [Job.join] waits for all of them to complete, so we can also use [cancelAndJoin] here in
1139this example:
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001140
1141```kotlin
1142fun main(args: Array<String>) = runBlocking<Unit> {
1143 val job = Job() // create a job object to manage our lifecycle
1144 // now launch ten coroutines for a demo, each working for a different time
1145 val coroutines = List(10) { i ->
1146 // they are all children of our job object
Roman Elizarov43e3af72017-07-21 16:01:31 +03001147 launch(coroutineContext + job) { // we use the context of main runBlocking thread, but with our own job object
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001148 delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001149 println("Coroutine $i is done")
1150 }
1151 }
1152 println("Launched ${coroutines.size} coroutines")
1153 delay(500L) // delay for half a second
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001154 println("Cancelling the job!")
1155 job.cancelAndJoin() // cancel all our coroutines and wait for all of them to complete
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001156}
1157```
1158
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001159> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-10.kt)
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001160
1161The output of this example is:
1162
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001163```text
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001164Launched 10 coroutines
1165Coroutine 0 is done
1166Coroutine 1 is done
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001167Cancelling the job!
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001168```
1169
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001170<!--- TEST -->
1171
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001172As you can see, only the first three coroutines had printed a message and the others were cancelled
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001173by a single invocation of `job.cancelAndJoin()`. So all we need to do in our hypothetical Android
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001174application is to create a parent job object when activity is created, use it for child coroutines,
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001175and cancel it when activity is destroyed. We cannot `join` them in the case of Android lifecycle,
1176since it is synchronous, but this joining ability is useful when building backend services to ensure bounded
1177resource usage.
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001178
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001179## Channels
Roman Elizarov7deefb82017-01-31 10:33:17 +03001180
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001181Deferred values provide a convenient way to transfer a single value between coroutines.
1182Channels provide a way to transfer a stream of values.
1183
1184<!--- INCLUDE .*/example-channel-([0-9]+).kt
1185import kotlinx.coroutines.experimental.channels.*
1186-->
1187
1188### Channel basics
1189
Roman Elizarov419a6c82017-02-09 18:36:22 +03001190A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
1191instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
1192a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001193
1194```kotlin
1195fun main(args: Array<String>) = runBlocking<Unit> {
1196 val channel = Channel<Int>()
1197 launch(CommonPool) {
1198 // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
1199 for (x in 1..5) channel.send(x * x)
1200 }
1201 // here we print five received integers:
1202 repeat(5) { println(channel.receive()) }
1203 println("Done!")
1204}
1205```
1206
Roman Elizarove8d79342017-08-29 15:21:21 +03001207> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-01.kt)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001208
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001209The output of this code is:
1210
1211```text
12121
12134
12149
121516
121625
1217Done!
1218```
1219
1220<!--- TEST -->
1221
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001222### Closing and iteration over channels
1223
1224Unlike a queue, a channel can be closed to indicate that no more elements are coming.
1225On the receiver side it is convenient to use a regular `for` loop to receive elements
1226from the channel.
1227
Roman Elizarov419a6c82017-02-09 18:36:22 +03001228Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001229The iteration stops as soon as this close token is received, so there is a guarantee
1230that all previously sent elements before the close are received:
1231
1232```kotlin
1233fun main(args: Array<String>) = runBlocking<Unit> {
1234 val channel = Channel<Int>()
1235 launch(CommonPool) {
1236 for (x in 1..5) channel.send(x * x)
1237 channel.close() // we're done sending
1238 }
1239 // here we print received values using `for` loop (until the channel is closed)
1240 for (y in channel) println(y)
1241 println("Done!")
1242}
1243```
1244
Roman Elizarove8d79342017-08-29 15:21:21 +03001245> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-02.kt)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001246
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001247<!--- TEST
12481
12494
12509
125116
125225
1253Done!
1254-->
1255
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001256### Building channel producers
1257
Roman Elizarova5e653f2017-02-13 13:49:55 +03001258The pattern where a coroutine is producing a sequence of elements is quite common.
1259This is a part of _producer-consumer_ pattern that is often found in concurrent code.
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001260You could abstract such a producer into a function that takes channel as its parameter, but this goes contrary
Roman Elizarova5e653f2017-02-13 13:49:55 +03001261to common sense that results must be returned from functions.
1262
Roman Elizarov86349be2017-03-17 16:47:37 +03001263There is a convenience coroutine builder named [produce] that makes it easy to do it right on producer side,
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001264and an extension function [consumeEach], that replaces a `for` loop on the consumer side:
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001265
1266```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001267fun produceSquares() = produce<Int>(CommonPool) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001268 for (x in 1..5) send(x * x)
1269}
1270
1271fun main(args: Array<String>) = runBlocking<Unit> {
1272 val squares = produceSquares()
Roman Elizarov86349be2017-03-17 16:47:37 +03001273 squares.consumeEach { println(it) }
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001274 println("Done!")
1275}
1276```
1277
Roman Elizarove8d79342017-08-29 15:21:21 +03001278> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001279
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001280<!--- TEST
12811
12824
12839
128416
128525
1286Done!
1287-->
1288
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001289### Pipelines
1290
1291Pipeline is a pattern where one coroutine is producing, possibly infinite, stream of values:
1292
1293```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001294fun produceNumbers() = produce<Int>(CommonPool) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001295 var x = 1
1296 while (true) send(x++) // infinite stream of integers starting from 1
1297}
1298```
1299
Roman Elizarova5e653f2017-02-13 13:49:55 +03001300And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results.
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001301In the below example the numbers are just squared:
1302
1303```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001304fun square(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001305 for (x in numbers) send(x * x)
1306}
1307```
1308
Roman Elizarova5e653f2017-02-13 13:49:55 +03001309The main code starts and connects the whole pipeline:
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001310
1311```kotlin
1312fun main(args: Array<String>) = runBlocking<Unit> {
1313 val numbers = produceNumbers() // produces integers from 1 and on
1314 val squares = square(numbers) // squares integers
1315 for (i in 1..5) println(squares.receive()) // print first five
1316 println("Done!") // we are done
1317 squares.cancel() // need to cancel these coroutines in a larger app
1318 numbers.cancel()
1319}
1320```
1321
Roman Elizarove8d79342017-08-29 15:21:21 +03001322> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-04.kt)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001323
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001324<!--- TEST
13251
13264
13279
132816
132925
1330Done!
1331-->
1332
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001333We don't have to cancel these coroutines in this example app, because
1334[coroutines are like daemon threads](#coroutines-are-like-daemon-threads),
1335but in a larger app we'll need to stop our pipeline if we don't need it anymore.
1336Alternatively, we could have run pipeline coroutines as
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001337[children of a coroutine](#children-of-a-coroutine) as is demonstrated in the following example.
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001338
1339### Prime numbers with pipeline
1340
Cedric Beustfa0b28f2017-02-07 07:07:25 -08001341Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001342of coroutines. We start with an infinite sequence of numbers. This time we introduce an
1343explicit context parameter, so that caller can control where our coroutines run:
1344
Roman Elizarove8d79342017-08-29 15:21:21 +03001345<!--- INCLUDE core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001346import kotlin.coroutines.experimental.CoroutineContext
1347-->
1348
1349```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001350fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001351 var x = start
1352 while (true) send(x++) // infinite stream of integers from start
1353}
1354```
1355
1356The following pipeline stage filters an incoming stream of numbers, removing all the numbers
1357that are divisible by the given prime number:
1358
1359```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001360fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001361 for (x in numbers) if (x % prime != 0) send(x)
1362}
1363```
1364
1365Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
Roman Elizarov62500ba2017-02-09 18:55:40 +03001366and launching new pipeline stage for each prime number found:
1367
1368```
Roman Elizarova5e653f2017-02-13 13:49:55 +03001369numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
Roman Elizarov62500ba2017-02-09 18:55:40 +03001370```
1371
1372The following example prints the first ten prime numbers,
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001373running the whole pipeline in the context of the main thread. Since all the coroutines are launched as
1374children of the main [runBlocking] coroutine in its [coroutineContext][CoroutineScope.coroutineContext],
1375we don't have to keep an explicit list of all the coroutine we have created.
1376We use [CoroutineContext.cancelChildren] extension to cancel all the children coroutines.
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001377
1378```kotlin
1379fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov43e3af72017-07-21 16:01:31 +03001380 var cur = numbersFrom(coroutineContext, 2)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001381 for (i in 1..10) {
1382 val prime = cur.receive()
1383 println(prime)
Roman Elizarov43e3af72017-07-21 16:01:31 +03001384 cur = filter(coroutineContext, cur, prime)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001385 }
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001386 coroutineContext.cancelChildren() // cancel all children to let main finish
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001387}
1388```
1389
Roman Elizarove8d79342017-08-29 15:21:21 +03001390> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001391
1392The output of this code is:
1393
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001394```text
Roman Elizarovb7721cf2017-02-03 19:23:08 +030013952
13963
13975
13987
139911
140013
140117
140219
140323
140429
1405```
1406
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001407<!--- TEST -->
1408
Roman Elizarova5e653f2017-02-13 13:49:55 +03001409Note, that you can build the same pipeline using `buildIterator` coroutine builder from the standard library.
1410Replace `produce` with `buildIterator`, `send` with `yield`, `receive` with `next`,
Roman Elizarov62500ba2017-02-09 18:55:40 +03001411`ReceiveChannel` with `Iterator`, and get rid of the context. You will not need `runBlocking` either.
1412However, the benefit of a pipeline that uses channels as shown above is that it can actually use
1413multiple CPU cores if you run it in [CommonPool] context.
1414
Roman Elizarova5e653f2017-02-13 13:49:55 +03001415Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
Roman Elizarov62500ba2017-02-09 18:55:40 +03001416other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
1417built using `buildSeqeunce`/`buildIterator`, because they do not allow arbitrary suspension, unlike
Roman Elizarova5e653f2017-02-13 13:49:55 +03001418`produce` which is fully asynchronous.
Roman Elizarov62500ba2017-02-09 18:55:40 +03001419
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001420### Fan-out
1421
1422Multiple coroutines may receive from the same channel, distributing work between themselves.
1423Let us start with a producer coroutine that is periodically producing integers
1424(ten numbers per second):
1425
1426```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001427fun produceNumbers() = produce<Int>(CommonPool) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001428 var x = 1 // start from 1
1429 while (true) {
1430 send(x++) // produce next
1431 delay(100) // wait 0.1s
1432 }
1433}
1434```
1435
1436Then we can have several processor coroutines. In this example, they just print their id and
1437received number:
1438
1439```kotlin
1440fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
Roman Elizarov86349be2017-03-17 16:47:37 +03001441 channel.consumeEach {
1442 println("Processor #$id received $it")
Roman Elizarovec9384c2017-03-02 22:09:08 +03001443 }
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001444}
1445```
1446
Roman Elizarov35d2c342017-07-20 14:54:39 +03001447Now let us launch five processors and let them work for almost a second. See what happens:
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001448
1449```kotlin
1450fun main(args: Array<String>) = runBlocking<Unit> {
1451 val producer = produceNumbers()
1452 repeat(5) { launchProcessor(it, producer) }
Roman Elizarov35d2c342017-07-20 14:54:39 +03001453 delay(950)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001454 producer.cancel() // cancel producer coroutine and thus kill them all
1455}
1456```
1457
Roman Elizarove8d79342017-08-29 15:21:21 +03001458> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001459
1460The output will be similar to the the following one, albeit the processor ids that receive
1461each specific integer may be different:
1462
1463```
1464Processor #2 received 1
1465Processor #4 received 2
1466Processor #0 received 3
1467Processor #1 received 4
1468Processor #3 received 5
1469Processor #2 received 6
1470Processor #4 received 7
1471Processor #0 received 8
1472Processor #1 received 9
1473Processor #3 received 10
1474```
1475
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001476<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->
1477
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001478Note, that cancelling a producer coroutine closes its channel, thus eventually terminating iteration
1479over the channel that processor coroutines are doing.
1480
1481### Fan-in
1482
1483Multiple coroutines may send to the same channel.
1484For example, let us have a channel of strings, and a suspending function that
1485repeatedly sends a specified string to this channel with a specified delay:
1486
1487```kotlin
1488suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
1489 while (true) {
1490 delay(time)
1491 channel.send(s)
1492 }
1493}
1494```
1495
Cedric Beustfa0b28f2017-02-07 07:07:25 -08001496Now, let us see what happens if we launch a couple of coroutines sending strings
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001497(in this example we launch them in the context of the main thread as main coroutine's children):
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001498
1499```kotlin
1500fun main(args: Array<String>) = runBlocking<Unit> {
1501 val channel = Channel<String>()
Roman Elizarov43e3af72017-07-21 16:01:31 +03001502 launch(coroutineContext) { sendString(channel, "foo", 200L) }
1503 launch(coroutineContext) { sendString(channel, "BAR!", 500L) }
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001504 repeat(6) { // receive first six
1505 println(channel.receive())
1506 }
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001507 coroutineContext.cancelChildren() // cancel all children to let main finish
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001508}
1509```
1510
Roman Elizarove8d79342017-08-29 15:21:21 +03001511> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001512
1513The output is:
1514
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001515```text
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001516foo
1517foo
1518BAR!
1519foo
1520foo
1521BAR!
1522```
1523
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001524<!--- TEST -->
1525
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001526### Buffered channels
1527
1528The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
1529meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked,
1530if receive is invoked first, it is suspended until send is invoked.
Roman Elizarov419a6c82017-02-09 18:36:22 +03001531
Roman Elizarov256812a2017-07-22 01:00:30 +03001532Both [`Channel()`][Channel] factory function and [produce] builder take an optional `capacity` parameter to
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001533specify _buffer size_. Buffer allows senders to send multiple elements before suspending,
1534similar to the `BlockingQueue` with a specified capacity, which blocks when buffer is full.
1535
1536Take a look at the behavior of the following code:
1537
1538```kotlin
1539fun main(args: Array<String>) = runBlocking<Unit> {
1540 val channel = Channel<Int>(4) // create buffered channel
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001541 val sender = launch(coroutineContext) { // launch sender coroutine
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001542 repeat(10) {
1543 println("Sending $it") // print before sending each element
1544 channel.send(it) // will suspend when buffer is full
1545 }
1546 }
1547 // don't receive anything... just wait....
1548 delay(1000)
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001549 sender.cancel() // cancel sender coroutine
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001550}
1551```
1552
Roman Elizarove8d79342017-08-29 15:21:21 +03001553> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001554
1555It prints "sending" _five_ times using a buffered channel with capacity of _four_:
1556
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001557```text
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001558Sending 0
1559Sending 1
1560Sending 2
1561Sending 3
1562Sending 4
1563```
1564
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001565<!--- TEST -->
1566
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001567The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
Roman Elizarov419a6c82017-02-09 18:36:22 +03001568
Roman Elizarovb0517ba2017-02-27 14:03:14 +03001569### Channels are fair
1570
1571Send and receive operations to channels are _fair_ with respect to the order of their invocation from
1572multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
1573gets the element. In the following example two coroutines "ping" and "pong" are
1574receiving the "ball" object from the shared "table" channel.
1575
1576```kotlin
1577data class Ball(var hits: Int)
1578
1579fun main(args: Array<String>) = runBlocking<Unit> {
1580 val table = Channel<Ball>() // a shared table
Roman Elizarov43e3af72017-07-21 16:01:31 +03001581 launch(coroutineContext) { player("ping", table) }
1582 launch(coroutineContext) { player("pong", table) }
Roman Elizarovb0517ba2017-02-27 14:03:14 +03001583 table.send(Ball(0)) // serve the ball
1584 delay(1000) // delay 1 second
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001585 coroutineContext.cancelChildren() // game over, cancel them
Roman Elizarovb0517ba2017-02-27 14:03:14 +03001586}
1587
1588suspend fun player(name: String, table: Channel<Ball>) {
1589 for (ball in table) { // receive the ball in a loop
1590 ball.hits++
1591 println("$name $ball")
Roman Elizarovf526b132017-03-10 16:07:14 +03001592 delay(300) // wait a bit
Roman Elizarovb0517ba2017-02-27 14:03:14 +03001593 table.send(ball) // send the ball back
1594 }
1595}
1596```
1597
Roman Elizarove8d79342017-08-29 15:21:21 +03001598> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-09.kt)
Roman Elizarovb0517ba2017-02-27 14:03:14 +03001599
1600The "ping" coroutine is started first, so it is the first one to receive the ball. Even though "ping"
1601coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
1602received by the "pong" coroutine, because it was already waiting for it:
1603
1604```text
1605ping Ball(hits=1)
1606pong Ball(hits=2)
1607ping Ball(hits=3)
1608pong Ball(hits=4)
Roman Elizarovb0517ba2017-02-27 14:03:14 +03001609```
1610
1611<!--- TEST -->
1612
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001613Note, that sometimes channels may produce executions that look unfair due to the nature of the executor
1614that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details.
1615
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001616## Shared mutable state and concurrency
1617
1618Coroutines can be executed concurrently using a multi-threaded dispatcher like [CommonPool]. It presents
1619all the usual concurrency problems. The main problem being synchronization of access to **shared mutable state**.
1620Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world,
1621but others are unique.
1622
1623### The problem
1624
Roman Elizarov1e459602017-02-27 11:05:17 +03001625Let us launch a thousand coroutines all doing the same action thousand times (for a total of a million executions).
1626We'll also measure their completion time for further comparisons:
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001627
Roman Elizarov43e90112017-05-10 11:25:20 +03001628<!--- INCLUDE .*/example-sync-([0-9a-z]+).kt
Roman Elizarov1e459602017-02-27 11:05:17 +03001629import kotlin.coroutines.experimental.CoroutineContext
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001630import kotlin.system.measureTimeMillis
1631-->
1632
Roman Elizarov1e459602017-02-27 11:05:17 +03001633<!--- INCLUDE .*/example-sync-03.kt
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001634import java.util.concurrent.atomic.AtomicInteger
1635-->
1636
Roman Elizarov1e459602017-02-27 11:05:17 +03001637<!--- INCLUDE .*/example-sync-06.kt
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001638import kotlinx.coroutines.experimental.sync.Mutex
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001639import kotlinx.coroutines.experimental.sync.withLock
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001640-->
1641
Roman Elizarov1e459602017-02-27 11:05:17 +03001642<!--- INCLUDE .*/example-sync-07.kt
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001643import kotlinx.coroutines.experimental.channels.*
1644-->
1645
1646```kotlin
Roman Elizarov1e459602017-02-27 11:05:17 +03001647suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
1648 val n = 1000 // number of coroutines to launch
1649 val k = 1000 // times an action is repeated by each coroutine
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001650 val time = measureTimeMillis {
1651 val jobs = List(n) {
Roman Elizarov1e459602017-02-27 11:05:17 +03001652 launch(context) {
1653 repeat(k) { action() }
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001654 }
1655 }
1656 jobs.forEach { it.join() }
1657 }
Roman Elizarov1e459602017-02-27 11:05:17 +03001658 println("Completed ${n * k} actions in $time ms")
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001659}
1660```
1661
Roman Elizarov43e90112017-05-10 11:25:20 +03001662<!--- INCLUDE .*/example-sync-([0-9a-z]+).kt -->
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001663
Roman Elizarov1e459602017-02-27 11:05:17 +03001664We start with a very simple action that increments a shared mutable variable using
1665multi-threaded [CommonPool] context.
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001666
1667```kotlin
1668var counter = 0
1669
1670fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov1e459602017-02-27 11:05:17 +03001671 massiveRun(CommonPool) {
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001672 counter++
1673 }
1674 println("Counter = $counter")
1675}
1676```
1677
Roman Elizarove8d79342017-08-29 15:21:21 +03001678> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-01.kt)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001679
Roman Elizarov1e459602017-02-27 11:05:17 +03001680<!--- TEST LINES_START
1681Completed 1000000 actions in
1682Counter =
1683-->
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001684
Roman Elizarov1e459602017-02-27 11:05:17 +03001685What does it print at the end? It is highly unlikely to ever print "Counter = 1000000", because a thousand coroutines
1686increment the `counter` concurrently from multiple threads without any synchronization.
1687
Roman Elizarov43e90112017-05-10 11:25:20 +03001688> Note: if you have an old system with 2 or fewer CPUs, then you _will_ consistently see 1000000, because
1689`CommonPool` is running in only one thread in this case. To reproduce the problem you'll need to make the
1690following change:
1691
1692```kotlin
1693val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads
1694var counter = 0
1695
1696fun main(args: Array<String>) = runBlocking<Unit> {
1697 massiveRun(mtContext) { // use it instead of CommonPool in this sample and below
1698 counter++
1699 }
1700 println("Counter = $counter")
1701}
1702```
1703
Roman Elizarove8d79342017-08-29 15:21:21 +03001704> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-01b.kt)
Roman Elizarov43e90112017-05-10 11:25:20 +03001705
1706<!--- TEST LINES_START
1707Completed 1000000 actions in
1708Counter =
1709-->
1710
Roman Elizarov1e459602017-02-27 11:05:17 +03001711### Volatiles are of no help
1712
1713There is common misconception that making a variable `volatile` solves concurrency problem. Let us try it:
1714
1715```kotlin
1716@Volatile // in Kotlin `volatile` is an annotation
1717var counter = 0
1718
1719fun main(args: Array<String>) = runBlocking<Unit> {
1720 massiveRun(CommonPool) {
1721 counter++
1722 }
1723 println("Counter = $counter")
1724}
1725```
1726
Roman Elizarove8d79342017-08-29 15:21:21 +03001727> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-02.kt)
Roman Elizarov1e459602017-02-27 11:05:17 +03001728
1729<!--- TEST LINES_START
1730Completed 1000000 actions in
1731Counter =
1732-->
1733
1734This code works slower, but we still don't get "Counter = 1000000" at the end, because volatile variables guarantee
1735linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but
1736do not provide atomicity of larger actions (increment in our case).
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001737
1738### Thread-safe data structures
1739
1740The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized,
1741linearizable, or atomic) data structure that provides all the necessarily synchronization for the corresponding
1742operations that needs to be performed on a shared state.
Roman Elizarov1e459602017-02-27 11:05:17 +03001743In the case of a simple counter we can use `AtomicInteger` class which has atomic `incrementAndGet` operations:
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001744
1745```kotlin
1746var counter = AtomicInteger()
1747
1748fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov1e459602017-02-27 11:05:17 +03001749 massiveRun(CommonPool) {
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001750 counter.incrementAndGet()
1751 }
1752 println("Counter = ${counter.get()}")
1753}
1754```
1755
Roman Elizarove8d79342017-08-29 15:21:21 +03001756> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-03.kt)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001757
Roman Elizarov1e459602017-02-27 11:05:17 +03001758<!--- TEST ARBITRARY_TIME
1759Completed 1000000 actions in xxx ms
1760Counter = 1000000
1761-->
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001762
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001763This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other
1764standard data structures and basic operations on them. However, it does not easily scale to complex
1765state or to complex operations that do not have ready-to-use thread-safe implementations.
1766
Roman Elizarov1e459602017-02-27 11:05:17 +03001767### Thread confinement fine-grained
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001768
Roman Elizarov1e459602017-02-27 11:05:17 +03001769_Thread confinement_ is an approach to the problem of shared mutable state where all access to the particular shared
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001770state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to
1771the single event-dispatch/application thread. It is easy to apply with coroutines by using a
1772single-threaded context:
1773
1774```kotlin
1775val counterContext = newSingleThreadContext("CounterContext")
1776var counter = 0
1777
1778fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov1e459602017-02-27 11:05:17 +03001779 massiveRun(CommonPool) { // run each coroutine in CommonPool
1780 run(counterContext) { // but confine each increment to the single-threaded context
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001781 counter++
1782 }
1783 }
1784 println("Counter = $counter")
1785}
1786```
1787
Roman Elizarove8d79342017-08-29 15:21:21 +03001788> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001789
Roman Elizarov1e459602017-02-27 11:05:17 +03001790<!--- TEST ARBITRARY_TIME
1791Completed 1000000 actions in xxx ms
1792Counter = 1000000
1793-->
1794
1795This code works very slowly, because it does _fine-grained_ thread-confinement. Each individual increment switches
1796from multi-threaded `CommonPool` context to the single-threaded context using [run] block.
1797
1798### Thread confinement coarse-grained
1799
1800In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic
1801are confined to the single thread. The following example does it like that, running each coroutine in
1802the single-threaded context to start with.
1803
1804```kotlin
1805val counterContext = newSingleThreadContext("CounterContext")
1806var counter = 0
1807
1808fun main(args: Array<String>) = runBlocking<Unit> {
1809 massiveRun(counterContext) { // run each coroutine in the single-threaded context
1810 counter++
1811 }
1812 println("Counter = $counter")
1813}
1814```
1815
Roman Elizarove8d79342017-08-29 15:21:21 +03001816> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt)
Roman Elizarov1e459602017-02-27 11:05:17 +03001817
1818<!--- TEST ARBITRARY_TIME
1819Completed 1000000 actions in xxx ms
1820Counter = 1000000
1821-->
1822
1823This now works much faster and produces correct result.
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001824
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001825### Mutual exclusion
1826
1827Mutual exclusion solution to the problem is to protect all modifications of the shared state with a _critical section_
1828that is never executed concurrently. In a blocking world you'd typically use `synchronized` or `ReentrantLock` for that.
1829Coroutine's alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to
1830delimit a critical section. The key difference is that `Mutex.lock` is a suspending function. It does not block a thread.
1831
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001832There is also [Mutex.withLock] extension function that conveniently represents
1833`mutex.lock(); try { ... } finally { mutex.unlock() }` pattern:
1834
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001835```kotlin
1836val mutex = Mutex()
1837var counter = 0
1838
1839fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov1e459602017-02-27 11:05:17 +03001840 massiveRun(CommonPool) {
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001841 mutex.withLock {
1842 counter++
1843 }
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001844 }
1845 println("Counter = $counter")
1846}
1847```
1848
Roman Elizarove8d79342017-08-29 15:21:21 +03001849> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-06.kt)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001850
Roman Elizarov1e459602017-02-27 11:05:17 +03001851<!--- TEST ARBITRARY_TIME
1852Completed 1000000 actions in xxx ms
1853Counter = 1000000
1854-->
1855
1856The locking in this example is fine-grained, so it pays the price. However, it is a good choice for some situations
1857where you absolutely must modify some shared state periodically, but there is no natural thread that this state
1858is confined to.
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001859
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001860### Actors
1861
1862An actor is a combination of a coroutine, the state that is confined and is encapsulated into this coroutine,
1863and a channel to communicate with other coroutines. A simple actor can be written as a function,
1864but an actor with a complex state is better suited for a class.
1865
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001866There is an [actor] coroutine builder that conveniently combines actor's mailbox channel into its
1867scope to receive messages from and combines the send channel into the resulting job object, so that a
1868single reference to the actor can be carried around as its handle.
1869
Roman Elizarov256812a2017-07-22 01:00:30 +03001870The first step of using an actor is to define a class of messages that an actor is going to process.
1871Kotlin's [sealed classes](https://kotlinlang.org/docs/reference/sealed-classes.html) are well suited for that purpose.
1872We define `CounterMsg` sealed class with `IncCounter` message to increment a counter and `GetCounter` message
1873to get its value. The later needs to send a response. A [CompletableDeferred] communication
1874primitive, that represents a single value that will be known (communicated) in the future,
1875is used here for that purpose.
1876
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001877```kotlin
1878// Message types for counterActor
1879sealed class CounterMsg
1880object IncCounter : CounterMsg() // one-way message to increment counter
Roman Elizarov256812a2017-07-22 01:00:30 +03001881class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
1882```
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001883
Roman Elizarov256812a2017-07-22 01:00:30 +03001884Then we define a function that launches an actor using an [actor] coroutine builder:
1885
1886```kotlin
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001887// This function launches a new counter actor
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001888fun counterActor() = actor<CounterMsg>(CommonPool) {
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001889 var counter = 0 // actor state
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001890 for (msg in channel) { // iterate over incoming messages
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001891 when (msg) {
1892 is IncCounter -> counter++
Roman Elizarov256812a2017-07-22 01:00:30 +03001893 is GetCounter -> msg.response.complete(counter)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001894 }
1895 }
1896}
Roman Elizarov256812a2017-07-22 01:00:30 +03001897```
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001898
Roman Elizarov256812a2017-07-22 01:00:30 +03001899The main code is straightforward:
1900
1901```kotlin
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001902fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001903 val counter = counterActor() // create the actor
Roman Elizarov1e459602017-02-27 11:05:17 +03001904 massiveRun(CommonPool) {
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001905 counter.send(IncCounter)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001906 }
Roman Elizarov256812a2017-07-22 01:00:30 +03001907 // send a message to get a counter value from an actor
1908 val response = CompletableDeferred<Int>()
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001909 counter.send(GetCounter(response))
Roman Elizarov256812a2017-07-22 01:00:30 +03001910 println("Counter = ${response.await()}")
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001911 counter.close() // shutdown the actor
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001912}
1913```
1914
Roman Elizarove8d79342017-08-29 15:21:21 +03001915> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-07.kt)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001916
Roman Elizarov1e459602017-02-27 11:05:17 +03001917<!--- TEST ARBITRARY_TIME
1918Completed 1000000 actions in xxx ms
1919Counter = 1000000
1920-->
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001921
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001922It does not matter (for correctness) what context the actor itself is executed in. An actor is
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001923a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine
1924works as a solution to the problem of shared mutable state.
1925
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001926Actor is more efficient than locking under load, because in this case it always has work to do and it does not
1927have to switch to a different context at all.
1928
1929> Note, that an [actor] coroutine builder is a dual of [produce] coroutine builder. An actor is associated
1930 with the channel that it receives messages from, while a producer is associated with the channel that it
1931 sends elements to.
Roman Elizarov1e459602017-02-27 11:05:17 +03001932
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001933## Select expression
1934
Roman Elizarova84730b2017-02-22 11:58:50 +03001935Select expression makes it possible to await multiple suspending functions simultaneously and _select_
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001936the first one that becomes available.
1937
1938<!--- INCLUDE .*/example-select-([0-9]+).kt
1939import kotlinx.coroutines.experimental.channels.*
1940import kotlinx.coroutines.experimental.selects.*
1941-->
1942
1943### Selecting from channels
1944
Roman Elizarov57857202017-03-02 23:17:25 +03001945Let us have two producers of strings: `fizz` and `buzz`. The `fizz` produces "Fizz" string every 300 ms:
1946
1947<!--- INCLUDE .*/example-select-01.kt
1948import kotlin.coroutines.experimental.CoroutineContext
1949-->
1950
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001951```kotlin
Roman Elizarov57857202017-03-02 23:17:25 +03001952fun fizz(context: CoroutineContext) = produce<String>(context) {
1953 while (true) { // sends "Fizz" every 300 ms
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001954 delay(300)
1955 send("Fizz")
1956 }
1957}
1958```
1959
Roman Elizarov57857202017-03-02 23:17:25 +03001960And the `buzz` produces "Buzz!" string every 500 ms:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001961
1962```kotlin
Roman Elizarov57857202017-03-02 23:17:25 +03001963fun buzz(context: CoroutineContext) = produce<String>(context) {
1964 while (true) { // sends "Buzz!" every 500 ms
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001965 delay(500)
1966 send("Buzz!")
1967 }
1968}
1969```
1970
1971Using [receive][ReceiveChannel.receive] suspending function we can receive _either_ from one channel or the
1972other. But [select] expression allows us to receive from _both_ simultaneously using its
Roman Elizarov8a5564d2017-09-06 18:48:22 +03001973[onReceive][ReceiveChannel.onReceive] clauses:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001974
1975```kotlin
Roman Elizarov57857202017-03-02 23:17:25 +03001976suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001977 select<Unit> { // <Unit> means that this select expression does not produce any result
1978 fizz.onReceive { value -> // this is the first select clause
1979 println("fizz -> '$value'")
1980 }
1981 buzz.onReceive { value -> // this is the second select clause
1982 println("buzz -> '$value'")
1983 }
1984 }
1985}
1986```
1987
Roman Elizarov57857202017-03-02 23:17:25 +03001988Let us run it all seven times:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001989
1990```kotlin
1991fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov43e3af72017-07-21 16:01:31 +03001992 val fizz = fizz(coroutineContext)
1993 val buzz = buzz(coroutineContext)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001994 repeat(7) {
Roman Elizarov57857202017-03-02 23:17:25 +03001995 selectFizzBuzz(fizz, buzz)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001996 }
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001997 coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001998}
1999```
2000
Roman Elizarove8d79342017-08-29 15:21:21 +03002001> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-01.kt)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002002
2003The result of this code is:
2004
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002005```text
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002006fizz -> 'Fizz'
2007buzz -> 'Buzz!'
2008fizz -> 'Fizz'
2009fizz -> 'Fizz'
2010buzz -> 'Buzz!'
2011fizz -> 'Fizz'
2012buzz -> 'Buzz!'
2013```
2014
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002015<!--- TEST -->
2016
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002017### Selecting on close
2018
Roman Elizarov8a5564d2017-09-06 18:48:22 +03002019The [onReceive][ReceiveChannel.onReceive] clause in `select` fails when the channel is closed and the corresponding
2020`select` throws an exception. We can use [onReceiveOrNull][ReceiveChannel.onReceiveOrNull] clause to perform a
Roman Elizarova84730b2017-02-22 11:58:50 +03002021specific action when the channel is closed. The following example also shows that `select` is an expression that returns
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002022the result of its selected clause:
2023
2024```kotlin
2025suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
2026 select<String> {
2027 a.onReceiveOrNull { value ->
2028 if (value == null)
2029 "Channel 'a' is closed"
2030 else
2031 "a -> '$value'"
2032 }
2033 b.onReceiveOrNull { value ->
2034 if (value == null)
2035 "Channel 'b' is closed"
2036 else
2037 "b -> '$value'"
2038 }
2039 }
2040```
2041
Roman Elizarova84730b2017-02-22 11:58:50 +03002042Let's use it with channel `a` that produces "Hello" string four times and
2043channel `b` that produces "World" four times:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002044
2045```kotlin
2046fun main(args: Array<String>) = runBlocking<Unit> {
2047 // we are using the context of the main thread in this example for predictability ...
Roman Elizarov43e3af72017-07-21 16:01:31 +03002048 val a = produce<String>(coroutineContext) {
Roman Elizarova84730b2017-02-22 11:58:50 +03002049 repeat(4) { send("Hello $it") }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002050 }
Roman Elizarov43e3af72017-07-21 16:01:31 +03002051 val b = produce<String>(coroutineContext) {
Roman Elizarova84730b2017-02-22 11:58:50 +03002052 repeat(4) { send("World $it") }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002053 }
2054 repeat(8) { // print first eight results
2055 println(selectAorB(a, b))
2056 }
Roman Elizarov8b38fa22017-09-27 17:44:31 +03002057 coroutineContext.cancelChildren()
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002058}
2059```
2060
Roman Elizarove8d79342017-08-29 15:21:21 +03002061> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-02.kt)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002062
Roman Elizarova84730b2017-02-22 11:58:50 +03002063The result of this code is quite interesting, so we'll analyze it in mode detail:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002064
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002065```text
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002066a -> 'Hello 0'
2067a -> 'Hello 1'
2068b -> 'World 0'
2069a -> 'Hello 2'
2070a -> 'Hello 3'
2071b -> 'World 1'
2072Channel 'a' is closed
2073Channel 'a' is closed
2074```
2075
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002076<!--- TEST -->
2077
Roman Elizarova84730b2017-02-22 11:58:50 +03002078There are couple of observations to make out of it.
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002079
2080First of all, `select` is _biased_ to the first clause. When several clauses are selectable at the same time,
2081the first one among them gets selected. Here, both channels are constantly producing strings, so `a` channel,
Roman Elizarova84730b2017-02-22 11:58:50 +03002082being the first clause in select, wins. However, because we are using unbuffered channel, the `a` gets suspended from
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002083time to time on its [send][SendChannel.send] invocation and gives a chance for `b` to send, too.
2084
Roman Elizarov8a5564d2017-09-06 18:48:22 +03002085The second observation, is that [onReceiveOrNull][ReceiveChannel.onReceiveOrNull] gets immediately selected when the
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002086channel is already closed.
2087
2088### Selecting to send
2089
Roman Elizarov8a5564d2017-09-06 18:48:22 +03002090Select expression has [onSend][SendChannel.onSend] clause that can be used for a great good in combination
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002091with a biased nature of selection.
2092
Roman Elizarova84730b2017-02-22 11:58:50 +03002093Let us write an example of producer of integers that sends its values to a `side` channel when
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002094the consumers on its primary channel cannot keep up with it:
2095
Roman Elizarov8b38fa22017-09-27 17:44:31 +03002096<!--- INCLUDE
2097import kotlin.coroutines.experimental.CoroutineContext
2098-->
2099
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002100```kotlin
Roman Elizarov8b38fa22017-09-27 17:44:31 +03002101fun produceNumbers(context: CoroutineContext, side: SendChannel<Int>) = produce<Int>(context) {
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002102 for (num in 1..10) { // produce 10 numbers from 1 to 10
2103 delay(100) // every 100 ms
2104 select<Unit> {
Roman Elizarova84730b2017-02-22 11:58:50 +03002105 onSend(num) {} // Send to the primary channel
2106 side.onSend(num) {} // or to the side channel
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002107 }
2108 }
2109}
2110```
2111
2112Consumer is going to be quite slow, taking 250 ms to process each number:
2113
2114```kotlin
2115fun main(args: Array<String>) = runBlocking<Unit> {
2116 val side = Channel<Int>() // allocate side channel
Roman Elizarov43e3af72017-07-21 16:01:31 +03002117 launch(coroutineContext) { // this is a very fast consumer for the side channel
Roman Elizarov86349be2017-03-17 16:47:37 +03002118 side.consumeEach { println("Side channel has $it") }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002119 }
Roman Elizarov8b38fa22017-09-27 17:44:31 +03002120 produceNumbers(coroutineContext, side).consumeEach {
Roman Elizarov86349be2017-03-17 16:47:37 +03002121 println("Consuming $it")
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002122 delay(250) // let us digest the consumed number properly, do not hurry
2123 }
2124 println("Done consuming")
Roman Elizarov8b38fa22017-09-27 17:44:31 +03002125 coroutineContext.cancelChildren()
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002126}
2127```
2128
Roman Elizarove8d79342017-08-29 15:21:21 +03002129> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-03.kt)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002130
2131So let us see what happens:
2132
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002133```text
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002134Consuming 1
2135Side channel has 2
2136Side channel has 3
2137Consuming 4
2138Side channel has 5
2139Side channel has 6
2140Consuming 7
2141Side channel has 8
2142Side channel has 9
2143Consuming 10
2144Done consuming
2145```
2146
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002147<!--- TEST -->
2148
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002149### Selecting deferred values
2150
Roman Elizarov8a5564d2017-09-06 18:48:22 +03002151Deferred values can be selected using [onAwait][Deferred.onAwait] clause.
Roman Elizarova84730b2017-02-22 11:58:50 +03002152Let us start with an async function that returns a deferred string value after
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002153a random delay:
2154
2155<!--- INCLUDE .*/example-select-04.kt
2156import java.util.*
2157-->
2158
2159```kotlin
2160fun asyncString(time: Int) = async(CommonPool) {
2161 delay(time.toLong())
2162 "Waited for $time ms"
2163}
2164```
2165
Roman Elizarova84730b2017-02-22 11:58:50 +03002166Let us start a dozen of them with a random delay.
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002167
2168```kotlin
2169fun asyncStringsList(): List<Deferred<String>> {
2170 val random = Random(3)
Roman Elizarova84730b2017-02-22 11:58:50 +03002171 return List(12) { asyncString(random.nextInt(1000)) }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002172}
2173```
2174
Roman Elizarova84730b2017-02-22 11:58:50 +03002175Now the main function awaits for the first of them to complete and counts the number of deferred values
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002176that are still active. Note, that we've used here the fact that `select` expression is a Kotlin DSL,
Roman Elizarova84730b2017-02-22 11:58:50 +03002177so we can provide clauses for it using an arbitrary code. In this case we iterate over a list
2178of deferred values to provide `onAwait` clause for each deferred value.
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002179
2180```kotlin
2181fun main(args: Array<String>) = runBlocking<Unit> {
2182 val list = asyncStringsList()
2183 val result = select<String> {
2184 list.withIndex().forEach { (index, deferred) ->
2185 deferred.onAwait { answer ->
2186 "Deferred $index produced answer '$answer'"
2187 }
2188 }
2189 }
2190 println(result)
Roman Elizarov7c864d82017-02-27 10:17:50 +03002191 val countActive = list.count { it.isActive }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002192 println("$countActive coroutines are still active")
2193}
2194```
2195
Roman Elizarove8d79342017-08-29 15:21:21 +03002196> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-04.kt)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002197
2198The output is:
2199
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002200```text
Roman Elizarova84730b2017-02-22 11:58:50 +03002201Deferred 4 produced answer 'Waited for 128 ms'
Roman Elizarovd4dcbe22017-02-22 09:57:46 +0300220211 coroutines are still active
2203```
2204
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002205<!--- TEST -->
2206
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002207### Switch over a channel of deferred values
2208
Roman Elizarova84730b2017-02-22 11:58:50 +03002209Let us write a channel producer function that consumes a channel of deferred string values, waits for each received
2210deferred value, but only until the next deferred value comes over or the channel is closed. This example puts together
Roman Elizarov8a5564d2017-09-06 18:48:22 +03002211[onReceiveOrNull][ReceiveChannel.onReceiveOrNull] and [onAwait][Deferred.onAwait] clauses in the same `select`:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002212
2213```kotlin
2214fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String>(CommonPool) {
Roman Elizarova84730b2017-02-22 11:58:50 +03002215 var current = input.receive() // start with first received deferred value
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002216 while (isActive) { // loop while not cancelled/closed
2217 val next = select<Deferred<String>?> { // return next deferred value from this select or null
2218 input.onReceiveOrNull { update ->
2219 update // replaces next value to wait
2220 }
2221 current.onAwait { value ->
2222 send(value) // send value that current deferred has produced
2223 input.receiveOrNull() // and use the next deferred from the input channel
2224 }
2225 }
2226 if (next == null) {
2227 println("Channel was closed")
2228 break // out of loop
2229 } else {
2230 current = next
2231 }
2232 }
2233}
2234```
2235
2236To test it, we'll use a simple async function that resolves to a specified string after a specified time:
2237
2238```kotlin
2239fun asyncString(str: String, time: Long) = async(CommonPool) {
2240 delay(time)
2241 str
2242}
2243```
2244
2245The main function just launches a coroutine to print results of `switchMapDeferreds` and sends some test
2246data to it:
2247
2248```kotlin
2249fun main(args: Array<String>) = runBlocking<Unit> {
2250 val chan = Channel<Deferred<String>>() // the channel for test
Roman Elizarov43e3af72017-07-21 16:01:31 +03002251 launch(coroutineContext) { // launch printing coroutine
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002252 for (s in switchMapDeferreds(chan))
2253 println(s) // print each received string
2254 }
2255 chan.send(asyncString("BEGIN", 100))
2256 delay(200) // enough time for "BEGIN" to be produced
2257 chan.send(asyncString("Slow", 500))
Roman Elizarova84730b2017-02-22 11:58:50 +03002258 delay(100) // not enough time to produce slow
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002259 chan.send(asyncString("Replace", 100))
Roman Elizarova84730b2017-02-22 11:58:50 +03002260 delay(500) // give it time before the last one
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002261 chan.send(asyncString("END", 500))
2262 delay(1000) // give it time to process
Roman Elizarova84730b2017-02-22 11:58:50 +03002263 chan.close() // close the channel ...
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002264 delay(500) // and wait some time to let it finish
2265}
2266```
2267
Roman Elizarove8d79342017-08-29 15:21:21 +03002268> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-05.kt)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002269
2270The result of this code:
2271
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002272```text
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002273BEGIN
2274Replace
2275END
2276Channel was closed
2277```
2278
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002279<!--- TEST -->
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002280
Roman Elizarov8db17332017-03-09 12:40:45 +03002281## Further reading
2282
2283* [Guide to UI programming with coroutines](ui/coroutines-guide-ui.md)
Roman Elizarov8a4a8e12017-03-09 19:52:58 +03002284* [Guide to reactive streams with coroutines](reactive/coroutines-guide-reactive.md)
Roman Elizarov8db17332017-03-09 12:40:45 +03002285* [Coroutines design document (KEEP)](https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md)
2286* [Full kotlinx.coroutines API reference](http://kotlin.github.io/kotlinx.coroutines)
2287
Roman Elizarove7e2ad12017-05-17 14:47:31 +03002288<!--- MODULE kotlinx-coroutines-core -->
Roman Elizarove0c817d2017-02-10 10:22:01 +03002289<!--- INDEX kotlinx.coroutines.experimental -->
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002290[launch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/launch.html
2291[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/delay.html
2292[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run-blocking.html
Roman Elizarove82dee72017-08-18 16:49:09 +03002293[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/index.html
Roman Elizarov8b38fa22017-09-27 17:44:31 +03002294[cancelAndJoin]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/cancel-and-join.html
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002295[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-cancellation-exception.html
2296[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/yield.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002297[CoroutineScope.isActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/is-active.html
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002298[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
2299[run]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run.html
2300[NonCancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-non-cancellable/index.html
2301[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
Roman Elizarov63f6ea22017-09-06 18:42:34 +03002302[withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout-or-null.html
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002303[async]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/async.html
2304[Deferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/index.html
Roman Elizarovecda27f2017-04-06 23:06:26 +03002305[CoroutineStart.LAZY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-start/-l-a-z-y.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002306[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/await.html
2307[Job.start]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/start.html
Roman Elizarov419a6c82017-02-09 18:36:22 +03002308[CommonPool]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-common-pool/index.html
2309[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-dispatcher/index.html
Roman Elizarov43e3af72017-07-21 16:01:31 +03002310[CoroutineScope.coroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/coroutine-context.html
Roman Elizarov419a6c82017-02-09 18:36:22 +03002311[Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-unconfined/index.html
Roman Elizarov419a6c82017-02-09 18:36:22 +03002312[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/new-coroutine-context.html
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002313[CoroutineName]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-name/index.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002314[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/cancel.html
Roman Elizarov8b38fa22017-09-27 17:44:31 +03002315[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/join.html
Roman Elizarove82dee72017-08-18 16:49:09 +03002316[CompletableDeferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-completable-deferred/index.html
Roman Elizarov8a5564d2017-09-06 18:48:22 +03002317[Deferred.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/on-await.html
Roman Elizarovf5bc0472017-02-22 11:38:13 +03002318<!--- INDEX kotlinx.coroutines.experimental.sync -->
Roman Elizarove82dee72017-08-18 16:49:09 +03002319[Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/index.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002320[Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/lock.html
2321[Mutex.unlock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/unlock.html
Roman Elizarove0c817d2017-02-10 10:22:01 +03002322<!--- INDEX kotlinx.coroutines.experimental.channels -->
Roman Elizarove82dee72017-08-18 16:49:09 +03002323[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002324[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
2325[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html
2326[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/close.html
Roman Elizarova5e653f2017-02-13 13:49:55 +03002327[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
Roman Elizarov86349be2017-03-17 16:47:37 +03002328[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/consume-each.html
Roman Elizarovc0e19f82017-02-27 11:59:14 +03002329[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
Roman Elizarov8a5564d2017-09-06 18:48:22 +03002330[ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/on-receive.html
2331[ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/on-receive-or-null.html
2332[SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/on-send.html
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002333<!--- INDEX kotlinx.coroutines.experimental.selects -->
2334[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
Roman Elizarov419a6c82017-02-09 18:36:22 +03002335<!--- END -->