<!--- INCLUDE .*/example-([a-z]+)-([0-9]+)\.kt
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package guide.$$1.example$$2

import kotlinx.coroutines.experimental.*
-->
<!--- KNIT kotlinx-coroutines-core/src/test/kotlin/guide/.*\.kt -->
<!--- TEST_OUT kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package guide.test

import org.junit.Test

class GuideTest {
-->

# Guide to kotlinx.coroutines by example

This is a short guide on core features of `kotlinx.coroutines` with a series of examples.

## Introduction and setup

Kotlin, as a language, provides only minimal low-level APIs in its standard library to enable various other
libraries to utilize coroutines. Unlike many other languages with similar capabilities, `async` and `await`
are not keywords in Kotlin and are not even part of its standard library.

`kotlinx.coroutines` is one such rich library. It contains a number of high-level
coroutine-enabled primitives that this guide covers, including `async` and `await`.
You need to add a dependency on the `kotlinx-coroutines-core` module as explained
[here](README.md#using-in-your-projects) to use primitives from this guide in your projects.

## Table of contents

<!--- TOC -->

* [Coroutine basics](#coroutine-basics)
  * [Your first coroutine](#your-first-coroutine)
  * [Bridging blocking and non-blocking worlds](#bridging-blocking-and-non-blocking-worlds)
  * [Waiting for a job](#waiting-for-a-job)
  * [Extract function refactoring](#extract-function-refactoring)
  * [Coroutines ARE light-weight](#coroutines-are-light-weight)
  * [Coroutines are like daemon threads](#coroutines-are-like-daemon-threads)
* [Cancellation and timeouts](#cancellation-and-timeouts)
  * [Cancelling coroutine execution](#cancelling-coroutine-execution)
  * [Cancellation is cooperative](#cancellation-is-cooperative)
  * [Making computation code cancellable](#making-computation-code-cancellable)
  * [Closing resources with finally](#closing-resources-with-finally)
  * [Run non-cancellable block](#run-non-cancellable-block)
  * [Timeout](#timeout)
* [Composing suspending functions](#composing-suspending-functions)
  * [Sequential by default](#sequential-by-default)
  * [Concurrent using async](#concurrent-using-async)
  * [Lazily started async](#lazily-started-async)
  * [Async-style functions](#async-style-functions)
* [Coroutine context and dispatchers](#coroutine-context-and-dispatchers)
  * [Dispatchers and threads](#dispatchers-and-threads)
  * [Unconfined vs confined dispatcher](#unconfined-vs-confined-dispatcher)
  * [Debugging coroutines and threads](#debugging-coroutines-and-threads)
  * [Jumping between threads](#jumping-between-threads)
  * [Job in the context](#job-in-the-context)
  * [Children of a coroutine](#children-of-a-coroutine)
  * [Combining contexts](#combining-contexts)
  * [Naming coroutines for debugging](#naming-coroutines-for-debugging)
  * [Cancellation via explicit job](#cancellation-via-explicit-job)
* [Channels](#channels)
  * [Channel basics](#channel-basics)
  * [Closing and iteration over channels](#closing-and-iteration-over-channels)
  * [Building channel producers](#building-channel-producers)
  * [Pipelines](#pipelines)
  * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
  * [Fan-out](#fan-out)
  * [Fan-in](#fan-in)
  * [Buffered channels](#buffered-channels)
  * [Channels are fair](#channels-are-fair)
* [Shared mutable state and concurrency](#shared-mutable-state-and-concurrency)
  * [The problem](#the-problem)
  * [Volatiles are of no help](#volatiles-are-of-no-help)
  * [Thread-safe data structures](#thread-safe-data-structures)
  * [Thread confinement fine-grained](#thread-confinement-fine-grained)
  * [Thread confinement coarse-grained](#thread-confinement-coarse-grained)
  * [Mutual exclusion](#mutual-exclusion)
  * [Actors](#actors)
* [Select expression](#select-expression)
  * [Selecting from channels](#selecting-from-channels)
  * [Selecting on close](#selecting-on-close)
  * [Selecting to send](#selecting-to-send)
  * [Selecting deferred values](#selecting-deferred-values)
  * [Switch over a channel of deferred values](#switch-over-a-channel-of-deferred-values)

<!--- END_TOC -->

## Coroutine basics

This section covers basic coroutine concepts.

### Your first coroutine

Run the following code:

```kotlin
fun main(args: Array<String>) {
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main function continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-01.kt)

It produces the following output:

```text
Hello,
World!
```

<!--- TEST -->

Essentially, coroutines are light-weight threads.
They are launched with the [launch] _coroutine builder_.
You can achieve the same result by replacing
`launch(CommonPool) { ... }` with `thread { ... }` and `delay(...)` with `Thread.sleep(...)`. Try it.

If you start by replacing `launch(CommonPool)` with `thread`, the compiler produces the following error:

```
Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
```

That is because [delay] is a special _suspending function_ that does not block a thread, but _suspends_
the coroutine, and it can only be used from a coroutine.
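
The fully thread-based variant described above can be sketched as follows. This is only an illustration of the
suggested experiment, not one of the guide's generated examples. It uses the `thread` function from
`kotlin.concurrent` in the Kotlin standard library, and both `Thread.sleep` calls block the thread they run on:

```kotlin
import kotlin.concurrent.thread

fun main(args: Array<String>) {
    thread { // start a regular thread instead of a coroutine
        Thread.sleep(1000L) // blocking sleep for 1 second
        println("World!") // print after the sleep
    }
    println("Hello,") // main thread continues while the other thread sleeps
    Thread.sleep(2000L) // block main thread for 2 seconds
}
```

It prints the same two lines as the coroutine version, but each waiting party occupies a whole thread.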

### Bridging blocking and non-blocking worlds

The first example mixes _non-blocking_ `delay(...)` and _blocking_ `Thread.sleep(...)` in the same
code of the `main` function. It is easy to get lost. Let's cleanly separate blocking and non-blocking
worlds by using [runBlocking]:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L)
        println("World!")
    }
    println("Hello,") // main coroutine continues while child is delayed
    delay(2000L) // non-blocking delay for 2 seconds to keep JVM alive
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-02.kt)

<!--- TEST
Hello,
World!
-->

The result is the same, but this code uses only non-blocking [delay].

`runBlocking { ... }` works as an adaptor that is used here to start the top-level main coroutine.
The regular code outside of `runBlocking` _blocks_ until the coroutine inside `runBlocking` completes.

This is also a way to write unit-tests for suspending functions:

```kotlin
class MyTest {
    @Test
    fun testMySuspendingFunction() = runBlocking<Unit> {
        // here we can use suspending functions using any assertion style that we like
    }
}
```

<!--- CLEAR -->

### Waiting for a job

Delaying for a time while another coroutine is working is not a good approach. Let's explicitly
wait (in a non-blocking way) until the background [Job] that we have launched is complete:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { // create new coroutine and keep a reference to its Job
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join() // wait until child coroutine completes
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-03.kt)

<!--- TEST
Hello,
World!
-->

Now the result is still the same, but the code of the main coroutine is not tied to the duration of
the background job in any way. Much better.

### Extract function refactoring

Let's extract the block of code inside `launch(CommonPool) { ... }` into a separate function. When you
perform the "Extract function" refactoring on this code, you get a new function with the `suspend` modifier.
That is your first _suspending function_. Suspending functions can be used inside coroutines
just like regular functions, but their additional feature is that they can, in turn,
use other suspending functions, like `delay` in this example, to _suspend_ execution of a coroutine.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { doWorld() }
    println("Hello,")
    job.join()
}

// this is your first suspending function
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-04.kt)

<!--- TEST
Hello,
World!
-->

### Coroutines ARE light-weight

Run the following code:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = List(100_000) { // create a lot of coroutines and list their jobs
        launch(CommonPool) {
            delay(1000L)
            print(".")
        }
    }
    jobs.forEach { it.join() } // wait for all jobs to complete
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-05.kt)

<!--- TEST lines.size == 1 && lines[0] == ".".repeat(100_000) -->

It starts 100K coroutines and, after a second, each coroutine prints a dot.
Now, try that with threads. What would happen? (Most likely your code will produce some sort of out-of-memory error.)

### Coroutines are like daemon threads

The following code launches a long-running coroutine that prints "I'm sleeping" twice a second and then
returns from the main function after some delay:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // just quit after delay
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-06.kt)

You can run and see that it prints three lines and terminates:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
```

<!--- TEST -->

Active coroutines do not keep the process alive. They are like daemon threads.

## Cancellation and timeouts

This section covers coroutine cancellation and timeouts.

### Cancelling coroutine execution

In a small application, returning from the `main` method might sound like a good way to get all coroutines
implicitly terminated. In a larger, long-running application, you need finer-grained control.
The [launch] function returns a [Job] that can be used to cancel a running coroutine:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-01.kt)

It produces the following output:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
```

<!--- TEST -->

As soon as main invokes `job.cancel`, we don't see any output from the other coroutine because it was cancelled.

### Cancellation is cooperative

Coroutine cancellation is _cooperative_. Coroutine code has to cooperate to be cancellable.
All the suspending functions in `kotlinx.coroutines` are _cancellable_. They check for cancellation of
the coroutine and throw [CancellationException] when cancelled. However, if a coroutine is working in
a computation and does not check for cancellation, then it cannot be cancelled, as the following
example shows:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (i < 10) { // computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-02.kt)

Run it to see that it continues to print "I'm sleeping" even after cancellation.

<!--- TEST
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm sleeping 3 ...
I'm sleeping 4 ...
I'm sleeping 5 ...
main: Now I can quit.
-->

### Making computation code cancellable

There are two approaches to making computation code cancellable. The first one is to periodically
invoke a suspending function. There is a [yield] function that is a good choice for that purpose.
The other one is to explicitly check the cancellation status. Let us try the latter approach.

Replace `while (i < 10)` in the previous example with `while (isActive)` and rerun it.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (isActive) { // cancellable computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-03.kt)

As you can see, now this loop can be cancelled. [isActive][CoroutineScope.isActive] is a property that is available inside
the code of coroutines via the [CoroutineScope] object.

<!--- TEST
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
-->
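
The first approach, periodically invoking a suspending function, can be sketched in a similar way. The code below
is an illustrative variant and not one of the guide's generated examples. It assumes that [yield] is available
from `kotlinx.coroutines.experimental` and that, like the other suspending functions of the library, it throws
[CancellationException] once the job has been cancelled:

```kotlin
import kotlinx.coroutines.experimental.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (i < 10) { // computation loop
            yield() // suspension point; assumed to throw CancellationException after cancel()
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled
    println("main: Now I can quit.")
}
```

Calling `yield` on every iteration is the simplest form; a real computation might call it less often to reduce overhead.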

### Closing resources with finally

Cancellable suspending functions throw [CancellationException] on cancellation, which can be handled in
the usual way. For example, the `try {...} finally {...}` expression and Kotlin's `use` function execute their
finalization actions normally when a coroutine is cancelled:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("I'm running finally")
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-04.kt)

The example above produces the following output:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.
```

<!--- TEST -->

### Run non-cancellable block

Any attempt to use a suspending function in the `finally` block of the previous example will cause
a [CancellationException], because the coroutine running this code is cancelled. Usually, this is not a
problem, since all well-behaving closing operations (closing a file, cancelling a job, or closing any kind of a
communication channel) are non-blocking and do not involve any suspending functions. However, in the
rare case when you need to suspend in the cancelled coroutine, you can wrap the corresponding code in
`run(NonCancellable) {...}` using the [run] function and the [NonCancellable] context, as the following example shows:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            run(NonCancellable) {
                println("I'm running finally")
                delay(1000L)
                println("And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-05.kt)

<!--- TEST
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
And I've just delayed for 1 sec because I'm non-cancellable
main: Now I can quit.
-->

### Timeout

The most obvious reason to cancel coroutine execution in practice
is that its execution time has exceeded some timeout.
While you can manually track the reference to the corresponding [Job] and launch a separate coroutine to cancel
the tracked one after a delay, there is a ready-to-use [withTimeout] function that does it.
Look at the following example:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-06.kt)

It produces the following output:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" java.util.concurrent.CancellationException: Timed out waiting for 1300 MILLISECONDS
```

<!--- TEST STARTS_WITH -->

We have not seen the [CancellationException] stack trace printed on the console before. That is because
inside a cancelled coroutine `CancellationException` is considered to be a normal reason for coroutine completion.
However, in this example we have used `withTimeout` right inside the `main` function.

Because cancellation is just an exception, all the resources will be closed in the usual way.
You can wrap the code with timeout in a `try {...} catch (e: CancellationException) {...}` block if
you need to do some additional action specifically on timeout.
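
Such a wrapper could look like the sketch below. It is an illustration rather than one of the guide's generated
examples; the message printed in the `catch` block is made up for this sketch, and the exception type is imported
as `java.util.concurrent.CancellationException` to match the class name shown in the output above:

```kotlin
import kotlinx.coroutines.experimental.*
import java.util.concurrent.CancellationException

fun main(args: Array<String>) = runBlocking<Unit> {
    try {
        withTimeout(1300L) {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
    } catch (e: CancellationException) {
        // additional action performed on timeout (hypothetical handling for this sketch)
        println("main: timed out: ${e.message}")
    }
}
```

Note that this `catch` block handles any `CancellationException` thrown inside the `try` block, not only the one caused by the timeout.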

## Composing suspending functions

This section covers various approaches to composition of suspending functions.

### Sequential by default

Assume that we have two suspending functions defined elsewhere that do something useful like some kind of
remote service call or computation. We just pretend they are useful, but actually each one just
delays for a second for the purpose of this example:

<!--- INCLUDE .*/example-compose-([0-9]+).kt
import kotlin.system.measureTimeMillis
-->

```kotlin
suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}
```

<!--- INCLUDE .*/example-compose-([0-9]+).kt -->

What do we do if we need to invoke them _sequentially_ -- first `doSomethingUsefulOne` _and then_
`doSomethingUsefulTwo` -- and compute the sum of their results?
In practice we do this if we use the result of the first function to make a decision on whether we need
to invoke the second one or to decide on how to invoke it.

We just use a normal sequential invocation, because the code in the coroutine, just like in the regular
code, is _sequential_ by default. The following example demonstrates it by measuring the total
time it takes to execute both suspending functions:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-01.kt)

It produces something like this:

```text
The answer is 42
Completed in 2017 ms
```

<!--- TEST FLEXIBLE_TIME -->

### Concurrent using async

What if there are no dependencies between the invocations of `doSomethingUsefulOne` and `doSomethingUsefulTwo` and
we want to get the answer faster, by doing both _concurrently_? This is where [async] comes to help.

Conceptually, [async] is just like [launch]. It starts a separate coroutine, which is a light-weight thread
that works concurrently with all the other coroutines. The difference is that `launch` returns a [Job] and
does not carry any resulting value, while `async` returns a [Deferred] -- a light-weight non-blocking future
that represents a promise to provide a result later. You can use `.await()` on a deferred value to get its eventual result,
but `Deferred` is also a `Job`, so you can cancel it if needed.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool) { doSomethingUsefulOne() }
        val two = async(CommonPool) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-02.kt)

It produces something like this:

```text
The answer is 42
Completed in 1017 ms
```

<!--- TEST FLEXIBLE_TIME -->

This is twice as fast, because we have concurrent execution of two coroutines.
Note that concurrency with coroutines is always explicit.

### Lazily started async

[async] has a laziness option that is enabled with the `start = false` parameter.
It starts the coroutine only when its result is needed by some
[await][Deferred.await] or if a [start][Job.start] function
is invoked. Run the following example that differs from the previous one only by this option:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool, start = false) { doSomethingUsefulOne() }
        val two = async(CommonPool, start = false) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-03.kt)

It produces something like this:

```text
The answer is 42
Completed in 2017 ms
```

<!--- TEST FLEXIBLE_TIME -->

So, we are back to sequential execution, because we _first_ start and await `one`, _and then_ start and await
`two`. It is not the intended use-case for laziness. It is designed as a replacement for
the standard `lazy` function in cases when computation of the value involves suspending functions.
684
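The intended use of a lazily started `async`, as a suspending counterpart of `lazy`, might look like the sketch below. `loadSettings`, `settings`, `loads` and `readTwice` are hypothetical names made up for this illustration. The counter is there only to show that the body runs once, however many times the value is awaited.

```kotlin
import kotlinx.coroutines.experimental.*
import java.util.concurrent.atomic.AtomicInteger

val loads = AtomicInteger() // counts how many times the loader body actually runs

// Hypothetical suspending loader; stands in for a remote call or a file read.
suspend fun loadSettings(): String {
    loads.incrementAndGet()
    delay(100L)
    return "settings"
}

// Nothing runs until the first await(); later awaits reuse the same result.
val settings: Deferred<String> = async(CommonPool, start = false) { loadSettings() }

fun readTwice(): Pair<String, Int> = runBlocking {
    val first = settings.await()   // starts the coroutine and waits for it
    val second = settings.await()  // already completed, returns the stored value
    Pair(first + "/" + second, loads.get())
}

fun main(args: Array<String>) {
    println(readTwice())
}
```

Unlike a `lazy` delegate, the initializer here is free to call suspending functions such as `delay`.
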
### Async-style functions

We can define async-style functions that invoke `doSomethingUsefulOne` and `doSomethingUsefulTwo`
_asynchronously_ using the [async] coroutine builder. It is good style to name such functions with
either an "async" prefix or an "Async" suffix to highlight the fact that they only start asynchronous
computation and one needs to use the resulting deferred value to get the result.

```kotlin
// The result type of asyncSomethingUsefulOne is Deferred<Int>
fun asyncSomethingUsefulOne() = async(CommonPool) {
    doSomethingUsefulOne()
}

// The result type of asyncSomethingUsefulTwo is Deferred<Int>
fun asyncSomethingUsefulTwo() = async(CommonPool) {
    doSomethingUsefulTwo()
}
```

Note that these `asyncXXX` functions are **not** _suspending_ functions. They can be used from anywhere.
However, their use always implies asynchronous (here meaning _concurrent_) execution of their action
with the invoking code.

The following example shows their use outside of a coroutine:

```kotlin
// note that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
    val time = measureTimeMillis {
        // we can initiate async actions outside of a coroutine
        val one = asyncSomethingUsefulOne()
        val two = asyncSomethingUsefulTwo()
        // but waiting for a result must involve either suspending or blocking.
        // here we use `runBlocking { ... }` to block the main thread while waiting for the result
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-04.kt)

<!--- TEST FLEXIBLE_TIME
The answer is 42
Completed in 1085 ms
-->

## Coroutine context and dispatchers

We've already seen `launch(CommonPool) {...}`, `async(CommonPool) {...}`, `run(NonCancellable) {...}`, etc.
In these code snippets [CommonPool] and [NonCancellable] are _coroutine contexts_.
This section covers other available choices.

### Dispatchers and threads

Coroutine context includes a [_coroutine dispatcher_][CoroutineDispatcher] which determines what thread or threads
the corresponding coroutine uses for its execution. A coroutine dispatcher can confine coroutine execution
to a specific thread, dispatch it to a thread pool, or let it run unconfined. Try the following example:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println(" 'context': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
        println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-01.kt)

It produces the following output (possibly in a different order):

```text
 'Unconfined': I'm working in thread main
 'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
 'newSTC': I'm working in thread MyOwnThread
 'context': I'm working in thread main
```

<!--- TEST LINES_START_UNORDERED -->

The difference between parent [context][CoroutineScope.context] and [Unconfined] context will be shown later.

### Unconfined vs confined dispatcher

The [Unconfined] coroutine dispatcher starts the coroutine in the caller thread, but only until the
first suspension point. After suspension it resumes in the thread that is fully determined by the
suspending function that was invoked. The unconfined dispatcher is appropriate when a coroutine neither
consumes CPU time nor updates any shared data (like UI) that is confined to a specific thread.

On the other hand, the [context][CoroutineScope.context] property that is available inside the block of any coroutine
via the [CoroutineScope] interface is a reference to the context of this particular coroutine.
This way, a parent context can be inherited. The default context of [runBlocking], in particular,
is confined to the invoker thread, so inheriting it has the effect of confining execution to
this thread with predictable FIFO scheduling.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println(" 'context': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println(" 'context': After delay in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-02.kt)

It produces the following output:

```text
 'Unconfined': I'm working in thread main
 'context': I'm working in thread main
 'Unconfined': After delay in thread kotlinx.coroutines.ScheduledExecutor
 'context': After delay in thread main
```

<!--- TEST LINES_START -->

So, the coroutine that inherited the `context` of `runBlocking {...}` continues to execute in the `main` thread,
while the unconfined one resumed in the scheduler thread that the [delay] function is using.

### Debugging coroutines and threads

Coroutines can suspend on one thread and resume on another thread with the [Unconfined] dispatcher or
with a multi-threaded dispatcher like [CommonPool]. Even with a single-threaded dispatcher it might be hard to
figure out what coroutine was doing what, where, and when. The common approach to debugging applications with
threads is to print the thread name in the log file on each log statement. This feature is universally supported
by logging frameworks. When using coroutines, the thread name alone does not give much context, so
`kotlinx.coroutines` includes debugging facilities to make it easier.

Run the following code with the `-Dkotlinx.coroutines.debug` JVM option:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking<Unit> {
    val a = async(context) {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async(context) {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-03.kt)

There are three coroutines: the main coroutine (#1), which is the `runBlocking` one,
and two coroutines computing deferred values `a` (#2) and `b` (#3).
They are all executing in the context of `runBlocking` and are confined to the main thread.
The output of this code is:

```text
[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42
```

<!--- TEST -->

The `log` function prints the name of the thread in square brackets, and you can see that it is the `main`
thread, but the identifier of the currently executing coroutine is appended to it. This identifier
is consecutively assigned to all created coroutines when debugging mode is turned on.

You can read more about debugging facilities in the documentation for the [newCoroutineContext] function.

### Jumping between threads

Run the following code with the `-Dkotlinx.coroutines.debug` JVM option:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) {
    val ctx1 = newSingleThreadContext("Ctx1")
    val ctx2 = newSingleThreadContext("Ctx2")
    runBlocking(ctx1) {
        log("Started in ctx1")
        run(ctx2) {
            log("Working in ctx2")
        }
        log("Back to ctx1")
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt)

It demonstrates two new techniques. One is using [runBlocking] with an explicitly specified context, and
the second one is using the [run] function to change the context of a coroutine while still staying in the
same coroutine, as you can see in the output below:

```text
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
```

<!--- TEST -->

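The same `run` function can also hand a value back to the caller, which makes it convenient for moving one computation to another context. The sketch below assumes that `run` returns the result of its block. `sumInBackground` is a hypothetical helper written for this illustration only.

```kotlin
import kotlinx.coroutines.experimental.*

// Hypothetical helper: sums the numbers in CommonPool,
// then resumes the caller in its own context with the result.
suspend fun sumInBackground(numbers: List<Int>): Int =
    run(CommonPool) { numbers.sum() }

fun main(args: Array<String>) = runBlocking<Unit> {
    val total = sumInBackground(listOf(1, 2, 3, 4))
    println("Total is $total, printed from ${Thread.currentThread().name}")
}
```

The caller stays in the same coroutine throughout. Only the block passed to `run` executes in the other context.
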
### Job in the context

The coroutine [Job] is part of its context. The coroutine can retrieve it from its own context
using the `context[Job]` expression:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    println("My job is ${context[Job]}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-05.kt)

It produces something like:

```
My job is BlockingCoroutine{Active}@65ae6ba4
```

<!--- TEST lines.size == 1 && lines[0].startsWith("My job is BlockingCoroutine{Active}@") -->

So, [isActive][CoroutineScope.isActive] in [CoroutineScope] is just a convenient shortcut for `context[Job]!!.isActive`.

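As a small illustration of that shortcut, the sketch below reads the state both ways inside one coroutine and compares the results. The function name `shortcutMatches` is made up for this example.

```kotlin
import kotlinx.coroutines.experimental.*

// Compares the shortcut with the explicit lookup inside one coroutine.
fun shortcutMatches(): Boolean = runBlocking {
    val viaScope = isActive               // property of CoroutineScope
    val viaJob = context[Job]!!.isActive  // the same state, read from the Job in the context
    viaScope == viaJob
}

fun main(args: Array<String>) {
    println(shortcutMatches())
}
```
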
### Children of a coroutine

When the [context][CoroutineScope.context] of a coroutine is used to launch another coroutine,
the [Job] of the new coroutine becomes
a _child_ of the parent coroutine's job. When the parent coroutine is cancelled, all its children
are recursively cancelled, too.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(CommonPool) {
        // it spawns two other jobs, one with its separate context
        val job1 = launch(CommonPool) {
            println("job1: I have my own context and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // and the other inherits the parent context
        val job2 = launch(context) {
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
        // request completes when both its sub-jobs complete:
        job1.join()
        job2.join()
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-06.kt)

The output of this code is:

```text
job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
```

<!--- TEST -->

### Combining contexts

Coroutine contexts can be combined using the `+` operator. The context on the right-hand side replaces relevant entries
of the context on the left-hand side. For example, a [Job] of the parent coroutine can be inherited, while
its dispatcher is replaced:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(context) { // use the context of `runBlocking`
        // spawns CPU-intensive child job in CommonPool !!!
        val job = launch(context + CommonPool) {
            println("job: I am a child of the request coroutine, but with a different dispatcher")
            delay(1000)
            println("job: I will not execute this line if my parent request is cancelled")
        }
        job.join() // request completes when its sub-job completes
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-07.kt)

The expected outcome of this code is:

```text
job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?
```

<!--- TEST -->

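The replacement rule of the `+` operator can also be observed directly on context objects, without launching anything. The sketch below is an illustration only. `nameAfterPlus` and `jobSurvivesNewDispatcher` are hypothetical helper names, and it relies on `CoroutineName` exposing its `name` property and being usable as a context key, which is an assumption about that class.

```kotlin
import kotlinx.coroutines.experimental.*

// Two elements with the same key: the one on the right-hand side is kept.
fun nameAfterPlus(): String? =
    (CoroutineName("first") + CoroutineName("second"))[CoroutineName]?.name

// Replacing the dispatcher leaves the Job of the parent context in place.
fun jobSurvivesNewDispatcher(): Boolean = runBlocking {
    val combined = context + CommonPool
    combined[Job] === context[Job]
}

fun main(args: Array<String>) {
    println(nameAfterPlus())
    println(jobSurvivesNewDispatcher())
}
```

Entries are looked up by key, so only the entry with a matching key on the left-hand side is replaced and everything else is carried over unchanged.
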
### Naming coroutines for debugging

Automatically assigned ids are good when coroutines log often and you just need to correlate log records
coming from the same coroutine. However, when a coroutine is tied to the processing of a specific request
or is doing some specific background task, it is better to name it explicitly for debugging purposes.
[CoroutineName] serves the same function as a thread name. It is displayed in the name of the thread that
is executing this coroutine when debugging mode is turned on.

The following example demonstrates this concept:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
    log("Started main coroutine")
    // run two background value computations
    val v1 = async(CommonPool + CoroutineName("v1coroutine")) {
        log("Computing v1")
        delay(500)
        252
    }
    val v2 = async(CommonPool + CoroutineName("v2coroutine")) {
        log("Computing v2")
        delay(1000)
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt)

The output it produces with the `-Dkotlinx.coroutines.debug` JVM option is similar to:

```text
[main @main#1] Started main coroutine
[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42
```

<!--- TEST FLEXIBLE_THREAD -->

### Cancellation via explicit job

Let us put our knowledge about contexts, children and jobs together. Assume that our application has
an object with a lifecycle, but that object is not a coroutine. For example, we are writing an Android application
and launch various coroutines in the context of an Android activity to perform asynchronous operations to fetch
and update data, do animations, etc. All of these coroutines must be cancelled when the activity is destroyed
to avoid memory leaks.

We can manage the lifecycle of our coroutines by creating an instance of [Job] that is tied to
the lifecycle of our activity. A job instance is created using the [Job()][Job.invoke] factory function
as the following example shows. We need to make sure that all the coroutines are started
with this job in their context, and then a single invocation of [Job.cancel] terminates them all.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = Job() // create a job object to manage our lifecycle
    // now launch ten coroutines for a demo, each working for a different time
    val coroutines = List(10) { i ->
        // they are all children of our job object
        launch(context + job) { // we use the context of main runBlocking thread, but with our own job object
            delay(i * 200L) // variable delay 0ms, 200ms, 400ms, ... etc
            println("Coroutine $i is done")
        }
    }
    println("Launched ${coroutines.size} coroutines")
    delay(500L) // delay for half a second
    println("Cancelling job!")
    job.cancel() // cancel our job.. !!!
    delay(1000L) // delay for more to see if our coroutines are still working
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-09.kt)

The output of this example is:

```text
Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Cancelling job!
```

<!--- TEST -->

As you can see, only the first three coroutines printed a message and the others were cancelled
by a single invocation of `job.cancel()`. So all we need to do in our hypothetical Android
application is to create a parent job object when the activity is created, use it for child coroutines,
and cancel it when the activity is destroyed.

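The lifecycle pattern described above could be sketched as a plain class. `Screen`, `launchTask`, `onDestroy` and `destroyStopsTasks` are hypothetical names standing in for an activity and its callbacks. Nothing here is a real Android API, and the use of `CommonPool` is an arbitrary choice for the sketch.

```kotlin
import kotlinx.coroutines.experimental.*
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for an Android activity; it is not a real Android class.
class Screen {
    private val job = Job() // parent job, created together with the screen

    // every coroutine started here gets the parent job in its context
    fun launchTask(block: suspend CoroutineScope.() -> Unit): Job =
        launch(CommonPool + job, block = block)

    // call this when the screen goes away
    fun onDestroy() {
        job.cancel()
    }
}

// Returns true if destroying the screen stopped a long-running task before it finished.
fun destroyStopsTasks(): Boolean = runBlocking {
    val screen = Screen()
    val reachedEnd = AtomicBoolean(false)
    val task = screen.launchTask {
        delay(10000L)
        reachedEnd.set(true) // not expected to run if cancellation works
    }
    delay(100L)
    screen.onDestroy()
    task.join() // returns once the task is no longer running
    !reachedEnd.get()
}

fun main(args: Array<String>) {
    println(destroyStopsTasks())
}
```

Keeping the job private and exposing only `launchTask` makes it hard to start a coroutine that would outlive the object by accident.
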
## Channels

Deferred values provide a convenient way to transfer a single value between coroutines.
Channels provide a way to transfer a stream of values.

<!--- INCLUDE .*/example-channel-([0-9]+).kt
import kotlinx.coroutines.experimental.channels.*
-->

### Channel basics

A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-01.kt)

The output of this code is:

```text
1
4
9
16
25
Done!
```

<!--- TEST -->

### Closing and iteration over channels

Unlike a queue, a channel can be closed to indicate that no more elements are coming.
On the receiver side it is convenient to use a regular `for` loop to receive elements
from the channel.

Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
The iteration stops as soon as this close token is received, so there is a guarantee
that all elements sent before the close are received:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-02.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

### Building channel producers

The pattern where a coroutine is producing a sequence of elements is quite common.
This is a part of the _producer-consumer_ pattern that is often found in concurrent code.
You could abstract such a producer into a function that takes a channel as its parameter, but this goes contrary
to the common-sense expectation that results must be returned from functions.

There is a convenience coroutine builder named [produce] that makes it easy to do it right:

```kotlin
fun produceSquares() = produce<Int>(CommonPool) {
    for (x in 1..5) send(x * x)
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val squares = produceSquares()
    for (y in squares) println(y)
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

### Pipelines

A pipeline is a pattern where one coroutine produces a possibly infinite stream of values:

```kotlin
fun produceNumbers() = produce<Int>(CommonPool) {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
```

Another coroutine, or several coroutines, consume that stream, do some processing, and produce some other results.
In the example below the numbers are just squared:

```kotlin
fun square(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
    for (x in numbers) send(x * x)
}
```

The main code starts and connects the whole pipeline:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    squares.cancel() // need to cancel these coroutines in a larger app
    numbers.cancel()
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-04.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

We don't have to cancel these coroutines in this example app, because
[coroutines are like daemon threads](#coroutines-are-like-daemon-threads),
but in a larger app we'll need to stop our pipeline if we don't need it anymore.
Alternatively, we could have run pipeline coroutines as
[children of a coroutine](#children-of-a-coroutine).

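One possible way to get that parent-child arrangement is to give every stage a context parameter and share one explicit parent job between them, so a single `cancel()` stops the whole pipeline. This is a sketch under those assumptions, not the guide's own example. The context parameters and the `firstSquares` helper are additions made for the illustration.

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlin.coroutines.experimental.CoroutineContext

// Same stages as above, but the caller decides which context (and parent job) they run in.
fun produceNumbers(context: CoroutineContext) = produce<Int>(context) {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun square(context: CoroutineContext, numbers: ReceiveChannel<Int>) = produce<Int>(context) {
    for (x in numbers) send(x * x)
}

// Collects the first `count` squares, then stops the whole pipeline.
fun firstSquares(count: Int): List<Int> = runBlocking {
    val pipeline = Job() // parent job shared by all stages
    val numbers = produceNumbers(CommonPool + pipeline)
    val squares = square(CommonPool + pipeline, numbers)
    val result = ArrayList<Int>()
    for (i in 1..count) result.add(squares.receive())
    pipeline.cancel() // a single call cancels both stages
    result
}

fun main(args: Array<String>) {
    println(firstSquares(5))
}
```

With this shape, adding another stage does not require another `cancel()` call, as long as the new stage is started with the same parent job in its context.
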
1267### Prime numbers with pipeline
1268
Cedric Beustfa0b28f2017-02-07 07:07:25 -08001269Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001270of coroutines. We start with an infinite sequence of numbers. This time we introduce an
1271explicit context parameter, so that caller can control where our coroutines run:
1272
1273<!--- INCLUDE kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
1274import kotlin.coroutines.experimental.CoroutineContext
1275-->
1276
1277```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001278fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001279 var x = start
1280 while (true) send(x++) // infinite stream of integers from start
1281}
1282```
1283
1284The following pipeline stage filters an incoming stream of numbers, removing all the numbers
1285that are divisible by the given prime number:
1286
1287```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001288fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001289 for (x in numbers) if (x % prime != 0) send(x)
1290}
1291```
1292
1293Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
Roman Elizarov62500ba2017-02-09 18:55:40 +03001294and launching new pipeline stage for each prime number found:
1295
1296```
Roman Elizarova5e653f2017-02-13 13:49:55 +03001297numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
Roman Elizarov62500ba2017-02-09 18:55:40 +03001298```
1299
1300The following example prints the first ten prime numbers,
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001301running the whole pipeline in the context of the main thread:
1302
1303```kotlin
1304fun main(args: Array<String>) = runBlocking<Unit> {
1305 var cur = numbersFrom(context, 2)
1306 for (i in 1..10) {
1307 val prime = cur.receive()
1308 println(prime)
1309 cur = filter(context, cur, prime)
1310 }
1311}
1312```
1313
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt)

The output of this code is:

```text
2
3
5
7
11
13
17
19
23
29
```

<!--- TEST -->

Note that you can build the same pipeline using the `buildIterator` coroutine builder from the standard library.
Replace `produce` with `buildIterator`, `send` with `yield`, `receive` with `next`,
`ReceiveChannel` with `Iterator`, and get rid of the context. You will not need `runBlocking` either.
However, the benefit of a pipeline that uses channels as shown above is that it can actually use
multiple CPU cores if you run it in the [CommonPool] context.
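
The substitution described above can be sketched with standard-library iterators only. This version assumes Kotlin 1.3 or later, where `buildIterator` is available under the name `iterator`; `firstPrimes` is a helper name introduced here for illustration and is not part of the guide's example.

```kotlin
// Sketch only: assumes Kotlin 1.3+, where buildIterator is spelled iterator.
fun numbersFrom(start: Int): Iterator<Int> = iterator {
    var x = start
    while (true) yield(x++) // infinite stream of integers from start
}

fun filter(numbers: Iterator<Int>, prime: Int): Iterator<Int> = iterator {
    for (x in numbers) if (x % prime != 0) yield(x)
}

// Hypothetical helper: collects the first `count` primes produced by the pipeline.
fun firstPrimes(count: Int): List<Int> {
    var cur = numbersFrom(2)
    val primes = ArrayList<Int>()
    repeat(count) {
        val prime = cur.next()   // next() takes the place of receive()
        primes.add(prime)
        cur = filter(cur, prime) // add one more filtering stage
    }
    return primes
}
```

Calling `firstPrimes(10)` yields the same ten primes as the channel-based pipeline, but every stage runs sequentially on the calling thread.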

Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
built using `buildSequence`/`buildIterator`, because they do not allow arbitrary suspension, unlike
`produce`, which is fully asynchronous.

### Fan-out

Multiple coroutines may receive from the same channel, distributing work between themselves.
Let us start with a producer coroutine that is periodically producing integers
(ten numbers per second):

```kotlin
fun produceNumbers() = produce<Int>(CommonPool) {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
```

Then we can have several processor coroutines. In this example, they just print their id and
received number:

```kotlin
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
    for (x in channel) {
        println("Processor #$id received $x")
    }
}
```

Now let us launch five processors and let them work for a second. See what happens:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(1000)
    producer.cancel() // cancel producer coroutine and thus kill them all
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt)

The output will be similar to the following one, although the processor ids that receive
each specific integer may be different:

```
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
```

<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->

Note that cancelling a producer coroutine closes its channel, thus eventually terminating the iteration
over the channel that the processor coroutines are doing.

### Fan-in

Multiple coroutines may send to the same channel.
For example, let us have a channel of strings, and a suspending function that
repeatedly sends a specified string to this channel with a specified delay:

```kotlin
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

Now, let us see what happens if we launch a couple of coroutines sending strings
(in this example we launch them in the context of the main thread):

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>()
    launch(context) { sendString(channel, "foo", 200L) }
    launch(context) { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt)

The output is:

```text
foo
foo
BAR!
foo
foo
BAR!
```

<!--- TEST -->

### Buffered channels

The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked;
if receive is invoked first, it is suspended until send is invoked.

Both the [Channel()][Channel.invoke] factory function and the [produce] builder take an optional `capacity` parameter to
specify the _buffer size_. A buffer allows senders to send multiple elements before suspending,
similar to a `BlockingQueue` with a specified capacity, which blocks when the buffer is full.

Take a look at the behavior of the following code:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>(4) // create buffered channel
    launch(context) { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt)

It prints "Sending" _five_ times using a buffered channel with a capacity of _four_:

```text
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
```

<!--- TEST -->

The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
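
The `BlockingQueue` comparison can be made concrete with plain JDK classes. The sketch below is not part of the guide's examples; `offerTen` is a hypothetical helper that uses the non-blocking `offer` so that it terminates. Where a channel's `send` suspends the coroutine and `put` would block the thread, `offer` simply reports that the buffer is full.

```kotlin
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical helper: offers 0..9 to a bounded queue and reports which elements were accepted.
fun offerTen(capacity: Int): List<Int> {
    val queue = ArrayBlockingQueue<Int>(capacity)
    val accepted = ArrayList<Int>()
    for (i in 0 until 10) {
        // offer() returns false instead of blocking when the queue is full;
        // put() would block the calling thread at this point
        if (queue.offer(i)) accepted.add(i)
    }
    return accepted
}
```

With a capacity of four, `offerTen(4)` returns `[0, 1, 2, 3]`: four elements fit, and the fifth is the first one that cannot be added, which is the point at which the sender coroutine above suspends.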

### Channels are fair

Send and receive operations to channels are _fair_ with respect to the order of their invocation from
multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
gets the element. In the following example two coroutines "ping" and "pong" are
receiving the "ball" object from the shared "table" channel.

```kotlin
data class Ball(var hits: Int)

fun main(args: Array<String>) = runBlocking<Unit> {
    val table = Channel<Ball>() // a shared table
    launch(context) { player("ping", table) }
    launch(context) { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    table.receive() // game over, grab the ball
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(200) // wait a bit
        table.send(ball) // send the ball back
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-09.kt)

The "ping" coroutine is started first, so it is the first one to receive the ball. Even though the "ping"
coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
received by the "pong" coroutine, because it was already waiting for it:

```text
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
ping Ball(hits=5)
pong Ball(hits=6)
```

<!--- TEST -->

## Shared mutable state and concurrency

Coroutines can be executed concurrently using a multi-threaded dispatcher like [CommonPool]. It presents
all the usual concurrency problems, the main one being synchronization of access to **shared mutable state**.
Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world,
but others are unique.

### The problem

Let us launch a thousand coroutines all doing the same action a thousand times (for a total of a million executions).
We'll also measure their completion time for further comparisons:

<!--- INCLUDE .*/example-sync-([0-9]+).kt
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.system.measureTimeMillis
-->

<!--- INCLUDE .*/example-sync-03.kt
import java.util.concurrent.atomic.AtomicInteger
-->

<!--- INCLUDE .*/example-sync-06.kt
import kotlinx.coroutines.experimental.sync.Mutex
-->

<!--- INCLUDE .*/example-sync-07.kt
import kotlinx.coroutines.experimental.channels.*
-->

```kotlin
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
    val n = 1000 // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch(context) {
                repeat(k) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${n * k} actions in $time ms")
}
```

<!--- INCLUDE .*/example-sync-([0-9]+).kt -->

We start with a very simple action that increments a shared mutable variable using
the multi-threaded [CommonPool] context.

```kotlin
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-01.kt)

<!--- TEST LINES_START
Completed 1000000 actions in
Counter =
-->

What does it print at the end? It is highly unlikely to ever print "Counter = 1000000", because a thousand coroutines
increment the `counter` concurrently from multiple threads without any synchronization.

### Volatiles are of no help

There is a common misconception that making a variable `volatile` solves concurrency problems. Let us try it:

```kotlin
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-02.kt)

<!--- TEST LINES_START
Completed 1000000 actions in
Counter =
-->

This code runs slower, but we still don't get "Counter = 1000000" at the end, because volatile variables guarantee
linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but
do not provide atomicity of larger actions (increment in our case).
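
To see why, it helps to spell out what `counter++` does. The following sketch uses plain threads from the Kotlin standard library instead of coroutines, a substitution made here only to keep it self-contained; `volatileCounter` and `racyIncrements` are hypothetical names that do not appear in the guide's examples.

```kotlin
import kotlin.concurrent.thread

@Volatile
var volatileCounter = 0

// Hypothetical helper: increments the volatile counter from several threads without synchronization.
fun racyIncrements(threads: Int, perThread: Int): Int {
    volatileCounter = 0
    val workers = List(threads) {
        thread {
            repeat(perThread) {
                val read = volatileCounter // step 1: an atomic read
                volatileCounter = read + 1 // step 2: an atomic write, but another thread
                                           // may have written between the two steps
            }
        }
    }
    workers.forEach { it.join() }
    return volatileCounter
}
```

Each read and each write is atomic on its own, yet two threads can read the same value and both write back `read + 1`, losing one increment. A call such as `racyIncrements(4, 100_000)` can therefore return at most 400000 and typically returns less; the exact value varies from run to run.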

### Thread-safe data structures

The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized,
linearizable, or atomic) data structure that provides all the necessary synchronization for the corresponding
operations that need to be performed on a shared state.
In the case of a simple counter we can use the `AtomicInteger` class, which has an atomic `incrementAndGet` operation:

```kotlin
var counter = AtomicInteger()

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter.incrementAndGet()
    }
    println("Counter = ${counter.get()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-03.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other
standard data structures and basic operations on them. However, it does not easily scale to complex
state or to complex operations that do not have ready-to-use thread-safe implementations.

### Thread confinement fine-grained

_Thread confinement_ is an approach to the problem of shared mutable state where all access to the particular shared
state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to
the single event-dispatch/application thread. It is easy to apply with coroutines by using a
single-threaded context:

```kotlin
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) { // run each coroutine in CommonPool
        run(counterContext) { // but confine each increment to the single-threaded context
            counter++
        }
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

This code works very slowly, because it does _fine-grained_ thread-confinement. Each individual increment switches
from the multi-threaded `CommonPool` context to the single-threaded context using a [run] block.
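
The cost of hopping threads for every increment is not specific to coroutines. As a rough analogy, and not the guide's own code, the same fine-grained confinement can be written with a JDK single-thread executor and plain threads; `confinedCounter` and `confinedIncrements` are hypothetical names used only in this sketch.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import kotlin.concurrent.thread

var confinedCounter = 0 // read and written only on the executor's single thread

// Hypothetical helper: worker threads hand every increment over to one confining thread.
fun confinedIncrements(threads: Int, perThread: Int): Int {
    val counterThread = Executors.newSingleThreadExecutor()
    counterThread.execute { confinedCounter = 0 }
    val workers = List(threads) {
        thread {
            repeat(perThread) {
                // every increment is a separate task handed over to the confining thread
                counterThread.execute { confinedCounter++ }
            }
        }
    }
    workers.forEach { it.join() } // all increments are queued once the workers finish
    // the read is queued behind the increments and runs on the same thread
    val result = counterThread.submit(Callable { confinedCounter }).get()
    counterThread.shutdown()
    return result
}
```

The result is always `threads * perThread`, because only one thread ever touches the counter, but each increment pays for a hand-over between threads, which mirrors the per-increment context switch in the coroutine example.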

### Thread confinement coarse-grained

In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic
are confined to the single thread. The following example does it like that, running each coroutine in
the single-threaded context to start with.

```kotlin
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(counterContext) { // run each coroutine in the single-threaded context
        counter++
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

This now works much faster and produces the correct result.

### Mutual exclusion

The mutual exclusion solution to the problem is to protect all modifications of the shared state with a _critical section_
that is never executed concurrently. In a blocking world you'd typically use `synchronized` or `ReentrantLock` for that.
The coroutine alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to
delimit a critical section. The key difference is that `Mutex.lock` is a suspending function. It does not block a thread.

```kotlin
val mutex = Mutex()
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        mutex.lock()
        try { counter++ }
        finally { mutex.unlock() }
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-06.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

The locking in this example is fine-grained, so it pays the price. However, it is a good choice for some situations
where you absolutely must modify some shared state periodically, but there is no natural thread that this state
is confined to.
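
For comparison, the blocking counterpart mentioned at the start of this section can be sketched with a `ReentrantLock` and plain threads from the standard library. This is an illustration only, with the hypothetical names `lockedCounter` and `lockedIncrements`; unlike `Mutex.lock`, waiting for the lock here blocks the thread.

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread
import kotlin.concurrent.withLock

val lock = ReentrantLock()
var lockedCounter = 0 // accessed only while holding the lock

// Hypothetical helper: increments the counter from several threads inside a critical section.
fun lockedIncrements(threads: Int, perThread: Int): Int {
    lock.withLock { lockedCounter = 0 }
    val workers = List(threads) {
        thread {
            repeat(perThread) {
                // withLock acquires the lock, runs the block, and releases the lock in a finally
                lock.withLock { lockedCounter++ }
            }
        }
    }
    workers.forEach { it.join() }
    return lock.withLock { lockedCounter }
}
```

The `withLock` extension plays the same role as the explicit `lock`/`try`/`finally`/`unlock` sequence in the coroutine example, so `lockedIncrements(4, 10_000)` always returns 40000.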

### Actors

An actor is a combination of a coroutine, the state that is confined and is encapsulated into this coroutine,
and a channel to communicate with other coroutines. A simple actor can be written as a function,
but an actor with a complex state is better suited for a class.

There is an [actor] coroutine builder that conveniently combines the actor's mailbox channel into its
scope (to receive messages from) and the send channel into the resulting job object, so that a
single reference to the actor can be carried around as its handle.

```kotlin
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: SendChannel<Int>) : CounterMsg() // a request with reply

// This function launches a new counter actor
fun counterActor() = actor<CounterMsg>(CommonPool) {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.send(counter)
        }
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    massiveRun(CommonPool) {
        counter.send(IncCounter)
    }
    val response = Channel<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.receive()}")
    counter.close() // shutdown the actor
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-07.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

It does not matter (for correctness) what context the actor itself is executed in. An actor is
a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine
works as a solution to the problem of shared mutable state.

An actor is more efficient than locking under load, because in this case it always has work to do and it does not
have to switch to a different context at all.

> Note that an [actor] coroutine builder is a dual of the [produce] coroutine builder. An actor is associated
  with the channel that it receives messages from, while a producer is associated with the channel that it
  sends elements to.

## Select expression

The select expression makes it possible to await multiple suspending functions simultaneously and _select_
the first one that becomes available.

<!--- INCLUDE .*/example-select-([0-9]+).kt
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*
-->

### Selecting from channels

Let us have two producers of strings: `fizz` and `buzz`. The `fizz` producer sends the "Fizz" string every 300 ms:

<!--- INCLUDE .*/example-select-01.kt
import kotlin.coroutines.experimental.CoroutineContext
-->

```kotlin
fun fizz(context: CoroutineContext) = produce<String>(context) {
    while (true) { // sends "Fizz" every 300 ms
        delay(300)
        send("Fizz")
    }
}
```

And the `buzz` producer sends the "Buzz!" string every 500 ms:

```kotlin
fun buzz(context: CoroutineContext) = produce<String>(context) {
    while (true) { // sends "Buzz!" every 500 ms
        delay(500)
        send("Buzz!")
    }
}
```

Using the [receive][ReceiveChannel.receive] suspending function we can receive _either_ from one channel or the
other. But the [select] expression allows us to receive from _both_ simultaneously using its
[onReceive][SelectBuilder.onReceive] clauses:

```kotlin
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> means that this select expression does not produce any result
        fizz.onReceive { value -> // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value -> // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}
```

Let us run it all seven times:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val fizz = fizz(context)
    val buzz = buzz(context)
    repeat(7) {
        selectFizzBuzz(fizz, buzz)
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-01.kt)

The result of this code is:

```text
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'
```

<!--- TEST -->

### Selecting on close

The [onReceive][SelectBuilder.onReceive] clause in `select` fails when the channel is closed, causing the corresponding
`select` to throw an exception. We can use the [onReceiveOrNull][SelectBuilder.onReceiveOrNull] clause to perform a
specific action when the channel is closed. The following example also shows that `select` is an expression that returns
the result of its selected clause:

```kotlin
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'a' is closed"
            else
                "a -> '$value'"
        }
        b.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'b' is closed"
            else
                "b -> '$value'"
        }
    }
```

Let's use it with channel `a` that produces a "Hello" string four times and
channel `b` that produces a "World" string four times:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // we are using the context of the main thread in this example for predictability ...
    val a = produce<String>(context) {
        repeat(4) { send("Hello $it") }
    }
    val b = produce<String>(context) {
        repeat(4) { send("World $it") }
    }
    repeat(8) { // print first eight results
        println(selectAorB(a, b))
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-02.kt)

The result of this code is quite interesting, so we'll analyze it in more detail:

```text
a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed
```

<!--- TEST -->

There are a couple of observations to make from it.

First of all, `select` is _biased_ to the first clause. When several clauses are selectable at the same time,
the first one among them gets selected. Here, both channels are constantly producing strings, so the `a` channel,
being the first clause in select, wins. However, because we are using an unbuffered channel, `a` gets suspended from
time to time on its [send][SendChannel.send] invocation and gives `b` a chance to send, too.

The second observation is that [onReceiveOrNull][SelectBuilder.onReceiveOrNull] gets immediately selected when the
channel is already closed.

### Selecting to send

The select expression has an [onSend][SelectBuilder.onSend] clause that can be put to good use in combination
with the biased nature of selection.

Let us write an example of a producer of integers that sends its values to a `side` channel when
the consumers on its primary channel cannot keep up with it:

```kotlin
fun produceNumbers(side: SendChannel<Int>) = produce<Int>(CommonPool) {
    for (num in 1..10) { // produce 10 numbers from 1 to 10
        delay(100) // every 100 ms
        select<Unit> {
            onSend(num) {} // Send to the primary channel
            side.onSend(num) {} // or to the side channel
        }
    }
}
```

The consumer is going to be quite slow, taking 250 ms to process each number:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val side = Channel<Int>() // allocate side channel
    launch(context) { // this is a very fast consumer for the side channel
        for (num in side) println("Side channel has $num")
    }
    for (num in produceNumbers(side)) {
        println("Consuming $num")
        delay(250) // let us digest the consumed number properly, do not hurry
    }
    println("Done consuming")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-03.kt)

So let us see what happens:

```text
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
```

<!--- TEST -->
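
The pattern in this output follows from the two delays. The sketch below replays it with a simplified timing model, stated here as an assumption rather than a description of the library's scheduling: number `n` becomes ready at `100 * n` ms, and the primary consumer is unavailable for 250 ms after each number it takes. `simulateSideChannel` is a hypothetical helper that involves no coroutines at all.

```kotlin
// Idealized model (an assumption): number n is offered at t = 100 * n ms, and the
// primary consumer is busy for 250 ms after each number it takes.
fun simulateSideChannel(): List<String> {
    val log = ArrayList<String>()
    var consumerFreeAt = 0 // time in ms at which the primary consumer can receive again
    for (num in 1..10) {
        val now = 100 * num
        if (now >= consumerFreeAt) {
            log.add("Consuming $num")          // primary channel is ready: first clause wins
            consumerFreeAt = now + 250
        } else {
            log.add("Side channel has $num")   // primary consumer is busy: fall through to side
        }
    }
    log.add("Done consuming")
    return log
}
```

Under this model the function returns exactly the eleven lines shown above: after each consumed number, the next two become ready while the consumer is still busy and go to the side channel.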

### Selecting deferred values

Deferred values can be selected using the [onAwait][SelectBuilder.onAwait] clause.
Let us start with an async function that returns a deferred string value after
a random delay:

<!--- INCLUDE .*/example-select-04.kt
import java.util.*
-->

```kotlin
fun asyncString(time: Int) = async(CommonPool) {
    delay(time.toLong())
    "Waited for $time ms"
}
```

Let us start a dozen of them with a random delay.

```kotlin
fun asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return List(12) { asyncString(random.nextInt(1000)) }
}
```

Now the main function waits for the first of them to complete and counts the number of deferred values
that are still active. Note that we rely here on the fact that the `select` expression is a Kotlin DSL,
so we can provide clauses for it using arbitrary code. In this case we iterate over a list
of deferred values to provide an `onAwait` clause for each deferred value.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val list = asyncStringsList()
    val result = select<String> {
        list.withIndex().forEach { (index, deferred) ->
            deferred.onAwait { answer ->
                "Deferred $index produced answer '$answer'"
            }
        }
    }
    println(result)
    val countActive = list.count { it.isActive }
    println("$countActive coroutines are still active")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-04.kt)

The output is:

```text
Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active
```

<!--- TEST -->

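The output above is reproducible because `asyncStringsList` seeds `Random` with a fixed value. As a rough sketch (the `seededDelays` helper is hypothetical and uses only `java.util.Random`, no coroutines), the following prints the delays drawn with seed 3 and the index of the shortest one. Assuming the deferred with the shortest delay is the one that wins the `select`, this should correspond to the line printed above:

```kotlin
import java.util.Random

// Hypothetical helper, not part of the guide: draws the same delays that
// asyncStringsList() passes to asyncString(), using the same fixed seed.
fun seededDelays(seed: Long = 3, count: Int = 12, bound: Int = 1000): List<Int> {
    val random = Random(seed)
    return List(count) { random.nextInt(bound) }
}

fun main(args: Array<String>) {
    val delays = seededDelays()
    // Pick the entry with the smallest delay; sortedBy is stable, so ties keep list order.
    val shortest = delays.withIndex().sortedBy { it.value }.first()
    println("Delays: $delays")
    println("Shortest delay is ${shortest.value} ms at index ${shortest.index}")
}
```

Changing the seed changes which deferred value completes first, so the expected output of the example would change with it.
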
### Switch over a channel of deferred values

Let us write a channel producer function that consumes a channel of deferred string values and waits for each received
deferred value, but only until the next deferred value arrives or the channel is closed. This example puts together
the [onReceiveOrNull][SelectBuilder.onReceiveOrNull] and [onAwait][SelectBuilder.onAwait] clauses in the same `select`:

```kotlin
fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String>(CommonPool) {
    var current = input.receive() // start with first received deferred value
    while (isActive) { // loop while not cancelled/closed
        val next = select<Deferred<String>?> { // return next deferred value from this select or null
            input.onReceiveOrNull { update ->
                update // replaces next value to wait
            }
            current.onAwait { value ->
                send(value) // send value that current deferred has produced
                input.receiveOrNull() // and use the next deferred from the input channel
            }
        }
        if (next == null) {
            println("Channel was closed")
            break // out of loop
        } else {
            current = next
        }
    }
}
```

To test it, we'll use a simple async function that resolves to a specified string after a specified time:

```kotlin
fun asyncString(str: String, time: Long) = async(CommonPool) {
    delay(time)
    str
}
```

The main function just launches a coroutine to print results of `switchMapDeferreds` and sends some test
data to it:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val chan = Channel<Deferred<String>>() // the channel for test
    launch(context) { // launch printing coroutine
        for (s in switchMapDeferreds(chan))
            println(s) // print each received string
    }
    chan.send(asyncString("BEGIN", 100))
    delay(200) // enough time for "BEGIN" to be produced
    chan.send(asyncString("Slow", 500))
    delay(100) // not enough time to produce slow
    chan.send(asyncString("Replace", 100))
    delay(500) // give it time before the last one
    chan.send(asyncString("END", 500))
    delay(1000) // give it time to process
    chan.close() // close the channel ...
    delay(500) // and wait some time to let it finish
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-05.kt)

The result of this code:

```text
BEGIN
Replace
END
Channel was closed
```

<!--- TEST -->

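To see why "Slow" never appears in the result, it may help to reason about the timing alone. The sketch below is a simplified model, not the guide's implementation: it uses no coroutines, the `Sent` type and the `emitted` function are hypothetical names, and the times are the nominal delays from the main function above (real scheduling adds small overheads). It assumes a value is emitted only if it completes strictly before the next value is sent, or before the channel is closed in the case of the last value:

```kotlin
// Hypothetical model type, not part of kotlinx.coroutines:
// one deferred value sent to the channel at `sentAt` ms that takes `duration` ms to complete.
data class Sent(val name: String, val sentAt: Long, val duration: Long)

// Returns the names of the values that complete before they are replaced.
// The last value is "replaced" by the channel being closed at `closedAt`.
fun emitted(inputs: List<Sent>, closedAt: Long): List<String> =
    inputs.withIndex().filter { (i, s) ->
        val replacedAt = if (i + 1 < inputs.size) inputs[i + 1].sentAt else closedAt
        s.sentAt + s.duration < replacedAt
    }.map { it.value.name }

fun main(args: Array<String>) {
    // Nominal send times, accumulated from the delays in the example's main function.
    val inputs = listOf(
        Sent("BEGIN", 0, 100),     // completes at 100, next value arrives at 200
        Sent("Slow", 200, 500),    // would complete at 700, but is replaced at 300
        Sent("Replace", 300, 100), // completes at 400, next value arrives at 800
        Sent("END", 800, 500)      // completes at 1300, channel is closed at 1800
    )
    println(emitted(inputs, closedAt = 1800)) // prints [BEGIN, Replace, END]
}
```

In this model "Slow" is dropped because "Replace" arrives 100 ms after it, long before its 500 ms delay has elapsed, which matches the output shown above.
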
<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core -->
<!--- DOCS_ROOT kotlinx-coroutines-core/target/dokka/kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines.experimental -->
[launch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/launch.html
[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/delay.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run-blocking.html
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/index.html
[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-cancellation-exception.html
[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/yield.html
[CoroutineScope.isActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/is-active.html
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
[run]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run.html
[NonCancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-non-cancellable/index.html
[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
[async]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/async.html
[Deferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/index.html
[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/await.html
[Job.start]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/start.html
[CommonPool]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-common-pool/index.html
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-dispatcher/index.html
[CoroutineScope.context]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/context.html
[Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-unconfined/index.html
[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/new-coroutine-context.html
[CoroutineName]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-name/index.html
[Job.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/invoke.html
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/cancel.html
<!--- INDEX kotlinx.coroutines.experimental.sync -->
[Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/index.html
[Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/lock.html
[Mutex.unlock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/unlock.html
<!--- INDEX kotlinx.coroutines.experimental.channels -->
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html
[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/close.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
[Channel.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/invoke.html
[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
<!--- INDEX kotlinx.coroutines.experimental.selects -->
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
[SelectBuilder.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-receive.html
[SelectBuilder.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-receive-or-null.html
[SelectBuilder.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-send.html
[SelectBuilder.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-await.html
<!--- END -->