blob: 283e706194019b720d92341f126805fa74aff613 [file] [log] [blame] [view]
Roman Elizarov43e90112017-05-10 11:25:20 +03001<!--- INCLUDE .*/example-([a-z]+)-([0-9a-z]+)\.kt
Roman Elizarova5e653f2017-02-13 13:49:55 +03002/*
3 * Copyright 2016-2017 JetBrains s.r.o.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
Roman Elizarovf16fd272017-02-07 11:26:00 +030017
Roman Elizarova5e653f2017-02-13 13:49:55 +030018// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
19package guide.$$1.example$$2
Roman Elizarovf16fd272017-02-07 11:26:00 +030020
Roman Elizarova5e653f2017-02-13 13:49:55 +030021import kotlinx.coroutines.experimental.*
Roman Elizarovf16fd272017-02-07 11:26:00 +030022-->
Roman Elizarove8d79342017-08-29 15:21:21 +030023<!--- KNIT core/kotlinx-coroutines-core/src/test/kotlin/guide/.*\.kt -->
24<!--- TEST_OUT core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
Roman Elizarov731f0ad2017-02-22 20:48:45 +030025// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
26package guide.test
27
28import org.junit.Test
29
30class GuideTest {
31-->
Roman Elizarovf16fd272017-02-07 11:26:00 +030032
Roman Elizarov7deefb82017-01-31 10:33:17 +030033# Guide to kotlinx.coroutines by example
34
35This is a short guide on core features of `kotlinx.coroutines` with a series of examples.
36
Roman Elizarov2a638922017-03-04 10:22:43 +030037## Introduction and setup
38
39Kotlin, as a language, provides only minimal low-level APIs in its standard library to enable various other
40libraries to utilize coroutines. Unlike many other languages with similar capabilities, `async` and `await`
41are not keywords in Kotlin and are not even part of its standard library.
42
Robert Hencke497d3432017-04-11 00:14:29 -040043`kotlinx.coroutines` is one such rich library. It contains a number of high-level
Roman Elizarov2a638922017-03-04 10:22:43 +030044coroutine-enabled primitives that this guide covers, including `async` and `await`.
45You need to add a dependency on `kotlinx-coroutines-core` module as explained
46[here](README.md#using-in-your-projects) to use primitives from this guide in your projects.
47
Roman Elizarov1293ccd2017-02-01 18:49:54 +030048## Table of contents
49
Roman Elizarovfa7723e2017-02-06 11:17:51 +030050<!--- TOC -->
51
Roman Elizarov1293ccd2017-02-01 18:49:54 +030052* [Coroutine basics](#coroutine-basics)
53 * [Your first coroutine](#your-first-coroutine)
54 * [Bridging blocking and non-blocking worlds](#bridging-blocking-and-non-blocking-worlds)
55 * [Waiting for a job](#waiting-for-a-job)
56 * [Extract function refactoring](#extract-function-refactoring)
57 * [Coroutines ARE light-weight](#coroutines-are-light-weight)
58 * [Coroutines are like daemon threads](#coroutines-are-like-daemon-threads)
59* [Cancellation and timeouts](#cancellation-and-timeouts)
60 * [Cancelling coroutine execution](#cancelling-coroutine-execution)
61 * [Cancellation is cooperative](#cancellation-is-cooperative)
62 * [Making computation code cancellable](#making-computation-code-cancellable)
63 * [Closing resources with finally](#closing-resources-with-finally)
64 * [Run non-cancellable block](#run-non-cancellable-block)
65 * [Timeout](#timeout)
66* [Composing suspending functions](#composing-suspending-functions)
67 * [Sequential by default](#sequential-by-default)
Roman Elizarov32d95322017-02-09 15:57:31 +030068 * [Concurrent using async](#concurrent-using-async)
69 * [Lazily started async](#lazily-started-async)
70 * [Async-style functions](#async-style-functions)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +030071* [Coroutine context and dispatchers](#coroutine-context-and-dispatchers)
Roman Elizarovfa7723e2017-02-06 11:17:51 +030072 * [Dispatchers and threads](#dispatchers-and-threads)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +030073 * [Unconfined vs confined dispatcher](#unconfined-vs-confined-dispatcher)
74 * [Debugging coroutines and threads](#debugging-coroutines-and-threads)
75 * [Jumping between threads](#jumping-between-threads)
76 * [Job in the context](#job-in-the-context)
77 * [Children of a coroutine](#children-of-a-coroutine)
78 * [Combining contexts](#combining-contexts)
Roman Elizarov8b38fa22017-09-27 17:44:31 +030079 * [Parental responsibilities](#parental-responsibilities)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +030080 * [Naming coroutines for debugging](#naming-coroutines-for-debugging)
Roman Elizarov2fd7cb32017-02-11 23:18:59 +030081 * [Cancellation via explicit job](#cancellation-via-explicit-job)
Roman Elizarovb7721cf2017-02-03 19:23:08 +030082* [Channels](#channels)
83 * [Channel basics](#channel-basics)
84 * [Closing and iteration over channels](#closing-and-iteration-over-channels)
85 * [Building channel producers](#building-channel-producers)
86 * [Pipelines](#pipelines)
87 * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
88 * [Fan-out](#fan-out)
89 * [Fan-in](#fan-in)
90 * [Buffered channels](#buffered-channels)
Roman Elizarovb0517ba2017-02-27 14:03:14 +030091 * [Channels are fair](#channels-are-fair)
Roman Elizarovf5bc0472017-02-22 11:38:13 +030092* [Shared mutable state and concurrency](#shared-mutable-state-and-concurrency)
93 * [The problem](#the-problem)
Roman Elizarov1e459602017-02-27 11:05:17 +030094 * [Volatiles are of no help](#volatiles-are-of-no-help)
Roman Elizarovf5bc0472017-02-22 11:38:13 +030095 * [Thread-safe data structures](#thread-safe-data-structures)
Roman Elizarov1e459602017-02-27 11:05:17 +030096 * [Thread confinement fine-grained](#thread-confinement-fine-grained)
97 * [Thread confinement coarse-grained](#thread-confinement-coarse-grained)
Roman Elizarovf5bc0472017-02-22 11:38:13 +030098 * [Mutual exclusion](#mutual-exclusion)
99 * [Actors](#actors)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +0300100* [Select expression](#select-expression)
101 * [Selecting from channels](#selecting-from-channels)
102 * [Selecting on close](#selecting-on-close)
103 * [Selecting to send](#selecting-to-send)
104 * [Selecting deferred values](#selecting-deferred-values)
105 * [Switch over a channel of deferred values](#switch-over-a-channel-of-deferred-values)
Roman Elizarov8db17332017-03-09 12:40:45 +0300106* [Further reading](#further-reading)
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300107
Roman Elizarova5e653f2017-02-13 13:49:55 +0300108<!--- END_TOC -->
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300109
110## Coroutine basics
111
112This section covers basic coroutine concepts.
113
114### Your first coroutine
Roman Elizarov7deefb82017-01-31 10:33:17 +0300115
116Run the following code:
117
118```kotlin
119fun main(args: Array<String>) {
120 launch(CommonPool) { // create new coroutine in common thread pool
121 delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
122 println("World!") // print after delay
123 }
124 println("Hello,") // main function continues while coroutine is delayed
125 Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
126}
127```
128
Roman Elizarove8d79342017-08-29 15:21:21 +0300129> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-01.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300130
131Run this code:
132
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300133```text
Roman Elizarov7deefb82017-01-31 10:33:17 +0300134Hello,
135World!
136```
137
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300138<!--- TEST -->
139
Roman Elizarov419a6c82017-02-09 18:36:22 +0300140Essentially, coroutines are light-weight threads.
141They are launched with [launch] _coroutine builder_.
142You can achieve the same result replacing
Roman Elizarov7deefb82017-01-31 10:33:17 +0300143`launch(CommonPool) { ... }` with `thread { ... }` and `delay(...)` with `Thread.sleep(...)`. Try it.
144
145If you start by replacing `launch(CommonPool)` by `thread`, the compiler produces the following error:
146
147```
148Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
149```
150
Roman Elizarov419a6c82017-02-09 18:36:22 +0300151That is because [delay] is a special _suspending function_ that does not block a thread, but _suspends_
Roman Elizarov7deefb82017-01-31 10:33:17 +0300152coroutine and it can be only used from a coroutine.
153
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300154### Bridging blocking and non-blocking worlds
Roman Elizarov7deefb82017-01-31 10:33:17 +0300155
156The first example mixes _non-blocking_ `delay(...)` and _blocking_ `Thread.sleep(...)` in the same
157code of `main` function. It is easy to get lost. Let's cleanly separate blocking and non-blocking
Roman Elizarov419a6c82017-02-09 18:36:22 +0300158worlds by using [runBlocking]:
Roman Elizarov7deefb82017-01-31 10:33:17 +0300159
160```kotlin
161fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
162 launch(CommonPool) { // create new coroutine in common thread pool
163 delay(1000L)
164 println("World!")
165 }
166 println("Hello,") // main coroutine continues while child is delayed
167 delay(2000L) // non-blocking delay for 2 seconds to keep JVM alive
168}
169```
170
Roman Elizarove8d79342017-08-29 15:21:21 +0300171> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-02.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300172
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300173<!--- TEST
174Hello,
175World!
176-->
177
Roman Elizarov419a6c82017-02-09 18:36:22 +0300178The result is the same, but this code uses only non-blocking [delay].
Roman Elizarov7deefb82017-01-31 10:33:17 +0300179
180`runBlocking { ... }` works as an adaptor that is used here to start the top-level main coroutine.
181The regular code outside of `runBlocking` _blocks_, until the coroutine inside `runBlocking` is active.
182
183This is also a way to write unit-tests for suspending functions:
184
185```kotlin
186class MyTest {
187 @Test
188 fun testMySuspendingFunction() = runBlocking<Unit> {
189 // here we can use suspending functions using any assertion style that we like
190 }
191}
192```
Roman Elizarovb3d55a52017-02-03 12:47:21 +0300193
194<!--- CLEAR -->
Roman Elizarov7deefb82017-01-31 10:33:17 +0300195
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300196### Waiting for a job
Roman Elizarov7deefb82017-01-31 10:33:17 +0300197
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300198Delaying for a time while another coroutine is working is not a good approach. Let's explicitly
Roman Elizarov419a6c82017-02-09 18:36:22 +0300199wait (in a non-blocking way) until the background [Job] that we have launched is complete:
Roman Elizarov7deefb82017-01-31 10:33:17 +0300200
201```kotlin
202fun main(args: Array<String>) = runBlocking<Unit> {
203 val job = launch(CommonPool) { // create new coroutine and keep a reference to its Job
204 delay(1000L)
205 println("World!")
206 }
207 println("Hello,")
208 job.join() // wait until child coroutine completes
209}
210```
211
Roman Elizarove8d79342017-08-29 15:21:21 +0300212> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-03.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300213
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300214<!--- TEST
215Hello,
216World!
217-->
218
Roman Elizarov7deefb82017-01-31 10:33:17 +0300219Now the result is still the same, but the code of the main coroutine is not tied to the duration of
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300220the background job in any way. Much better.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300221
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300222### Extract function refactoring
Roman Elizarov7deefb82017-01-31 10:33:17 +0300223
Kafji48fd6282017-05-28 08:25:10 +0700224Let's extract the block of code inside `launch(CommonPool) { ... }` into a separate function. When you
Roman Elizarov7deefb82017-01-31 10:33:17 +0300225perform "Extract function" refactoring on this code you get a new function with `suspend` modifier.
226That is your first _suspending function_. Suspending functions can be used inside coroutines
227just like regular functions, but their additional feature is that they can, in turn,
228use other suspending functions, like `delay` in this example, to _suspend_ execution of a coroutine.
229
230```kotlin
231fun main(args: Array<String>) = runBlocking<Unit> {
232 val job = launch(CommonPool) { doWorld() }
233 println("Hello,")
234 job.join()
235}
236
237// this is your first suspending function
238suspend fun doWorld() {
239 delay(1000L)
240 println("World!")
241}
242```
243
Roman Elizarove8d79342017-08-29 15:21:21 +0300244> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-04.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300245
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300246<!--- TEST
247Hello,
248World!
249-->
250
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300251### Coroutines ARE light-weight
Roman Elizarov7deefb82017-01-31 10:33:17 +0300252
253Run the following code:
254
255```kotlin
256fun main(args: Array<String>) = runBlocking<Unit> {
257 val jobs = List(100_000) { // create a lot of coroutines and list their jobs
258 launch(CommonPool) {
259 delay(1000L)
260 print(".")
261 }
262 }
263 jobs.forEach { it.join() } // wait for all jobs to complete
264}
265```
266
Roman Elizarove8d79342017-08-29 15:21:21 +0300267> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-05.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300268
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300269<!--- TEST lines.size == 1 && lines[0] == ".".repeat(100_000) -->
270
Roman Elizarov7deefb82017-01-31 10:33:17 +0300271It starts 100K coroutines and, after a second, each coroutine prints a dot.
272Now, try that with threads. What would happen? (Most likely your code will produce some sort of out-of-memory error)
273
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300274### Coroutines are like daemon threads
Roman Elizarov7deefb82017-01-31 10:33:17 +0300275
276The following code launches a long-running coroutine that prints "I'm sleeping" twice a second and then
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300277returns from the main function after some delay:
Roman Elizarov7deefb82017-01-31 10:33:17 +0300278
279```kotlin
280fun main(args: Array<String>) = runBlocking<Unit> {
281 launch(CommonPool) {
282 repeat(1000) { i ->
283 println("I'm sleeping $i ...")
284 delay(500L)
285 }
286 }
287 delay(1300L) // just quit after delay
288}
289```
290
Roman Elizarove8d79342017-08-29 15:21:21 +0300291> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-06.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300292
293You can run and see that it prints three lines and terminates:
294
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300295```text
Roman Elizarov7deefb82017-01-31 10:33:17 +0300296I'm sleeping 0 ...
297I'm sleeping 1 ...
298I'm sleeping 2 ...
299```
300
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300301<!--- TEST -->
302
Roman Elizarov7deefb82017-01-31 10:33:17 +0300303Active coroutines do not keep the process alive. They are like daemon threads.
304
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300305## Cancellation and timeouts
306
307This section covers coroutine cancellation and timeouts.
308
309### Cancelling coroutine execution
Roman Elizarov7deefb82017-01-31 10:33:17 +0300310
311In small application the return from "main" method might sound like a good idea to get all coroutines
312implicitly terminated. In a larger, long-running application, you need finer-grained control.
Roman Elizarov419a6c82017-02-09 18:36:22 +0300313The [launch] function returns a [Job] that can be used to cancel running coroutine:
Roman Elizarov7deefb82017-01-31 10:33:17 +0300314
315```kotlin
316fun main(args: Array<String>) = runBlocking<Unit> {
317 val job = launch(CommonPool) {
318 repeat(1000) { i ->
319 println("I'm sleeping $i ...")
320 delay(500L)
321 }
322 }
323 delay(1300L) // delay a bit
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300324 println("main: I'm tired of waiting!")
Roman Elizarov7deefb82017-01-31 10:33:17 +0300325 job.cancel() // cancels the job
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300326 job.join() // waits for job's completion
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300327 println("main: Now I can quit.")
Roman Elizarov7deefb82017-01-31 10:33:17 +0300328}
329```
330
Roman Elizarove8d79342017-08-29 15:21:21 +0300331> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-01.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300332
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300333It produces the following output:
334
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300335```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300336I'm sleeping 0 ...
337I'm sleeping 1 ...
338I'm sleeping 2 ...
339main: I'm tired of waiting!
340main: Now I can quit.
341```
342
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300343<!--- TEST -->
344
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300345As soon as main invokes `job.cancel`, we don't see any output from the other coroutine because it was cancelled.
Roman Elizarov88396732017-09-27 21:30:47 +0300346There is also a [Job] extension function [cancelAndJoin]
347that combines [cancel][Job.cancel] and [join][Job.join] invocations.
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300348
349### Cancellation is cooperative
Roman Elizarov7deefb82017-01-31 10:33:17 +0300350
Tair Rzayevaf734622017-02-01 22:30:16 +0200351Coroutine cancellation is _cooperative_. A coroutine code has to cooperate to be cancellable.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300352All the suspending functions in `kotlinx.coroutines` are _cancellable_. They check for cancellation of
Roman Elizarov419a6c82017-02-09 18:36:22 +0300353coroutine and throw [CancellationException] when cancelled. However, if a coroutine is working in
Roman Elizarov7deefb82017-01-31 10:33:17 +0300354a computation and does not check for cancellation, then it cannot be cancelled, like the following
355example shows:
356
357```kotlin
358fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov24cd6542017-08-03 21:20:04 -0700359 val startTime = System.currentTimeMillis()
Roman Elizarov7deefb82017-01-31 10:33:17 +0300360 val job = launch(CommonPool) {
Roman Elizarov24cd6542017-08-03 21:20:04 -0700361 var nextPrintTime = startTime
Roman Elizarov7deefb82017-01-31 10:33:17 +0300362 var i = 0
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300363 while (i < 5) { // computation loop, just wastes CPU
Roman Elizarov24cd6542017-08-03 21:20:04 -0700364 // print a message twice a second
365 if (System.currentTimeMillis() >= nextPrintTime) {
Roman Elizarov7deefb82017-01-31 10:33:17 +0300366 println("I'm sleeping ${i++} ...")
Roman Elizarov35d2c342017-07-20 14:54:39 +0300367 nextPrintTime += 500L
Roman Elizarov7deefb82017-01-31 10:33:17 +0300368 }
369 }
370 }
371 delay(1300L) // delay a bit
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300372 println("main: I'm tired of waiting!")
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300373 job.cancelAndJoin() // cancels the job and waits for its completion
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300374 println("main: Now I can quit.")
Roman Elizarov7deefb82017-01-31 10:33:17 +0300375}
376```
377
Roman Elizarove8d79342017-08-29 15:21:21 +0300378> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-02.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300379
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300380Run it to see that it continues to print "I'm sleeping" even after cancellation
381until the job completes by itself after five iterations.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300382
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300383<!--- TEST
384I'm sleeping 0 ...
385I'm sleeping 1 ...
386I'm sleeping 2 ...
387main: I'm tired of waiting!
388I'm sleeping 3 ...
389I'm sleeping 4 ...
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300390main: Now I can quit.
391-->
392
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300393### Making computation code cancellable
Roman Elizarov7deefb82017-01-31 10:33:17 +0300394
395There are two approaches to making computation code cancellable. The first one is to periodically
Roman Elizarov419a6c82017-02-09 18:36:22 +0300396invoke a suspending function. There is a [yield] function that is a good choice for that purpose.
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300397The other one is to explicitly check the cancellation status. Let us try the later approach.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300398
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300399Replace `while (i < 5)` in the previous example with `while (isActive)` and rerun it.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300400
Roman Elizarovb3d55a52017-02-03 12:47:21 +0300401```kotlin
402fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov24cd6542017-08-03 21:20:04 -0700403 val startTime = System.currentTimeMillis()
Roman Elizarovb3d55a52017-02-03 12:47:21 +0300404 val job = launch(CommonPool) {
Roman Elizarov24cd6542017-08-03 21:20:04 -0700405 var nextPrintTime = startTime
Roman Elizarovb3d55a52017-02-03 12:47:21 +0300406 var i = 0
407 while (isActive) { // cancellable computation loop
Roman Elizarov24cd6542017-08-03 21:20:04 -0700408 // print a message twice a second
409 if (System.currentTimeMillis() >= nextPrintTime) {
Roman Elizarovb3d55a52017-02-03 12:47:21 +0300410 println("I'm sleeping ${i++} ...")
Roman Elizarov24cd6542017-08-03 21:20:04 -0700411 nextPrintTime += 500L
Roman Elizarovb3d55a52017-02-03 12:47:21 +0300412 }
413 }
414 }
415 delay(1300L) // delay a bit
416 println("main: I'm tired of waiting!")
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300417 job.cancelAndJoin() // cancels the job and waits for its completion
Roman Elizarovb3d55a52017-02-03 12:47:21 +0300418 println("main: Now I can quit.")
419}
420```
421
Roman Elizarove8d79342017-08-29 15:21:21 +0300422> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-03.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300423
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300424As you can see, now this loop is cancelled. [isActive][CoroutineScope.isActive] is a property that is available inside
Roman Elizarov419a6c82017-02-09 18:36:22 +0300425the code of coroutines via [CoroutineScope] object.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300426
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300427<!--- TEST
428I'm sleeping 0 ...
429I'm sleeping 1 ...
430I'm sleeping 2 ...
431main: I'm tired of waiting!
432main: Now I can quit.
433-->
434
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300435### Closing resources with finally
436
Roman Elizarov419a6c82017-02-09 18:36:22 +0300437Cancellable suspending functions throw [CancellationException] on cancellation which can be handled in
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300438all the usual way. For example, the `try {...} finally {...}` and Kotlin `use` function execute their
439finalization actions normally when coroutine is cancelled:
440
441```kotlin
442fun main(args: Array<String>) = runBlocking<Unit> {
443 val job = launch(CommonPool) {
444 try {
445 repeat(1000) { i ->
446 println("I'm sleeping $i ...")
447 delay(500L)
448 }
449 } finally {
450 println("I'm running finally")
451 }
452 }
453 delay(1300L) // delay a bit
454 println("main: I'm tired of waiting!")
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300455 job.cancelAndJoin() // cancels the job and waits for its completion
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300456 println("main: Now I can quit.")
457}
458```
459
Roman Elizarove8d79342017-08-29 15:21:21 +0300460> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-04.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300461
Roman Elizarov88396732017-09-27 21:30:47 +0300462Both [join][Job.join] and [cancelAndJoin] wait for all the finalization actions to complete,
463so the example above produces the following output:
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300464
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300465```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300466I'm sleeping 0 ...
467I'm sleeping 1 ...
468I'm sleeping 2 ...
469main: I'm tired of waiting!
470I'm running finally
471main: Now I can quit.
472```
473
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300474<!--- TEST -->
475
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300476### Run non-cancellable block
477
478Any attempt to use a suspending function in the `finally` block of the previous example will cause
Roman Elizarov419a6c82017-02-09 18:36:22 +0300479[CancellationException], because the coroutine running this code is cancelled. Usually, this is not a
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300480problem, since all well-behaving closing operations (closing a file, cancelling a job, or closing any kind of a
481communication channel) are usually non-blocking and do not involve any suspending functions. However, in the
482rare case when you need to suspend in the cancelled coroutine you can wrap the corresponding code in
Roman Elizarov419a6c82017-02-09 18:36:22 +0300483`run(NonCancellable) {...}` using [run] function and [NonCancellable] context as the following example shows:
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300484
485```kotlin
486fun main(args: Array<String>) = runBlocking<Unit> {
487 val job = launch(CommonPool) {
488 try {
489 repeat(1000) { i ->
490 println("I'm sleeping $i ...")
491 delay(500L)
492 }
493 } finally {
494 run(NonCancellable) {
495 println("I'm running finally")
496 delay(1000L)
497 println("And I've just delayed for 1 sec because I'm non-cancellable")
498 }
499 }
500 }
501 delay(1300L) // delay a bit
502 println("main: I'm tired of waiting!")
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300503 job.cancelAndJoin() // cancels the job and waits for its completion
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300504 println("main: Now I can quit.")
505}
506```
507
Roman Elizarove8d79342017-08-29 15:21:21 +0300508> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-05.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300509
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300510<!--- TEST
511I'm sleeping 0 ...
512I'm sleeping 1 ...
513I'm sleeping 2 ...
514main: I'm tired of waiting!
515I'm running finally
516And I've just delayed for 1 sec because I'm non-cancellable
517main: Now I can quit.
518-->
519
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300520### Timeout
521
522The most obvious reason to cancel coroutine execution in practice,
523is because its execution time has exceeded some timeout.
Roman Elizarov419a6c82017-02-09 18:36:22 +0300524While you can manually track the reference to the corresponding [Job] and launch a separate coroutine to cancel
525the tracked one after delay, there is a ready to use [withTimeout] function that does it.
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300526Look at the following example:
527
528```kotlin
529fun main(args: Array<String>) = runBlocking<Unit> {
530 withTimeout(1300L) {
531 repeat(1000) { i ->
532 println("I'm sleeping $i ...")
533 delay(500L)
534 }
535 }
536}
537```
538
Roman Elizarove8d79342017-08-29 15:21:21 +0300539> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-06.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300540
541It produces the following output:
542
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300543```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300544I'm sleeping 0 ...
545I'm sleeping 1 ...
546I'm sleeping 2 ...
Roman Elizarov63f6ea22017-09-06 18:42:34 +0300547Exception in thread "main" kotlinx.coroutines.experimental.TimeoutCancellationException: Timed out waiting for 1300 MILLISECONDS
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300548```
549
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300550<!--- TEST STARTS_WITH -->
551
Roman Elizarov63f6ea22017-09-06 18:42:34 +0300552The `TimeoutCancellationException` that is thrown by [withTimeout] is a subclass of [CancellationException].
Roman Elizarovca9d5be2017-04-20 19:23:18 +0300553We have not seen its stack trace printed on the console before. That is because
Roman Elizarov7c864d82017-02-27 10:17:50 +0300554inside a cancelled coroutine `CancellationException` is considered to be a normal reason for coroutine completion.
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300555However, in this example we have used `withTimeout` right inside the `main` function.
556
557Because cancellation is just an exception, all the resources will be closed in a usual way.
Roman Elizarov63f6ea22017-09-06 18:42:34 +0300558You can wrap the code with timeout in `try {...} catch (e: TimeoutCancellationException) {...}` block if
559you need to do some additional action specifically on any kind of timeout or use [withTimeoutOrNull] function
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300560that is similar to [withTimeout], but returns `null` on timeout instead of throwing an exception:
561
562```kotlin
563fun main(args: Array<String>) = runBlocking<Unit> {
564 val result = withTimeoutOrNull(1300L) {
565 repeat(1000) { i ->
566 println("I'm sleeping $i ...")
567 delay(500L)
568 }
569 "Done" // will get cancelled before it produces this result
570 }
571 println("Result is $result")
572}
573```
574
575> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-07.kt)
576
577There is no longer an exception when running this code:
578
579```text
580I'm sleeping 0 ...
581I'm sleeping 1 ...
582I'm sleeping 2 ...
583Result is null
584```
585
586<!--- TEST -->
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300587
588## Composing suspending functions
589
590This section covers various approaches to composition of suspending functions.
591
592### Sequential by default
593
594Assume that we have two suspending functions defined elsewhere that do something useful like some kind of
Roman Elizarovb7721cf2017-02-03 19:23:08 +0300595remote service call or computation. We just pretend they are useful, but actually each one just
596delays for a second for the purpose of this example:
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300597
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300598<!--- INCLUDE .*/example-compose-([0-9]+).kt
599import kotlin.system.measureTimeMillis
600-->
601
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300602```kotlin
603suspend fun doSomethingUsefulOne(): Int {
604 delay(1000L) // pretend we are doing something useful here
605 return 13
606}
607
608suspend fun doSomethingUsefulTwo(): Int {
609 delay(1000L) // pretend we are doing something useful here, too
610 return 29
611}
612```
613
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300614<!--- INCLUDE .*/example-compose-([0-9]+).kt -->
615
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300616What do we do if need to invoke them _sequentially_ -- first `doSomethingUsefulOne` _and then_
617`doSomethingUsefulTwo` and compute the sum of their results?
618In practise we do this if we use the results of the first function to make a decision on whether we need
619to invoke the second one or to decide on how to invoke it.
620
621We just use a normal sequential invocation, because the code in the coroutine, just like in the regular
Roman Elizarov32d95322017-02-09 15:57:31 +0300622code, is _sequential_ by default. The following example demonstrates it by measuring the total
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300623time it takes to execute both suspending functions:
624
625```kotlin
626fun main(args: Array<String>) = runBlocking<Unit> {
627 val time = measureTimeMillis {
628 val one = doSomethingUsefulOne()
629 val two = doSomethingUsefulTwo()
630 println("The answer is ${one + two}")
631 }
632 println("Completed in $time ms")
633}
634```
635
Roman Elizarove8d79342017-08-29 15:21:21 +0300636> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-01.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300637
638It produces something like this:
639
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300640```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300641The answer is 42
642Completed in 2017 ms
643```
644
Roman Elizarov35d2c342017-07-20 14:54:39 +0300645<!--- TEST ARBITRARY_TIME -->
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300646
Roman Elizarov32d95322017-02-09 15:57:31 +0300647### Concurrent using async
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300648
649What if there are no dependencies between invocation of `doSomethingUsefulOne` and `doSomethingUsefulTwo` and
Roman Elizarov419a6c82017-02-09 18:36:22 +0300650we want to get the answer faster, by doing both _concurrently_? This is where [async] comes to help.
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300651
Roman Elizarov419a6c82017-02-09 18:36:22 +0300652Conceptually, [async] is just like [launch]. It starts a separate coroutine which is a light-weight thread
653that works concurrently with all the other coroutines. The difference is that `launch` returns a [Job] and
654does not carry any resulting value, while `async` returns a [Deferred] -- a light-weight non-blocking future
Roman Elizarov32d95322017-02-09 15:57:31 +0300655that represents a promise to provide a result later. You can use `.await()` on a deferred value to get its eventual result,
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300656but `Deferred` is also a `Job`, so you can cancel it if needed.
657
658```kotlin
659fun main(args: Array<String>) = runBlocking<Unit> {
660 val time = measureTimeMillis {
Roman Elizarov32d95322017-02-09 15:57:31 +0300661 val one = async(CommonPool) { doSomethingUsefulOne() }
662 val two = async(CommonPool) { doSomethingUsefulTwo() }
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300663 println("The answer is ${one.await() + two.await()}")
664 }
665 println("Completed in $time ms")
666}
667```
668
Roman Elizarove8d79342017-08-29 15:21:21 +0300669> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-02.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300670
671It produces something like this:
672
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300673```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300674The answer is 42
675Completed in 1017 ms
676```
677
Roman Elizarov35d2c342017-07-20 14:54:39 +0300678<!--- TEST ARBITRARY_TIME -->
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300679
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300680This is twice as fast, because we have concurrent execution of two coroutines.
681Note, that concurrency with coroutines is always explicit.
682
Roman Elizarov32d95322017-02-09 15:57:31 +0300683### Lazily started async
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300684
Roman Elizarovecda27f2017-04-06 23:06:26 +0300685There is a laziness option to [async] with [CoroutineStart.LAZY] parameter.
Roman Elizarov419a6c82017-02-09 18:36:22 +0300686It starts coroutine only when its result is needed by some
687[await][Deferred.await] or if a [start][Job.start] function
Roman Elizarov32d95322017-02-09 15:57:31 +0300688is invoked. Run the following example that differs from the previous one only by this option:
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300689
690```kotlin
691fun main(args: Array<String>) = runBlocking<Unit> {
692 val time = measureTimeMillis {
Roman Elizarovecda27f2017-04-06 23:06:26 +0300693 val one = async(CommonPool, CoroutineStart.LAZY) { doSomethingUsefulOne() }
694 val two = async(CommonPool, CoroutineStart.LAZY) { doSomethingUsefulTwo() }
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300695 println("The answer is ${one.await() + two.await()}")
696 }
697 println("Completed in $time ms")
698}
699```
700
Roman Elizarove8d79342017-08-29 15:21:21 +0300701> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-03.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300702
703It produces something like this:
704
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300705```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300706The answer is 42
707Completed in 2017 ms
708```
709
Roman Elizarov35d2c342017-07-20 14:54:39 +0300710<!--- TEST ARBITRARY_TIME -->
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300711
Roman Elizarov32d95322017-02-09 15:57:31 +0300712So, we are back to sequential execution, because we _first_ start and await for `one`, _and then_ start and await
713for `two`. It is not the intended use-case for laziness. It is designed as a replacement for
714the standard `lazy` function in cases when computation of the value involves suspending functions.
715
716### Async-style functions
717
718We can define async-style functions that invoke `doSomethingUsefulOne` and `doSomethingUsefulTwo`
Roman Elizarov419a6c82017-02-09 18:36:22 +0300719_asynchronously_ using [async] coroutine builder. It is a good style to name such functions with
Roman Elizarov32d95322017-02-09 15:57:31 +0300720either "async" prefix of "Async" suffix to highlight the fact that they only start asynchronous
721computation and one needs to use the resulting deferred value to get the result.
722
723```kotlin
724// The result type of asyncSomethingUsefulOne is Deferred<Int>
725fun asyncSomethingUsefulOne() = async(CommonPool) {
726 doSomethingUsefulOne()
727}
728
729// The result type of asyncSomethingUsefulTwo is Deferred<Int>
730fun asyncSomethingUsefulTwo() = async(CommonPool) {
731 doSomethingUsefulTwo()
732}
733```
734
735Note, that these `asyncXXX` function are **not** _suspending_ functions. They can be used from anywhere.
736However, their use always implies asynchronous (here meaning _concurrent_) execution of their action
737with the invoking code.
738
739The following example shows their use outside of coroutine:
740
741```kotlin
742// note, that we don't have `runBlocking` to the right of `main` in this example
743fun main(args: Array<String>) {
744 val time = measureTimeMillis {
745 // we can initiate async actions outside of a coroutine
746 val one = asyncSomethingUsefulOne()
747 val two = asyncSomethingUsefulTwo()
748 // but waiting for a result must involve either suspending or blocking.
749 // here we use `runBlocking { ... }` to block the main thread while waiting for the result
750 runBlocking {
751 println("The answer is ${one.await() + two.await()}")
752 }
753 }
754 println("Completed in $time ms")
755}
756```
757
Roman Elizarove8d79342017-08-29 15:21:21 +0300758> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-04.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300759
Roman Elizarov35d2c342017-07-20 14:54:39 +0300760<!--- TEST ARBITRARY_TIME
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300761The answer is 42
762Completed in 1085 ms
763-->
764
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300765## Coroutine context and dispatchers
766
Roman Elizarov32d95322017-02-09 15:57:31 +0300767We've already seen `launch(CommonPool) {...}`, `async(CommonPool) {...}`, `run(NonCancellable) {...}`, etc.
Roman Elizarov419a6c82017-02-09 18:36:22 +0300768In these code snippets [CommonPool] and [NonCancellable] are _coroutine contexts_.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300769This section covers other available choices.
770
771### Dispatchers and threads
772
Roman Elizarov419a6c82017-02-09 18:36:22 +0300773Coroutine context includes a [_coroutine dispatcher_][CoroutineDispatcher] which determines what thread or threads
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300774the corresponding coroutine uses for its execution. Coroutine dispatcher can confine coroutine execution
775to a specific thread, dispatch it to a thread pool, or let it run unconfined. Try the following example:
776
777```kotlin
778fun main(args: Array<String>) = runBlocking<Unit> {
779 val jobs = arrayListOf<Job>()
780 jobs += launch(Unconfined) { // not confined -- will work with main thread
Roman Elizarov43e3af72017-07-21 16:01:31 +0300781 println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300782 }
Roman Elizarov43e3af72017-07-21 16:01:31 +0300783 jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
784 println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300785 }
786 jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
Roman Elizarov43e3af72017-07-21 16:01:31 +0300787 println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300788 }
789 jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
Roman Elizarov43e3af72017-07-21 16:01:31 +0300790 println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300791 }
792 jobs.forEach { it.join() }
793}
794```
795
Roman Elizarove8d79342017-08-29 15:21:21 +0300796> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-01.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300797
798It produces the following output (maybe in different order):
799
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300800```text
Roman Elizarov43e3af72017-07-21 16:01:31 +0300801 'Unconfined': I'm working in thread main
802 'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
803 'newSTC': I'm working in thread MyOwnThread
804'coroutineContext': I'm working in thread main
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300805```
806
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300807<!--- TEST LINES_START_UNORDERED -->
808
Roman Elizarov43e3af72017-07-21 16:01:31 +0300809The difference between parent [coroutineContext][CoroutineScope.coroutineContext] and
810[Unconfined] context will be shown later.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300811
812### Unconfined vs confined dispatcher
813
Roman Elizarov419a6c82017-02-09 18:36:22 +0300814The [Unconfined] coroutine dispatcher starts coroutine in the caller thread, but only until the
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300815first suspension point. After suspension it resumes in the thread that is fully determined by the
816suspending function that was invoked. Unconfined dispatcher is appropriate when coroutine does not
817consume CPU time nor updates any shared data (like UI) that is confined to a specific thread.
818
Roman Elizarov43e3af72017-07-21 16:01:31 +0300819On the other side, [coroutineContext][CoroutineScope.coroutineContext] property that is available inside the block of any coroutine
Roman Elizarov419a6c82017-02-09 18:36:22 +0300820via [CoroutineScope] interface, is a reference to a context of this particular coroutine.
821This way, a parent context can be inherited. The default context of [runBlocking], in particular,
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300822is confined to be invoker thread, so inheriting it has the effect of confining execution to
823this thread with a predictable FIFO scheduling.
824
825```kotlin
826fun main(args: Array<String>) = runBlocking<Unit> {
827 val jobs = arrayListOf<Job>()
828 jobs += launch(Unconfined) { // not confined -- will work with main thread
Roman Elizarov43e3af72017-07-21 16:01:31 +0300829 println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
Roman Elizarovd0021622017-03-10 15:43:38 +0300830 delay(500)
Roman Elizarov43e3af72017-07-21 16:01:31 +0300831 println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300832 }
Roman Elizarov43e3af72017-07-21 16:01:31 +0300833 jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
834 println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300835 delay(1000)
Roman Elizarov43e3af72017-07-21 16:01:31 +0300836 println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300837 }
838 jobs.forEach { it.join() }
839}
840```
841
Roman Elizarove8d79342017-08-29 15:21:21 +0300842> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-02.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300843
844Produces the output:
845
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300846```text
Roman Elizarov43e3af72017-07-21 16:01:31 +0300847 'Unconfined': I'm working in thread main
848'coroutineContext': I'm working in thread main
849 'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor
850'coroutineContext': After delay in thread main
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300851```
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300852
853<!--- TEST LINES_START -->
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300854
Roman Elizarov43e3af72017-07-21 16:01:31 +0300855So, the coroutine that had inherited `coroutineContext` of `runBlocking {...}` continues to execute
856in the `main` thread, while the unconfined one had resumed in the default executor thread that [delay]
857function is using.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300858
859### Debugging coroutines and threads
860
Roman Elizarov419a6c82017-02-09 18:36:22 +0300861Coroutines can suspend on one thread and resume on another thread with [Unconfined] dispatcher or
862with a multi-threaded dispatcher like [CommonPool]. Even with a single-threaded dispatcher it might be hard to
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300863figure out what coroutine was doing what, where, and when. The common approach to debugging applications with
864threads is to print the thread name in the log file on each log statement. This feature is universally supported
865by logging frameworks. When using coroutines, the thread name alone does not give much of a context, so
866`kotlinx.coroutines` includes debugging facilities to make it easier.
867
868Run the following code with `-Dkotlinx.coroutines.debug` JVM option:
869
870```kotlin
871fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
872
873fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov43e3af72017-07-21 16:01:31 +0300874 val a = async(coroutineContext) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300875 log("I'm computing a piece of the answer")
876 6
877 }
Roman Elizarov43e3af72017-07-21 16:01:31 +0300878 val b = async(coroutineContext) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300879 log("I'm computing another piece of the answer")
880 7
881 }
882 log("The answer is ${a.await() * b.await()}")
883}
884```
885
Roman Elizarove8d79342017-08-29 15:21:21 +0300886> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-03.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300887
Roman Elizarovb7721cf2017-02-03 19:23:08 +0300888There are three coroutines. The main coroutine (#1) -- `runBlocking` one,
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300889and two coroutines computing deferred values `a` (#2) and `b` (#3).
890They are all executing in the context of `runBlocking` and are confined to the main thread.
891The output of this code is:
892
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300893```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300894[main @coroutine#2] I'm computing a piece of the answer
895[main @coroutine#3] I'm computing another piece of the answer
896[main @coroutine#1] The answer is 42
897```
898
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300899<!--- TEST -->
900
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300901The `log` function prints the name of the thread in square brackets and you can see, that it is the `main`
902thread, but the identifier of the currently executing coroutine is appended to it. This identifier
903is consecutively assigned to all created coroutines when debugging mode is turned on.
904
Roman Elizarov419a6c82017-02-09 18:36:22 +0300905You can read more about debugging facilities in the documentation for [newCoroutineContext] function.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300906
907### Jumping between threads
908
909Run the following code with `-Dkotlinx.coroutines.debug` JVM option:
910
911```kotlin
912fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
913
914fun main(args: Array<String>) {
915 val ctx1 = newSingleThreadContext("Ctx1")
916 val ctx2 = newSingleThreadContext("Ctx2")
917 runBlocking(ctx1) {
918 log("Started in ctx1")
919 run(ctx2) {
920 log("Working in ctx2")
921 }
922 log("Back to ctx1")
923 }
924}
925```
926
Roman Elizarove8d79342017-08-29 15:21:21 +0300927> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300928
Roman Elizarov419a6c82017-02-09 18:36:22 +0300929It demonstrates two new techniques. One is using [runBlocking] with an explicitly specified context, and
930the second one is using [run] function to change a context of a coroutine while still staying in the
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300931same coroutine as you can see in the output below:
932
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300933```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300934[Ctx1 @coroutine#1] Started in ctx1
935[Ctx2 @coroutine#1] Working in ctx2
936[Ctx1 @coroutine#1] Back to ctx1
937```
938
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300939<!--- TEST -->
940
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300941### Job in the context
942
Roman Elizarov419a6c82017-02-09 18:36:22 +0300943The coroutine [Job] is part of its context. The coroutine can retrieve it from its own context
Roman Elizarov43e3af72017-07-21 16:01:31 +0300944using `coroutineContext[Job]` expression:
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300945
946```kotlin
947fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov43e3af72017-07-21 16:01:31 +0300948 println("My job is ${coroutineContext[Job]}")
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300949}
950```
951
Roman Elizarove8d79342017-08-29 15:21:21 +0300952> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-05.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300953
shifujun81a6f232017-06-18 15:37:59 +0800954It produces something like
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300955
956```
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300957My job is "coroutine#1":BlockingCoroutine{Active}@6d311334
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300958```
959
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300960<!--- TEST lines.size == 1 && lines[0].startsWith("My job is \"coroutine#1\":BlockingCoroutine{Active}@") -->
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300961
Roman Elizarov43e3af72017-07-21 16:01:31 +0300962So, [isActive][CoroutineScope.isActive] in [CoroutineScope] is just a convenient shortcut for
963`coroutineContext[Job]!!.isActive`.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300964
965### Children of a coroutine
966
Roman Elizarov43e3af72017-07-21 16:01:31 +0300967When [coroutineContext][CoroutineScope.coroutineContext] of a coroutine is used to launch another coroutine,
Roman Elizarov419a6c82017-02-09 18:36:22 +0300968the [Job] of the new coroutine becomes
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300969a _child_ of the parent coroutine's job. When the parent coroutine is cancelled, all its children
970are recursively cancelled, too.
971
972```kotlin
973fun main(args: Array<String>) = runBlocking<Unit> {
974 // start a coroutine to process some kind of incoming request
975 val request = launch(CommonPool) {
976 // it spawns two other jobs, one with its separate context
977 val job1 = launch(CommonPool) {
978 println("job1: I have my own context and execute independently!")
979 delay(1000)
980 println("job1: I am not affected by cancellation of the request")
981 }
982 // and the other inherits the parent context
Roman Elizarov43e3af72017-07-21 16:01:31 +0300983 val job2 = launch(coroutineContext) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300984 println("job2: I am a child of the request coroutine")
985 delay(1000)
986 println("job2: I will not execute this line if my parent request is cancelled")
987 }
988 // request completes when both its sub-jobs complete:
989 job1.join()
990 job2.join()
991 }
992 delay(500)
993 request.cancel() // cancel processing of the request
994 delay(1000) // delay a second to see what happens
995 println("main: Who has survived request cancellation?")
996}
997```
998
Roman Elizarove8d79342017-08-29 15:21:21 +0300999> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-06.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001000
1001The output of this code is:
1002
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001003```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001004job1: I have my own context and execute independently!
1005job2: I am a child of the request coroutine
1006job1: I am not affected by cancellation of the request
1007main: Who has survived request cancellation?
1008```
1009
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001010<!--- TEST -->
1011
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001012### Combining contexts
1013
1014Coroutine context can be combined using `+` operator. The context on the right-hand side replaces relevant entries
Roman Elizarov419a6c82017-02-09 18:36:22 +03001015of the context on the left-hand side. For example, a [Job] of the parent coroutine can be inherited, while
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001016its dispatcher replaced:
1017
1018```kotlin
1019fun main(args: Array<String>) = runBlocking<Unit> {
1020 // start a coroutine to process some kind of incoming request
Roman Elizarov43e3af72017-07-21 16:01:31 +03001021 val request = launch(coroutineContext) { // use the context of `runBlocking`
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001022 // spawns CPU-intensive child job in CommonPool !!!
Roman Elizarov43e3af72017-07-21 16:01:31 +03001023 val job = launch(coroutineContext + CommonPool) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001024 println("job: I am a child of the request coroutine, but with a different dispatcher")
1025 delay(1000)
1026 println("job: I will not execute this line if my parent request is cancelled")
1027 }
1028 job.join() // request completes when its sub-job completes
1029 }
1030 delay(500)
1031 request.cancel() // cancel processing of the request
1032 delay(1000) // delay a second to see what happens
1033 println("main: Who has survived request cancellation?")
1034}
1035```
1036
Roman Elizarove8d79342017-08-29 15:21:21 +03001037> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-07.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001038
1039The expected outcome of this code is:
1040
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001041```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001042job: I am a child of the request coroutine, but with a different dispatcher
1043main: Who has survived request cancellation?
1044```
1045
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001046<!--- TEST -->
1047
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001048### Parental responsibilities
1049
1050A parent coroutine always waits for completion of all its children. Parent does not have to explicitly track
Roman Elizarov88396732017-09-27 21:30:47 +03001051all the children it launches and it does not have to use [Job.join] to wait for them at the end:
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001052
1053```kotlin
1054fun main(args: Array<String>) = runBlocking<Unit> {
1055 // start a coroutine to process some kind of incoming request
1056 val request = launch(CommonPool) {
1057 repeat(3) { i -> // launch a few children jobs
1058 launch(coroutineContext) {
1059 delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
1060 println("Coroutine $i is done")
1061 }
1062 }
1063 println("request: I'm done and I don't explicitly join my children that are still active")
1064 }
1065 request.join() // wait for completion of the request, including all its children
1066 println("Now processing of the request is complete")
1067}
1068```
1069
1070> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt)
1071
1072The result is going to be:
1073
1074```text
1075request: I'm done and I don't explicitly join my children that are still active
1076Coroutine 0 is done
1077Coroutine 1 is done
1078Coroutine 2 is done
1079Now processing of the request is complete
1080```
1081
1082<!--- TEST -->
1083
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001084### Naming coroutines for debugging
1085
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001086Automatically assigned ids are good when coroutines log often and you just need to correlate log records
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001087coming from the same coroutine. However, when coroutine is tied to the processing of a specific request
1088or doing some specific background task, it is better to name it explicitly for debugging purposes.
Roman Elizarov419a6c82017-02-09 18:36:22 +03001089[CoroutineName] serves the same function as a thread name. It'll get displayed in the thread name that
Victor Osolovskiyed5ed492017-06-25 00:03:35 +03001090is executing this coroutine when debugging mode is turned on.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001091
1092The following example demonstrates this concept:
1093
1094```kotlin
1095fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
1096
1097fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
1098 log("Started main coroutine")
1099 // run two background value computations
Roman Elizarov32d95322017-02-09 15:57:31 +03001100 val v1 = async(CommonPool + CoroutineName("v1coroutine")) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001101 log("Computing v1")
1102 delay(500)
1103 252
1104 }
Roman Elizarov32d95322017-02-09 15:57:31 +03001105 val v2 = async(CommonPool + CoroutineName("v2coroutine")) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001106 log("Computing v2")
1107 delay(1000)
1108 6
1109 }
1110 log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
1111}
1112```
1113
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001114> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-09.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001115
1116The output it produces with `-Dkotlinx.coroutines.debug` JVM option is similar to:
1117
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001118```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001119[main @main#1] Started main coroutine
1120[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
1121[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
1122[main @main#1] The answer for v1 / v2 = 42
1123```
Roman Elizarov1293ccd2017-02-01 18:49:54 +03001124
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001125<!--- TEST FLEXIBLE_THREAD -->
1126
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001127### Cancellation via explicit job
1128
1129Let us put our knowledge about contexts, children and jobs together. Assume that our application has
1130an object with a lifecycle, but that object is not a coroutine. For example, we are writing an Android application
1131and launch various coroutines in the context of an Android activity to perform asynchronous operations to fetch
1132and update data, do animations, etc. All of these coroutines must be cancelled when activity is destroyed
1133to avoid memory leaks.
1134
1135We can manage a lifecycle of our coroutines by creating an instance of [Job] that is tied to
Roman Elizarov88396732017-09-27 21:30:47 +03001136the lifecycle of our activity. A job instance is created using [Job()] factory function
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001137as the following example shows. We need to make sure that all the coroutines are started
1138with this job in their context and then a single invocation of [Job.cancel] terminates them all.
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001139Moreover, [Job.join] waits for all of them to complete, so we can also use [cancelAndJoin] here in
1140this example:
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001141
1142```kotlin
1143fun main(args: Array<String>) = runBlocking<Unit> {
1144 val job = Job() // create a job object to manage our lifecycle
1145 // now launch ten coroutines for a demo, each working for a different time
1146 val coroutines = List(10) { i ->
1147 // they are all children of our job object
Roman Elizarov43e3af72017-07-21 16:01:31 +03001148 launch(coroutineContext + job) { // we use the context of main runBlocking thread, but with our own job object
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001149 delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001150 println("Coroutine $i is done")
1151 }
1152 }
1153 println("Launched ${coroutines.size} coroutines")
1154 delay(500L) // delay for half a second
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001155 println("Cancelling the job!")
1156 job.cancelAndJoin() // cancel all our coroutines and wait for all of them to complete
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001157}
1158```
1159
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001160> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-10.kt)
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001161
1162The output of this example is:
1163
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001164```text
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001165Launched 10 coroutines
1166Coroutine 0 is done
1167Coroutine 1 is done
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001168Cancelling the job!
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001169```
1170
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001171<!--- TEST -->
1172
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001173As you can see, only the first three coroutines had printed a message and the others were cancelled
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001174by a single invocation of `job.cancelAndJoin()`. So all we need to do in our hypothetical Android
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001175application is to create a parent job object when activity is created, use it for child coroutines,
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001176and cancel it when activity is destroyed. We cannot `join` them in the case of Android lifecycle,
1177since it is synchronous, but this joining ability is useful when building backend services to ensure bounded
1178resource usage.
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001179
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001180## Channels
Roman Elizarov7deefb82017-01-31 10:33:17 +03001181
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001182Deferred values provide a convenient way to transfer a single value between coroutines.
1183Channels provide a way to transfer a stream of values.
1184
1185<!--- INCLUDE .*/example-channel-([0-9]+).kt
1186import kotlinx.coroutines.experimental.channels.*
1187-->
1188
1189### Channel basics
1190
Roman Elizarov419a6c82017-02-09 18:36:22 +03001191A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
1192instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
1193a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001194
1195```kotlin
1196fun main(args: Array<String>) = runBlocking<Unit> {
1197 val channel = Channel<Int>()
1198 launch(CommonPool) {
1199 // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
1200 for (x in 1..5) channel.send(x * x)
1201 }
1202 // here we print five received integers:
1203 repeat(5) { println(channel.receive()) }
1204 println("Done!")
1205}
1206```
1207
Roman Elizarove8d79342017-08-29 15:21:21 +03001208> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-01.kt)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001209
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001210The output of this code is:
1211
1212```text
12131
12144
12159
121616
121725
1218Done!
1219```
1220
1221<!--- TEST -->
1222
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001223### Closing and iteration over channels
1224
1225Unlike a queue, a channel can be closed to indicate that no more elements are coming.
1226On the receiver side it is convenient to use a regular `for` loop to receive elements
1227from the channel.
1228
Roman Elizarov419a6c82017-02-09 18:36:22 +03001229Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001230The iteration stops as soon as this close token is received, so there is a guarantee
1231that all previously sent elements before the close are received:
1232
1233```kotlin
1234fun main(args: Array<String>) = runBlocking<Unit> {
1235 val channel = Channel<Int>()
1236 launch(CommonPool) {
1237 for (x in 1..5) channel.send(x * x)
1238 channel.close() // we're done sending
1239 }
1240 // here we print received values using `for` loop (until the channel is closed)
1241 for (y in channel) println(y)
1242 println("Done!")
1243}
1244```
1245
Roman Elizarove8d79342017-08-29 15:21:21 +03001246> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-02.kt)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001247
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001248<!--- TEST
12491
12504
12519
125216
125325
1254Done!
1255-->
1256
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001257### Building channel producers
1258
Roman Elizarova5e653f2017-02-13 13:49:55 +03001259The pattern where a coroutine is producing a sequence of elements is quite common.
1260This is a part of _producer-consumer_ pattern that is often found in concurrent code.
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001261You could abstract such a producer into a function that takes channel as its parameter, but this goes contrary
Roman Elizarova5e653f2017-02-13 13:49:55 +03001262to common sense that results must be returned from functions.
1263
Roman Elizarov86349be2017-03-17 16:47:37 +03001264There is a convenience coroutine builder named [produce] that makes it easy to do it right on producer side,
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001265and an extension function [consumeEach], that replaces a `for` loop on the consumer side:
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001266
1267```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001268fun produceSquares() = produce<Int>(CommonPool) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001269 for (x in 1..5) send(x * x)
1270}
1271
1272fun main(args: Array<String>) = runBlocking<Unit> {
1273 val squares = produceSquares()
Roman Elizarov86349be2017-03-17 16:47:37 +03001274 squares.consumeEach { println(it) }
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001275 println("Done!")
1276}
1277```
1278
Roman Elizarove8d79342017-08-29 15:21:21 +03001279> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001280
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001281<!--- TEST
12821
12834
12849
128516
128625
1287Done!
1288-->
1289
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001290### Pipelines
1291
1292Pipeline is a pattern where one coroutine is producing, possibly infinite, stream of values:
1293
1294```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001295fun produceNumbers() = produce<Int>(CommonPool) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001296 var x = 1
1297 while (true) send(x++) // infinite stream of integers starting from 1
1298}
1299```
1300
Roman Elizarova5e653f2017-02-13 13:49:55 +03001301And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results.
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001302In the below example the numbers are just squared:
1303
1304```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001305fun square(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001306 for (x in numbers) send(x * x)
1307}
1308```
1309
Roman Elizarova5e653f2017-02-13 13:49:55 +03001310The main code starts and connects the whole pipeline:
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001311
1312```kotlin
1313fun main(args: Array<String>) = runBlocking<Unit> {
1314 val numbers = produceNumbers() // produces integers from 1 and on
1315 val squares = square(numbers) // squares integers
1316 for (i in 1..5) println(squares.receive()) // print first five
1317 println("Done!") // we are done
1318 squares.cancel() // need to cancel these coroutines in a larger app
1319 numbers.cancel()
1320}
1321```
1322
Roman Elizarove8d79342017-08-29 15:21:21 +03001323> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-04.kt)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001324
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001325<!--- TEST
13261
13274
13289
132916
133025
1331Done!
1332-->
1333
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001334We don't have to cancel these coroutines in this example app, because
1335[coroutines are like daemon threads](#coroutines-are-like-daemon-threads),
1336but in a larger app we'll need to stop our pipeline if we don't need it anymore.
1337Alternatively, we could have run pipeline coroutines as
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001338[children of a coroutine](#children-of-a-coroutine) as is demonstrated in the following example.
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001339
1340### Prime numbers with pipeline
1341
Cedric Beustfa0b28f2017-02-07 07:07:25 -08001342Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001343of coroutines. We start with an infinite sequence of numbers. This time we introduce an
1344explicit context parameter, so that caller can control where our coroutines run:
1345
Roman Elizarove8d79342017-08-29 15:21:21 +03001346<!--- INCLUDE core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001347import kotlin.coroutines.experimental.CoroutineContext
1348-->
1349
1350```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001351fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001352 var x = start
1353 while (true) send(x++) // infinite stream of integers from start
1354}
1355```
1356
1357The following pipeline stage filters an incoming stream of numbers, removing all the numbers
1358that are divisible by the given prime number:
1359
1360```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001361fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001362 for (x in numbers) if (x % prime != 0) send(x)
1363}
1364```
1365
1366Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
Roman Elizarov62500ba2017-02-09 18:55:40 +03001367and launching new pipeline stage for each prime number found:
1368
1369```
Roman Elizarova5e653f2017-02-13 13:49:55 +03001370numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
Roman Elizarov62500ba2017-02-09 18:55:40 +03001371```
1372
1373The following example prints the first ten prime numbers,
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001374running the whole pipeline in the context of the main thread. Since all the coroutines are launched as
1375children of the main [runBlocking] coroutine in its [coroutineContext][CoroutineScope.coroutineContext],
1376we don't have to keep an explicit list of all the coroutine we have created.
Roman Elizarov88396732017-09-27 21:30:47 +03001377We use [cancelChildren] extension function to cancel all the children coroutines.
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001378
1379```kotlin
1380fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov43e3af72017-07-21 16:01:31 +03001381 var cur = numbersFrom(coroutineContext, 2)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001382 for (i in 1..10) {
1383 val prime = cur.receive()
1384 println(prime)
Roman Elizarov43e3af72017-07-21 16:01:31 +03001385 cur = filter(coroutineContext, cur, prime)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001386 }
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001387 coroutineContext.cancelChildren() // cancel all children to let main finish
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001388}
1389```
1390
Roman Elizarove8d79342017-08-29 15:21:21 +03001391> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001392
1393The output of this code is:
1394
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001395```text
Roman Elizarovb7721cf2017-02-03 19:23:08 +030013962
13973
13985
13997
140011
140113
140217
140319
140423
140529
1406```
1407
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001408<!--- TEST -->
1409
Roman Elizarova5e653f2017-02-13 13:49:55 +03001410Note, that you can build the same pipeline using `buildIterator` coroutine builder from the standard library.
1411Replace `produce` with `buildIterator`, `send` with `yield`, `receive` with `next`,
Roman Elizarov62500ba2017-02-09 18:55:40 +03001412`ReceiveChannel` with `Iterator`, and get rid of the context. You will not need `runBlocking` either.
1413However, the benefit of a pipeline that uses channels as shown above is that it can actually use
1414multiple CPU cores if you run it in [CommonPool] context.
1415
Roman Elizarova5e653f2017-02-13 13:49:55 +03001416Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
Roman Elizarov62500ba2017-02-09 18:55:40 +03001417other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
1418built using `buildSeqeunce`/`buildIterator`, because they do not allow arbitrary suspension, unlike
Roman Elizarova5e653f2017-02-13 13:49:55 +03001419`produce` which is fully asynchronous.
Roman Elizarov62500ba2017-02-09 18:55:40 +03001420
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001421### Fan-out
1422
1423Multiple coroutines may receive from the same channel, distributing work between themselves.
1424Let us start with a producer coroutine that is periodically producing integers
1425(ten numbers per second):
1426
1427```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001428fun produceNumbers() = produce<Int>(CommonPool) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001429 var x = 1 // start from 1
1430 while (true) {
1431 send(x++) // produce next
1432 delay(100) // wait 0.1s
1433 }
1434}
1435```
1436
1437Then we can have several processor coroutines. In this example, they just print their id and
1438received number:
1439
1440```kotlin
1441fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
Roman Elizarov86349be2017-03-17 16:47:37 +03001442 channel.consumeEach {
1443 println("Processor #$id received $it")
Roman Elizarovec9384c2017-03-02 22:09:08 +03001444 }
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001445}
1446```
1447
Roman Elizarov35d2c342017-07-20 14:54:39 +03001448Now let us launch five processors and let them work for almost a second. See what happens:
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001449
1450```kotlin
1451fun main(args: Array<String>) = runBlocking<Unit> {
1452 val producer = produceNumbers()
1453 repeat(5) { launchProcessor(it, producer) }
Roman Elizarov35d2c342017-07-20 14:54:39 +03001454 delay(950)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001455 producer.cancel() // cancel producer coroutine and thus kill them all
1456}
1457```
1458
Roman Elizarove8d79342017-08-29 15:21:21 +03001459> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001460
1461The output will be similar to the the following one, albeit the processor ids that receive
1462each specific integer may be different:
1463
1464```
1465Processor #2 received 1
1466Processor #4 received 2
1467Processor #0 received 3
1468Processor #1 received 4
1469Processor #3 received 5
1470Processor #2 received 6
1471Processor #4 received 7
1472Processor #0 received 8
1473Processor #1 received 9
1474Processor #3 received 10
1475```
1476
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001477<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->
1478
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001479Note, that cancelling a producer coroutine closes its channel, thus eventually terminating iteration
1480over the channel that processor coroutines are doing.
1481
1482### Fan-in
1483
1484Multiple coroutines may send to the same channel.
1485For example, let us have a channel of strings, and a suspending function that
1486repeatedly sends a specified string to this channel with a specified delay:
1487
1488```kotlin
1489suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
1490 while (true) {
1491 delay(time)
1492 channel.send(s)
1493 }
1494}
1495```
1496
Cedric Beustfa0b28f2017-02-07 07:07:25 -08001497Now, let us see what happens if we launch a couple of coroutines sending strings
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001498(in this example we launch them in the context of the main thread as main coroutine's children):
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001499
1500```kotlin
1501fun main(args: Array<String>) = runBlocking<Unit> {
1502 val channel = Channel<String>()
Roman Elizarov43e3af72017-07-21 16:01:31 +03001503 launch(coroutineContext) { sendString(channel, "foo", 200L) }
1504 launch(coroutineContext) { sendString(channel, "BAR!", 500L) }
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001505 repeat(6) { // receive first six
1506 println(channel.receive())
1507 }
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001508 coroutineContext.cancelChildren() // cancel all children to let main finish
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001509}
1510```
1511
Roman Elizarove8d79342017-08-29 15:21:21 +03001512> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001513
1514The output is:
1515
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001516```text
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001517foo
1518foo
1519BAR!
1520foo
1521foo
1522BAR!
1523```
1524
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001525<!--- TEST -->
1526
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001527### Buffered channels
1528
1529The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
1530meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked,
1531if receive is invoked first, it is suspended until send is invoked.
Roman Elizarov419a6c82017-02-09 18:36:22 +03001532
Roman Elizarov88396732017-09-27 21:30:47 +03001533Both [Channel()] factory function and [produce] builder take an optional `capacity` parameter to
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001534specify _buffer size_. Buffer allows senders to send multiple elements before suspending,
1535similar to the `BlockingQueue` with a specified capacity, which blocks when buffer is full.
1536
1537Take a look at the behavior of the following code:
1538
1539```kotlin
1540fun main(args: Array<String>) = runBlocking<Unit> {
1541 val channel = Channel<Int>(4) // create buffered channel
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001542 val sender = launch(coroutineContext) { // launch sender coroutine
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001543 repeat(10) {
1544 println("Sending $it") // print before sending each element
1545 channel.send(it) // will suspend when buffer is full
1546 }
1547 }
1548 // don't receive anything... just wait....
1549 delay(1000)
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001550 sender.cancel() // cancel sender coroutine
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001551}
1552```
1553
Roman Elizarove8d79342017-08-29 15:21:21 +03001554> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt)
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001555
1556It prints "sending" _five_ times using a buffered channel with capacity of _four_:
1557
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001558```text
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001559Sending 0
1560Sending 1
1561Sending 2
1562Sending 3
1563Sending 4
1564```
1565
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001566<!--- TEST -->
1567
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001568The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
Roman Elizarov419a6c82017-02-09 18:36:22 +03001569
Roman Elizarovb0517ba2017-02-27 14:03:14 +03001570### Channels are fair
1571
1572Send and receive operations to channels are _fair_ with respect to the order of their invocation from
1573multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
1574gets the element. In the following example two coroutines "ping" and "pong" are
1575receiving the "ball" object from the shared "table" channel.
1576
1577```kotlin
1578data class Ball(var hits: Int)
1579
1580fun main(args: Array<String>) = runBlocking<Unit> {
1581 val table = Channel<Ball>() // a shared table
Roman Elizarov43e3af72017-07-21 16:01:31 +03001582 launch(coroutineContext) { player("ping", table) }
1583 launch(coroutineContext) { player("pong", table) }
Roman Elizarovb0517ba2017-02-27 14:03:14 +03001584 table.send(Ball(0)) // serve the ball
1585 delay(1000) // delay 1 second
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001586 coroutineContext.cancelChildren() // game over, cancel them
Roman Elizarovb0517ba2017-02-27 14:03:14 +03001587}
1588
1589suspend fun player(name: String, table: Channel<Ball>) {
1590 for (ball in table) { // receive the ball in a loop
1591 ball.hits++
1592 println("$name $ball")
Roman Elizarovf526b132017-03-10 16:07:14 +03001593 delay(300) // wait a bit
Roman Elizarovb0517ba2017-02-27 14:03:14 +03001594 table.send(ball) // send the ball back
1595 }
1596}
1597```
1598
Roman Elizarove8d79342017-08-29 15:21:21 +03001599> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-09.kt)
Roman Elizarovb0517ba2017-02-27 14:03:14 +03001600
1601The "ping" coroutine is started first, so it is the first one to receive the ball. Even though "ping"
1602coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
1603received by the "pong" coroutine, because it was already waiting for it:
1604
1605```text
1606ping Ball(hits=1)
1607pong Ball(hits=2)
1608ping Ball(hits=3)
1609pong Ball(hits=4)
Roman Elizarovb0517ba2017-02-27 14:03:14 +03001610```
1611
1612<!--- TEST -->
1613
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001614Note, that sometimes channels may produce executions that look unfair due to the nature of the executor
1615that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details.
1616
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001617## Shared mutable state and concurrency
1618
1619Coroutines can be executed concurrently using a multi-threaded dispatcher like [CommonPool]. It presents
1620all the usual concurrency problems. The main problem being synchronization of access to **shared mutable state**.
1621Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world,
1622but others are unique.
1623
1624### The problem
1625
Roman Elizarov1e459602017-02-27 11:05:17 +03001626Let us launch a thousand coroutines all doing the same action thousand times (for a total of a million executions).
1627We'll also measure their completion time for further comparisons:
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001628
Roman Elizarov43e90112017-05-10 11:25:20 +03001629<!--- INCLUDE .*/example-sync-([0-9a-z]+).kt
Roman Elizarov1e459602017-02-27 11:05:17 +03001630import kotlin.coroutines.experimental.CoroutineContext
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001631import kotlin.system.measureTimeMillis
1632-->
1633
Roman Elizarov1e459602017-02-27 11:05:17 +03001634<!--- INCLUDE .*/example-sync-03.kt
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001635import java.util.concurrent.atomic.AtomicInteger
1636-->
1637
Roman Elizarov1e459602017-02-27 11:05:17 +03001638<!--- INCLUDE .*/example-sync-06.kt
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001639import kotlinx.coroutines.experimental.sync.Mutex
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001640import kotlinx.coroutines.experimental.sync.withLock
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001641-->
1642
Roman Elizarov1e459602017-02-27 11:05:17 +03001643<!--- INCLUDE .*/example-sync-07.kt
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001644import kotlinx.coroutines.experimental.channels.*
1645-->
1646
1647```kotlin
Roman Elizarov1e459602017-02-27 11:05:17 +03001648suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
1649 val n = 1000 // number of coroutines to launch
1650 val k = 1000 // times an action is repeated by each coroutine
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001651 val time = measureTimeMillis {
1652 val jobs = List(n) {
Roman Elizarov1e459602017-02-27 11:05:17 +03001653 launch(context) {
1654 repeat(k) { action() }
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001655 }
1656 }
1657 jobs.forEach { it.join() }
1658 }
Roman Elizarov1e459602017-02-27 11:05:17 +03001659 println("Completed ${n * k} actions in $time ms")
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001660}
1661```
1662
Roman Elizarov43e90112017-05-10 11:25:20 +03001663<!--- INCLUDE .*/example-sync-([0-9a-z]+).kt -->
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001664
Roman Elizarov1e459602017-02-27 11:05:17 +03001665We start with a very simple action that increments a shared mutable variable using
1666multi-threaded [CommonPool] context.
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001667
1668```kotlin
1669var counter = 0
1670
1671fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov1e459602017-02-27 11:05:17 +03001672 massiveRun(CommonPool) {
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001673 counter++
1674 }
1675 println("Counter = $counter")
1676}
1677```
1678
Roman Elizarove8d79342017-08-29 15:21:21 +03001679> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-01.kt)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001680
Roman Elizarov1e459602017-02-27 11:05:17 +03001681<!--- TEST LINES_START
1682Completed 1000000 actions in
1683Counter =
1684-->
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001685
Roman Elizarov1e459602017-02-27 11:05:17 +03001686What does it print at the end? It is highly unlikely to ever print "Counter = 1000000", because a thousand coroutines
1687increment the `counter` concurrently from multiple threads without any synchronization.
1688
Roman Elizarov43e90112017-05-10 11:25:20 +03001689> Note: if you have an old system with 2 or fewer CPUs, then you _will_ consistently see 1000000, because
1690`CommonPool` is running in only one thread in this case. To reproduce the problem you'll need to make the
1691following change:
1692
1693```kotlin
1694val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads
1695var counter = 0
1696
1697fun main(args: Array<String>) = runBlocking<Unit> {
1698 massiveRun(mtContext) { // use it instead of CommonPool in this sample and below
1699 counter++
1700 }
1701 println("Counter = $counter")
1702}
1703```
1704
Roman Elizarove8d79342017-08-29 15:21:21 +03001705> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-01b.kt)
Roman Elizarov43e90112017-05-10 11:25:20 +03001706
1707<!--- TEST LINES_START
1708Completed 1000000 actions in
1709Counter =
1710-->
1711
Roman Elizarov1e459602017-02-27 11:05:17 +03001712### Volatiles are of no help
1713
1714There is common misconception that making a variable `volatile` solves concurrency problem. Let us try it:
1715
1716```kotlin
1717@Volatile // in Kotlin `volatile` is an annotation
1718var counter = 0
1719
1720fun main(args: Array<String>) = runBlocking<Unit> {
1721 massiveRun(CommonPool) {
1722 counter++
1723 }
1724 println("Counter = $counter")
1725}
1726```
1727
Roman Elizarove8d79342017-08-29 15:21:21 +03001728> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-02.kt)
Roman Elizarov1e459602017-02-27 11:05:17 +03001729
1730<!--- TEST LINES_START
1731Completed 1000000 actions in
1732Counter =
1733-->
1734
1735This code works slower, but we still don't get "Counter = 1000000" at the end, because volatile variables guarantee
1736linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but
1737do not provide atomicity of larger actions (increment in our case).
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001738
1739### Thread-safe data structures
1740
1741The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized,
1742linearizable, or atomic) data structure that provides all the necessarily synchronization for the corresponding
1743operations that needs to be performed on a shared state.
Roman Elizarov1e459602017-02-27 11:05:17 +03001744In the case of a simple counter we can use `AtomicInteger` class which has atomic `incrementAndGet` operations:
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001745
1746```kotlin
1747var counter = AtomicInteger()
1748
1749fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov1e459602017-02-27 11:05:17 +03001750 massiveRun(CommonPool) {
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001751 counter.incrementAndGet()
1752 }
1753 println("Counter = ${counter.get()}")
1754}
1755```
1756
Roman Elizarove8d79342017-08-29 15:21:21 +03001757> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-03.kt)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001758
Roman Elizarov1e459602017-02-27 11:05:17 +03001759<!--- TEST ARBITRARY_TIME
1760Completed 1000000 actions in xxx ms
1761Counter = 1000000
1762-->
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001763
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001764This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other
1765standard data structures and basic operations on them. However, it does not easily scale to complex
1766state or to complex operations that do not have ready-to-use thread-safe implementations.
1767
Roman Elizarov1e459602017-02-27 11:05:17 +03001768### Thread confinement fine-grained
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001769
Roman Elizarov1e459602017-02-27 11:05:17 +03001770_Thread confinement_ is an approach to the problem of shared mutable state where all access to the particular shared
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001771state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to
1772the single event-dispatch/application thread. It is easy to apply with coroutines by using a
1773single-threaded context:
1774
1775```kotlin
1776val counterContext = newSingleThreadContext("CounterContext")
1777var counter = 0
1778
1779fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov1e459602017-02-27 11:05:17 +03001780 massiveRun(CommonPool) { // run each coroutine in CommonPool
1781 run(counterContext) { // but confine each increment to the single-threaded context
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001782 counter++
1783 }
1784 }
1785 println("Counter = $counter")
1786}
1787```
1788
Roman Elizarove8d79342017-08-29 15:21:21 +03001789> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001790
Roman Elizarov1e459602017-02-27 11:05:17 +03001791<!--- TEST ARBITRARY_TIME
1792Completed 1000000 actions in xxx ms
1793Counter = 1000000
1794-->
1795
1796This code works very slowly, because it does _fine-grained_ thread-confinement. Each individual increment switches
1797from multi-threaded `CommonPool` context to the single-threaded context using [run] block.
1798
1799### Thread confinement coarse-grained
1800
1801In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic
1802are confined to the single thread. The following example does it like that, running each coroutine in
1803the single-threaded context to start with.
1804
1805```kotlin
1806val counterContext = newSingleThreadContext("CounterContext")
1807var counter = 0
1808
1809fun main(args: Array<String>) = runBlocking<Unit> {
1810 massiveRun(counterContext) { // run each coroutine in the single-threaded context
1811 counter++
1812 }
1813 println("Counter = $counter")
1814}
1815```
1816
Roman Elizarove8d79342017-08-29 15:21:21 +03001817> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt)
Roman Elizarov1e459602017-02-27 11:05:17 +03001818
1819<!--- TEST ARBITRARY_TIME
1820Completed 1000000 actions in xxx ms
1821Counter = 1000000
1822-->
1823
1824This now works much faster and produces correct result.
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001825
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001826### Mutual exclusion
1827
1828Mutual exclusion solution to the problem is to protect all modifications of the shared state with a _critical section_
1829that is never executed concurrently. In a blocking world you'd typically use `synchronized` or `ReentrantLock` for that.
1830Coroutine's alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to
1831delimit a critical section. The key difference is that `Mutex.lock` is a suspending function. It does not block a thread.
1832
Roman Elizarov88396732017-09-27 21:30:47 +03001833There is also [withLock] extension function that conveniently represents
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001834`mutex.lock(); try { ... } finally { mutex.unlock() }` pattern:
1835
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001836```kotlin
1837val mutex = Mutex()
1838var counter = 0
1839
1840fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov1e459602017-02-27 11:05:17 +03001841 massiveRun(CommonPool) {
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001842 mutex.withLock {
1843 counter++
1844 }
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001845 }
1846 println("Counter = $counter")
1847}
1848```
1849
Roman Elizarove8d79342017-08-29 15:21:21 +03001850> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-06.kt)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001851
Roman Elizarov1e459602017-02-27 11:05:17 +03001852<!--- TEST ARBITRARY_TIME
1853Completed 1000000 actions in xxx ms
1854Counter = 1000000
1855-->
1856
1857The locking in this example is fine-grained, so it pays the price. However, it is a good choice for some situations
1858where you absolutely must modify some shared state periodically, but there is no natural thread that this state
1859is confined to.
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001860
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001861### Actors
1862
1863An actor is a combination of a coroutine, the state that is confined and is encapsulated into this coroutine,
1864and a channel to communicate with other coroutines. A simple actor can be written as a function,
1865but an actor with a complex state is better suited for a class.
1866
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001867There is an [actor] coroutine builder that conveniently combines actor's mailbox channel into its
1868scope to receive messages from and combines the send channel into the resulting job object, so that a
1869single reference to the actor can be carried around as its handle.
1870
Roman Elizarov256812a2017-07-22 01:00:30 +03001871The first step of using an actor is to define a class of messages that an actor is going to process.
1872Kotlin's [sealed classes](https://kotlinlang.org/docs/reference/sealed-classes.html) are well suited for that purpose.
1873We define `CounterMsg` sealed class with `IncCounter` message to increment a counter and `GetCounter` message
1874to get its value. The later needs to send a response. A [CompletableDeferred] communication
1875primitive, that represents a single value that will be known (communicated) in the future,
1876is used here for that purpose.
1877
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001878```kotlin
1879// Message types for counterActor
1880sealed class CounterMsg
1881object IncCounter : CounterMsg() // one-way message to increment counter
Roman Elizarov256812a2017-07-22 01:00:30 +03001882class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
1883```
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001884
Roman Elizarov256812a2017-07-22 01:00:30 +03001885Then we define a function that launches an actor using an [actor] coroutine builder:
1886
1887```kotlin
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001888// This function launches a new counter actor
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001889fun counterActor() = actor<CounterMsg>(CommonPool) {
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001890 var counter = 0 // actor state
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001891 for (msg in channel) { // iterate over incoming messages
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001892 when (msg) {
1893 is IncCounter -> counter++
Roman Elizarov256812a2017-07-22 01:00:30 +03001894 is GetCounter -> msg.response.complete(counter)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001895 }
1896 }
1897}
Roman Elizarov256812a2017-07-22 01:00:30 +03001898```
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001899
Roman Elizarov256812a2017-07-22 01:00:30 +03001900The main code is straightforward:
1901
1902```kotlin
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001903fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001904 val counter = counterActor() // create the actor
Roman Elizarov1e459602017-02-27 11:05:17 +03001905 massiveRun(CommonPool) {
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001906 counter.send(IncCounter)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001907 }
Roman Elizarov256812a2017-07-22 01:00:30 +03001908 // send a message to get a counter value from an actor
1909 val response = CompletableDeferred<Int>()
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001910 counter.send(GetCounter(response))
Roman Elizarov256812a2017-07-22 01:00:30 +03001911 println("Counter = ${response.await()}")
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001912 counter.close() // shutdown the actor
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001913}
1914```
1915
Roman Elizarove8d79342017-08-29 15:21:21 +03001916> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-07.kt)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001917
Roman Elizarov1e459602017-02-27 11:05:17 +03001918<!--- TEST ARBITRARY_TIME
1919Completed 1000000 actions in xxx ms
1920Counter = 1000000
1921-->
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001922
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001923It does not matter (for correctness) what context the actor itself is executed in. An actor is
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001924a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine
1925works as a solution to the problem of shared mutable state.
1926
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001927Actor is more efficient than locking under load, because in this case it always has work to do and it does not
1928have to switch to a different context at all.
1929
1930> Note, that an [actor] coroutine builder is a dual of [produce] coroutine builder. An actor is associated
1931 with the channel that it receives messages from, while a producer is associated with the channel that it
1932 sends elements to.
Roman Elizarov1e459602017-02-27 11:05:17 +03001933
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001934## Select expression
1935
Roman Elizarova84730b2017-02-22 11:58:50 +03001936Select expression makes it possible to await multiple suspending functions simultaneously and _select_
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001937the first one that becomes available.
1938
1939<!--- INCLUDE .*/example-select-([0-9]+).kt
1940import kotlinx.coroutines.experimental.channels.*
1941import kotlinx.coroutines.experimental.selects.*
1942-->
1943
1944### Selecting from channels
1945
Roman Elizarov57857202017-03-02 23:17:25 +03001946Let us have two producers of strings: `fizz` and `buzz`. The `fizz` produces "Fizz" string every 300 ms:
1947
1948<!--- INCLUDE .*/example-select-01.kt
1949import kotlin.coroutines.experimental.CoroutineContext
1950-->
1951
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001952```kotlin
Roman Elizarov57857202017-03-02 23:17:25 +03001953fun fizz(context: CoroutineContext) = produce<String>(context) {
1954 while (true) { // sends "Fizz" every 300 ms
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001955 delay(300)
1956 send("Fizz")
1957 }
1958}
1959```
1960
Roman Elizarov57857202017-03-02 23:17:25 +03001961And the `buzz` produces "Buzz!" string every 500 ms:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001962
1963```kotlin
Roman Elizarov57857202017-03-02 23:17:25 +03001964fun buzz(context: CoroutineContext) = produce<String>(context) {
1965 while (true) { // sends "Buzz!" every 500 ms
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001966 delay(500)
1967 send("Buzz!")
1968 }
1969}
1970```
1971
1972Using [receive][ReceiveChannel.receive] suspending function we can receive _either_ from one channel or the
1973other. But [select] expression allows us to receive from _both_ simultaneously using its
Roman Elizarov8a5564d2017-09-06 18:48:22 +03001974[onReceive][ReceiveChannel.onReceive] clauses:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001975
1976```kotlin
Roman Elizarov57857202017-03-02 23:17:25 +03001977suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001978 select<Unit> { // <Unit> means that this select expression does not produce any result
1979 fizz.onReceive { value -> // this is the first select clause
1980 println("fizz -> '$value'")
1981 }
1982 buzz.onReceive { value -> // this is the second select clause
1983 println("buzz -> '$value'")
1984 }
1985 }
1986}
1987```
1988
Roman Elizarov57857202017-03-02 23:17:25 +03001989Let us run it all seven times:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001990
1991```kotlin
1992fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov43e3af72017-07-21 16:01:31 +03001993 val fizz = fizz(coroutineContext)
1994 val buzz = buzz(coroutineContext)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001995 repeat(7) {
Roman Elizarov57857202017-03-02 23:17:25 +03001996 selectFizzBuzz(fizz, buzz)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001997 }
Roman Elizarov8b38fa22017-09-27 17:44:31 +03001998 coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001999}
2000```
2001
Roman Elizarove8d79342017-08-29 15:21:21 +03002002> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-01.kt)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002003
2004The result of this code is:
2005
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002006```text
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002007fizz -> 'Fizz'
2008buzz -> 'Buzz!'
2009fizz -> 'Fizz'
2010fizz -> 'Fizz'
2011buzz -> 'Buzz!'
2012fizz -> 'Fizz'
2013buzz -> 'Buzz!'
2014```
2015
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002016<!--- TEST -->
2017
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002018### Selecting on close
2019
Roman Elizarov8a5564d2017-09-06 18:48:22 +03002020The [onReceive][ReceiveChannel.onReceive] clause in `select` fails when the channel is closed and the corresponding
2021`select` throws an exception. We can use [onReceiveOrNull][ReceiveChannel.onReceiveOrNull] clause to perform a
Roman Elizarova84730b2017-02-22 11:58:50 +03002022specific action when the channel is closed. The following example also shows that `select` is an expression that returns
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002023the result of its selected clause:
2024
2025```kotlin
2026suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
2027 select<String> {
2028 a.onReceiveOrNull { value ->
2029 if (value == null)
2030 "Channel 'a' is closed"
2031 else
2032 "a -> '$value'"
2033 }
2034 b.onReceiveOrNull { value ->
2035 if (value == null)
2036 "Channel 'b' is closed"
2037 else
2038 "b -> '$value'"
2039 }
2040 }
2041```
2042
Roman Elizarova84730b2017-02-22 11:58:50 +03002043Let's use it with channel `a` that produces "Hello" string four times and
2044channel `b` that produces "World" four times:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002045
2046```kotlin
2047fun main(args: Array<String>) = runBlocking<Unit> {
2048 // we are using the context of the main thread in this example for predictability ...
Roman Elizarov43e3af72017-07-21 16:01:31 +03002049 val a = produce<String>(coroutineContext) {
Roman Elizarova84730b2017-02-22 11:58:50 +03002050 repeat(4) { send("Hello $it") }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002051 }
Roman Elizarov43e3af72017-07-21 16:01:31 +03002052 val b = produce<String>(coroutineContext) {
Roman Elizarova84730b2017-02-22 11:58:50 +03002053 repeat(4) { send("World $it") }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002054 }
2055 repeat(8) { // print first eight results
2056 println(selectAorB(a, b))
2057 }
Roman Elizarov8b38fa22017-09-27 17:44:31 +03002058 coroutineContext.cancelChildren()
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002059}
2060```
2061
Roman Elizarove8d79342017-08-29 15:21:21 +03002062> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-02.kt)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002063
Roman Elizarova84730b2017-02-22 11:58:50 +03002064The result of this code is quite interesting, so we'll analyze it in mode detail:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002065
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002066```text
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002067a -> 'Hello 0'
2068a -> 'Hello 1'
2069b -> 'World 0'
2070a -> 'Hello 2'
2071a -> 'Hello 3'
2072b -> 'World 1'
2073Channel 'a' is closed
2074Channel 'a' is closed
2075```
2076
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002077<!--- TEST -->
2078
Roman Elizarova84730b2017-02-22 11:58:50 +03002079There are couple of observations to make out of it.
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002080
2081First of all, `select` is _biased_ to the first clause. When several clauses are selectable at the same time,
2082the first one among them gets selected. Here, both channels are constantly producing strings, so `a` channel,
Roman Elizarova84730b2017-02-22 11:58:50 +03002083being the first clause in select, wins. However, because we are using unbuffered channel, the `a` gets suspended from
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002084time to time on its [send][SendChannel.send] invocation and gives a chance for `b` to send, too.
2085
Roman Elizarov8a5564d2017-09-06 18:48:22 +03002086The second observation, is that [onReceiveOrNull][ReceiveChannel.onReceiveOrNull] gets immediately selected when the
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002087channel is already closed.
2088
2089### Selecting to send
2090
Roman Elizarov8a5564d2017-09-06 18:48:22 +03002091Select expression has [onSend][SendChannel.onSend] clause that can be used for a great good in combination
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002092with a biased nature of selection.
2093
Roman Elizarova84730b2017-02-22 11:58:50 +03002094Let us write an example of producer of integers that sends its values to a `side` channel when
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002095the consumers on its primary channel cannot keep up with it:
2096
Roman Elizarov8b38fa22017-09-27 17:44:31 +03002097<!--- INCLUDE
2098import kotlin.coroutines.experimental.CoroutineContext
2099-->
2100
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002101```kotlin
Roman Elizarov8b38fa22017-09-27 17:44:31 +03002102fun produceNumbers(context: CoroutineContext, side: SendChannel<Int>) = produce<Int>(context) {
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002103 for (num in 1..10) { // produce 10 numbers from 1 to 10
2104 delay(100) // every 100 ms
2105 select<Unit> {
Roman Elizarova84730b2017-02-22 11:58:50 +03002106 onSend(num) {} // Send to the primary channel
2107 side.onSend(num) {} // or to the side channel
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002108 }
2109 }
2110}
2111```
2112
2113Consumer is going to be quite slow, taking 250 ms to process each number:
2114
2115```kotlin
2116fun main(args: Array<String>) = runBlocking<Unit> {
2117 val side = Channel<Int>() // allocate side channel
Roman Elizarov43e3af72017-07-21 16:01:31 +03002118 launch(coroutineContext) { // this is a very fast consumer for the side channel
Roman Elizarov86349be2017-03-17 16:47:37 +03002119 side.consumeEach { println("Side channel has $it") }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002120 }
Roman Elizarov8b38fa22017-09-27 17:44:31 +03002121 produceNumbers(coroutineContext, side).consumeEach {
Roman Elizarov86349be2017-03-17 16:47:37 +03002122 println("Consuming $it")
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002123 delay(250) // let us digest the consumed number properly, do not hurry
2124 }
2125 println("Done consuming")
Roman Elizarov8b38fa22017-09-27 17:44:31 +03002126 coroutineContext.cancelChildren()
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002127}
2128```
2129
Roman Elizarove8d79342017-08-29 15:21:21 +03002130> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-03.kt)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002131
2132So let us see what happens:
2133
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002134```text
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002135Consuming 1
2136Side channel has 2
2137Side channel has 3
2138Consuming 4
2139Side channel has 5
2140Side channel has 6
2141Consuming 7
2142Side channel has 8
2143Side channel has 9
2144Consuming 10
2145Done consuming
2146```
2147
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002148<!--- TEST -->
2149
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002150### Selecting deferred values
2151
Roman Elizarov8a5564d2017-09-06 18:48:22 +03002152Deferred values can be selected using [onAwait][Deferred.onAwait] clause.
Roman Elizarova84730b2017-02-22 11:58:50 +03002153Let us start with an async function that returns a deferred string value after
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002154a random delay:
2155
2156<!--- INCLUDE .*/example-select-04.kt
2157import java.util.*
2158-->
2159
2160```kotlin
2161fun asyncString(time: Int) = async(CommonPool) {
2162 delay(time.toLong())
2163 "Waited for $time ms"
2164}
2165```
2166
Roman Elizarova84730b2017-02-22 11:58:50 +03002167Let us start a dozen of them with a random delay.
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002168
2169```kotlin
2170fun asyncStringsList(): List<Deferred<String>> {
2171 val random = Random(3)
Roman Elizarova84730b2017-02-22 11:58:50 +03002172 return List(12) { asyncString(random.nextInt(1000)) }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002173}
2174```
2175
Roman Elizarova84730b2017-02-22 11:58:50 +03002176Now the main function awaits for the first of them to complete and counts the number of deferred values
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002177that are still active. Note, that we've used here the fact that `select` expression is a Kotlin DSL,
Roman Elizarova84730b2017-02-22 11:58:50 +03002178so we can provide clauses for it using an arbitrary code. In this case we iterate over a list
2179of deferred values to provide `onAwait` clause for each deferred value.
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002180
2181```kotlin
2182fun main(args: Array<String>) = runBlocking<Unit> {
2183 val list = asyncStringsList()
2184 val result = select<String> {
2185 list.withIndex().forEach { (index, deferred) ->
2186 deferred.onAwait { answer ->
2187 "Deferred $index produced answer '$answer'"
2188 }
2189 }
2190 }
2191 println(result)
Roman Elizarov7c864d82017-02-27 10:17:50 +03002192 val countActive = list.count { it.isActive }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002193 println("$countActive coroutines are still active")
2194}
2195```
2196
Roman Elizarove8d79342017-08-29 15:21:21 +03002197> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-04.kt)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002198
2199The output is:
2200
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002201```text
Roman Elizarova84730b2017-02-22 11:58:50 +03002202Deferred 4 produced answer 'Waited for 128 ms'
Roman Elizarovd4dcbe22017-02-22 09:57:46 +0300220311 coroutines are still active
2204```
2205
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002206<!--- TEST -->
2207
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002208### Switch over a channel of deferred values
2209
Roman Elizarova84730b2017-02-22 11:58:50 +03002210Let us write a channel producer function that consumes a channel of deferred string values, waits for each received
2211deferred value, but only until the next deferred value comes over or the channel is closed. This example puts together
Roman Elizarov8a5564d2017-09-06 18:48:22 +03002212[onReceiveOrNull][ReceiveChannel.onReceiveOrNull] and [onAwait][Deferred.onAwait] clauses in the same `select`:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002213
2214```kotlin
2215fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String>(CommonPool) {
Roman Elizarova84730b2017-02-22 11:58:50 +03002216 var current = input.receive() // start with first received deferred value
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002217 while (isActive) { // loop while not cancelled/closed
2218 val next = select<Deferred<String>?> { // return next deferred value from this select or null
2219 input.onReceiveOrNull { update ->
2220 update // replaces next value to wait
2221 }
2222 current.onAwait { value ->
2223 send(value) // send value that current deferred has produced
2224 input.receiveOrNull() // and use the next deferred from the input channel
2225 }
2226 }
2227 if (next == null) {
2228 println("Channel was closed")
2229 break // out of loop
2230 } else {
2231 current = next
2232 }
2233 }
2234}
2235```
2236
2237To test it, we'll use a simple async function that resolves to a specified string after a specified time:
2238
2239```kotlin
2240fun asyncString(str: String, time: Long) = async(CommonPool) {
2241 delay(time)
2242 str
2243}
2244```
2245
2246The main function just launches a coroutine to print results of `switchMapDeferreds` and sends some test
2247data to it:
2248
2249```kotlin
2250fun main(args: Array<String>) = runBlocking<Unit> {
2251 val chan = Channel<Deferred<String>>() // the channel for test
Roman Elizarov43e3af72017-07-21 16:01:31 +03002252 launch(coroutineContext) { // launch printing coroutine
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002253 for (s in switchMapDeferreds(chan))
2254 println(s) // print each received string
2255 }
2256 chan.send(asyncString("BEGIN", 100))
2257 delay(200) // enough time for "BEGIN" to be produced
2258 chan.send(asyncString("Slow", 500))
Roman Elizarova84730b2017-02-22 11:58:50 +03002259 delay(100) // not enough time to produce slow
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002260 chan.send(asyncString("Replace", 100))
Roman Elizarova84730b2017-02-22 11:58:50 +03002261 delay(500) // give it time before the last one
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002262 chan.send(asyncString("END", 500))
2263 delay(1000) // give it time to process
Roman Elizarova84730b2017-02-22 11:58:50 +03002264 chan.close() // close the channel ...
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002265 delay(500) // and wait some time to let it finish
2266}
2267```
2268
Roman Elizarove8d79342017-08-29 15:21:21 +03002269> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-05.kt)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002270
2271The result of this code:
2272
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002273```text
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002274BEGIN
2275Replace
2276END
2277Channel was closed
2278```
2279
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002280<!--- TEST -->
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002281
Roman Elizarov8db17332017-03-09 12:40:45 +03002282## Further reading
2283
2284* [Guide to UI programming with coroutines](ui/coroutines-guide-ui.md)
Roman Elizarov8a4a8e12017-03-09 19:52:58 +03002285* [Guide to reactive streams with coroutines](reactive/coroutines-guide-reactive.md)
Roman Elizarov8db17332017-03-09 12:40:45 +03002286* [Coroutines design document (KEEP)](https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md)
2287* [Full kotlinx.coroutines API reference](http://kotlin.github.io/kotlinx.coroutines)
2288
Roman Elizarove7e2ad12017-05-17 14:47:31 +03002289<!--- MODULE kotlinx-coroutines-core -->
Roman Elizarove0c817d2017-02-10 10:22:01 +03002290<!--- INDEX kotlinx.coroutines.experimental -->
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002291[launch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/launch.html
2292[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/delay.html
2293[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run-blocking.html
Roman Elizarove82dee72017-08-18 16:49:09 +03002294[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/index.html
Roman Elizarov8b38fa22017-09-27 17:44:31 +03002295[cancelAndJoin]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/cancel-and-join.html
Roman Elizarov88396732017-09-27 21:30:47 +03002296[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/cancel.html
2297[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/join.html
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002298[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-cancellation-exception.html
2299[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/yield.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002300[CoroutineScope.isActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/is-active.html
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002301[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
2302[run]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run.html
2303[NonCancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-non-cancellable/index.html
2304[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
Roman Elizarov63f6ea22017-09-06 18:42:34 +03002305[withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout-or-null.html
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002306[async]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/async.html
2307[Deferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/index.html
Roman Elizarovecda27f2017-04-06 23:06:26 +03002308[CoroutineStart.LAZY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-start/-l-a-z-y.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002309[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/await.html
2310[Job.start]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/start.html
Roman Elizarov419a6c82017-02-09 18:36:22 +03002311[CommonPool]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-common-pool/index.html
2312[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-dispatcher/index.html
Roman Elizarov43e3af72017-07-21 16:01:31 +03002313[CoroutineScope.coroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/coroutine-context.html
Roman Elizarov419a6c82017-02-09 18:36:22 +03002314[Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-unconfined/index.html
Roman Elizarov419a6c82017-02-09 18:36:22 +03002315[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/new-coroutine-context.html
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002316[CoroutineName]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-name/index.html
Roman Elizarov88396732017-09-27 21:30:47 +03002317[Job()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job.html
2318[cancelChildren]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/kotlin.coroutines.experimental.-coroutine-context/cancel-children.html
Roman Elizarove82dee72017-08-18 16:49:09 +03002319[CompletableDeferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-completable-deferred/index.html
Roman Elizarov8a5564d2017-09-06 18:48:22 +03002320[Deferred.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/on-await.html
Roman Elizarovf5bc0472017-02-22 11:38:13 +03002321<!--- INDEX kotlinx.coroutines.experimental.sync -->
Roman Elizarove82dee72017-08-18 16:49:09 +03002322[Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/index.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002323[Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/lock.html
2324[Mutex.unlock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/unlock.html
Roman Elizarov88396732017-09-27 21:30:47 +03002325[withLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/with-lock.html
Roman Elizarove0c817d2017-02-10 10:22:01 +03002326<!--- INDEX kotlinx.coroutines.experimental.channels -->
Roman Elizarove82dee72017-08-18 16:49:09 +03002327[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002328[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
2329[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html
2330[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/close.html
Roman Elizarova5e653f2017-02-13 13:49:55 +03002331[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
Roman Elizarov86349be2017-03-17 16:47:37 +03002332[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/consume-each.html
Roman Elizarov88396732017-09-27 21:30:47 +03002333[Channel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel.html
Roman Elizarovc0e19f82017-02-27 11:59:14 +03002334[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
Roman Elizarov8a5564d2017-09-06 18:48:22 +03002335[ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/on-receive.html
2336[ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/on-receive-or-null.html
2337[SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/on-send.html
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002338<!--- INDEX kotlinx.coroutines.experimental.selects -->
2339[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
Roman Elizarov419a6c82017-02-09 18:36:22 +03002340<!--- END -->