<!--- INCLUDE .*/example-([a-z]+)-([0-9]+)\.kt
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package guide.$$1.example$$2

import kotlinx.coroutines.experimental.*
-->
<!--- KNIT kotlinx-coroutines-core/src/test/kotlin/guide/.*\.kt -->
<!--- TEST_OUT kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package guide.test

import org.junit.Test

class GuideTest {
-->

# Guide to kotlinx.coroutines by example

This is a short guide on core features of `kotlinx.coroutines` with a series of examples.

## Introduction and setup

Kotlin, as a language, provides only minimal low-level APIs in its standard library to enable various other
libraries to utilize coroutines. Unlike many other languages with similar capabilities, `async` and `await`
are not keywords in Kotlin and are not even part of its standard library.

`kotlinx.coroutines` is one such rich library. It contains a number of high-level
coroutine-enabled primitives that this guide covers, including `async` and `await`.
You need to add a dependency on the `kotlinx-coroutines-core` module as explained
[here](README.md#using-in-your-projects) to use primitives from this guide in your projects.

## Table of contents

<!--- TOC -->

* [Coroutine basics](#coroutine-basics)
  * [Your first coroutine](#your-first-coroutine)
  * [Bridging blocking and non-blocking worlds](#bridging-blocking-and-non-blocking-worlds)
  * [Waiting for a job](#waiting-for-a-job)
  * [Extract function refactoring](#extract-function-refactoring)
  * [Coroutines ARE light-weight](#coroutines-are-light-weight)
  * [Coroutines are like daemon threads](#coroutines-are-like-daemon-threads)
* [Cancellation and timeouts](#cancellation-and-timeouts)
  * [Cancelling coroutine execution](#cancelling-coroutine-execution)
  * [Cancellation is cooperative](#cancellation-is-cooperative)
  * [Making computation code cancellable](#making-computation-code-cancellable)
  * [Closing resources with finally](#closing-resources-with-finally)
  * [Run non-cancellable block](#run-non-cancellable-block)
  * [Timeout](#timeout)
* [Composing suspending functions](#composing-suspending-functions)
  * [Sequential by default](#sequential-by-default)
  * [Concurrent using async](#concurrent-using-async)
  * [Lazily started async](#lazily-started-async)
  * [Async-style functions](#async-style-functions)
* [Coroutine context and dispatchers](#coroutine-context-and-dispatchers)
  * [Dispatchers and threads](#dispatchers-and-threads)
  * [Unconfined vs confined dispatcher](#unconfined-vs-confined-dispatcher)
  * [Debugging coroutines and threads](#debugging-coroutines-and-threads)
  * [Jumping between threads](#jumping-between-threads)
  * [Job in the context](#job-in-the-context)
  * [Children of a coroutine](#children-of-a-coroutine)
  * [Combining contexts](#combining-contexts)
  * [Naming coroutines for debugging](#naming-coroutines-for-debugging)
  * [Cancellation via explicit job](#cancellation-via-explicit-job)
* [Channels](#channels)
  * [Channel basics](#channel-basics)
  * [Closing and iteration over channels](#closing-and-iteration-over-channels)
  * [Building channel producers](#building-channel-producers)
  * [Pipelines](#pipelines)
  * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
  * [Fan-out](#fan-out)
  * [Fan-in](#fan-in)
  * [Buffered channels](#buffered-channels)
  * [Channels are fair](#channels-are-fair)
* [Shared mutable state and concurrency](#shared-mutable-state-and-concurrency)
  * [The problem](#the-problem)
  * [Volatiles are of no help](#volatiles-are-of-no-help)
  * [Thread-safe data structures](#thread-safe-data-structures)
  * [Thread confinement fine-grained](#thread-confinement-fine-grained)
  * [Thread confinement coarse-grained](#thread-confinement-coarse-grained)
  * [Mutual exclusion](#mutual-exclusion)
  * [Actors](#actors)
* [Select expression](#select-expression)
  * [Selecting from channels](#selecting-from-channels)
  * [Selecting on close](#selecting-on-close)
  * [Selecting to send](#selecting-to-send)
  * [Selecting deferred values](#selecting-deferred-values)
  * [Switch over a channel of deferred values](#switch-over-a-channel-of-deferred-values)
* [Further reading](#further-reading)

<!--- END_TOC -->

## Coroutine basics

This section covers basic coroutine concepts.

### Your first coroutine

Run the following code:

```kotlin
fun main(args: Array<String>) {
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main function continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-01.kt)

It prints the following:

```text
Hello,
World!
```

<!--- TEST -->

Essentially, coroutines are light-weight threads.
They are launched with the [launch] _coroutine builder_.
You can achieve the same result by replacing
`launch(CommonPool) { ... }` with `thread { ... }` and `delay(...)` with `Thread.sleep(...)`. Try it.

If you start by replacing `launch(CommonPool)` with `thread`, the compiler produces the following error:

```
Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
```

That is because [delay] is a special _suspending function_ that does not block a thread, but _suspends_
the coroutine, and it can only be used from a coroutine.

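The thread-based variant of the first example can be sketched with the Kotlin standard library alone. This is a minimal sketch rather than one of the guide's generated examples: `helloWithThread` and its `emit` parameter are hypothetical names introduced here so that the output can be captured, and the sketch joins the worker thread instead of sleeping for 2 seconds.

```kotlin
import kotlin.concurrent.thread

// Hypothetical helper: the first example rewritten with a plain thread.
// `emit` is an assumed hook for capturing output; it defaults to println.
fun helloWithThread(emit: (String) -> Unit = { println(it) }) {
    val worker = thread { // start a new thread instead of a coroutine
        Thread.sleep(1000L) // blocking sleep for 1 second
        emit("World!") // emit after the sleep
    }
    emit("Hello,") // the calling thread continues while the worker sleeps
    worker.join() // block until the worker thread finishes
}

fun main(args: Array<String>) = helloWithThread()
```

Unlike `delay`, `Thread.sleep` keeps the worker thread blocked for the whole second.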
### Bridging blocking and non-blocking worlds

The first example mixes _non-blocking_ `delay(...)` and _blocking_ `Thread.sleep(...)` in the same
code of the `main` function. It is easy to get lost. Let's cleanly separate blocking and non-blocking
worlds by using [runBlocking]:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L)
        println("World!")
    }
    println("Hello,") // main coroutine continues while child is delayed
    delay(2000L) // non-blocking delay for 2 seconds to keep JVM alive
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-02.kt)

<!--- TEST
Hello,
World!
-->

The result is the same, but this code uses only non-blocking [delay].

`runBlocking { ... }` works as an adaptor that is used here to start the top-level main coroutine.
The regular code outside of `runBlocking` _blocks_ until the coroutine inside `runBlocking` completes.

This is also a way to write unit-tests for suspending functions:

```kotlin
class MyTest {
    @Test
    fun testMySuspendingFunction() = runBlocking<Unit> {
        // here we can use suspending functions using any assertion style that we like
    }
}
```

<!--- CLEAR -->

### Waiting for a job

Delaying for a time while another coroutine is working is not a good approach. Let's explicitly
wait (in a non-blocking way) until the background [Job] that we have launched is complete:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { // create new coroutine and keep a reference to its Job
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join() // wait until child coroutine completes
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-03.kt)

<!--- TEST
Hello,
World!
-->

Now the result is still the same, but the code of the main coroutine is not tied to the duration of
the background job in any way. Much better.

### Extract function refactoring

Let's extract the block of code inside `launch(CommonPool) { ... }` into a separate function. When you
perform "Extract function" refactoring on this code, you get a new function with the `suspend` modifier.
That is your first _suspending function_. Suspending functions can be used inside coroutines
just like regular functions, but their additional feature is that they can, in turn,
use other suspending functions, like `delay` in this example, to _suspend_ execution of a coroutine.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { doWorld() }
    println("Hello,")
    job.join()
}

// this is your first suspending function
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-04.kt)

<!--- TEST
Hello,
World!
-->

### Coroutines ARE light-weight

Run the following code:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = List(100_000) { // create a lot of coroutines and list their jobs
        launch(CommonPool) {
            delay(1000L)
            print(".")
        }
    }
    jobs.forEach { it.join() } // wait for all jobs to complete
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-05.kt)

<!--- TEST lines.size == 1 && lines[0] == ".".repeat(100_000) -->

It starts 100K coroutines and, after a second, each coroutine prints a dot.
Now, try that with threads. What would happen? (Most likely your code will produce some sort of out-of-memory error.)

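For comparison, the same experiment with threads can be sketched using only the standard library. This is a minimal sketch: `runWithThreads` is a hypothetical helper name, and the default count is deliberately kept small so that the program is safe to run. Passing `100000` as the first program argument matches the coroutine example, and on many machines that is likely to be very slow or to fail.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical helper: starts `count` threads that each sleep for a second and print a dot,
// then returns how many of them finished.
fun runWithThreads(count: Int): Int {
    val finished = AtomicInteger()
    val threads = List(count) { // create a lot of threads and list them
        thread {
            Thread.sleep(1000L) // blocking sleep; each thread keeps its own stack meanwhile
            print(".")
            finished.incrementAndGet()
        }
    }
    threads.forEach { it.join() } // wait for all threads to complete
    return finished.get()
}

fun main(args: Array<String>) {
    // Small default on purpose; pass 100000 as the first argument to mirror the coroutine example
    val count = args.firstOrNull()?.toInt() ?: 1_000
    println("\n${runWithThreads(count)} threads finished")
}
```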
### Coroutines are like daemon threads

The following code launches a long-running coroutine that prints "I'm sleeping" twice a second and then
returns from the main function after some delay:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // just quit after delay
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-06.kt)

You can run and see that it prints three lines and terminates:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
```

<!--- TEST -->

Active coroutines do not keep the process alive. They are like daemon threads.

## Cancellation and timeouts

This section covers coroutine cancellation and timeouts.

### Cancelling coroutine execution

In a small application, returning from the "main" method might sound like a good way to get all coroutines
implicitly terminated. In a larger, long-running application, you need finer-grained control.
The [launch] function returns a [Job] that can be used to cancel the running coroutine:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-01.kt)

It produces the following output:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
```

<!--- TEST -->

As soon as main invokes `job.cancel`, we don't see any output from the other coroutine because it was cancelled.

### Cancellation is cooperative

Coroutine cancellation is _cooperative_. Coroutine code has to cooperate to be cancellable.
All the suspending functions in `kotlinx.coroutines` are _cancellable_. They check for cancellation of
the coroutine and throw [CancellationException] when cancelled. However, if a coroutine is working in
a computation and does not check for cancellation, then it cannot be cancelled, as the following
example shows:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (i < 10) { // computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-02.kt)

Run it to see that it continues to print "I'm sleeping" even after cancellation.

<!--- TEST
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm sleeping 3 ...
I'm sleeping 4 ...
I'm sleeping 5 ...
main: Now I can quit.
-->

### Making computation code cancellable

There are two approaches to making computation code cancellable. The first one is to periodically
invoke a suspending function. There is a [yield] function that is a good choice for that purpose.
The other one is to explicitly check the cancellation status. Let us try the latter approach.

Replace `while (i < 10)` in the previous example with `while (isActive)` and rerun it.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (isActive) { // cancellable computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-03.kt)

As you can see, now this loop can be cancelled. [isActive][CoroutineScope.isActive] is a property that is available inside
the code of coroutines via the [CoroutineScope] object.

<!--- TEST
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
-->

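The first approach, periodically invoking a suspending function, can be sketched in the same way. This is a minimal sketch and not one of the guide's generated examples: it assumes the same `kotlinx.coroutines.experimental` API used throughout this guide and simply adds a `yield()` call to the non-cancellable loop from the previous section. The loop should stop printing shortly after `job.cancel()` is invoked.

```kotlin
import kotlinx.coroutines.experimental.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (i < 10) { // computation loop
            yield() // cancellable suspending call: throws CancellationException once the job is cancelled
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled
    println("main: Now I can quit.")
}
```

Compared with checking `isActive`, this variant leaves the loop through an exception, so any `finally` blocks around the loop still run.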
### Closing resources with finally

Cancellable suspending functions throw [CancellationException] on cancellation, which can be handled in
the usual way. For example, the `try {...} finally {...}` expression and Kotlin's `use` function execute their
finalization actions normally when a coroutine is cancelled:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("I'm running finally")
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-04.kt)

The example above produces the following output:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.
```

<!--- TEST -->

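The same holds for the `use` function. The following is a minimal sketch under the same API assumptions as the rest of this guide; `Resource` is a hypothetical class introduced only for illustration. Its `close` method should be invoked when the coroutine is cancelled while suspended in `delay`.

```kotlin
import kotlinx.coroutines.experimental.*
import java.io.Closeable

// Hypothetical resource used only for illustration
class Resource : Closeable {
    override fun close() = println("Resource closed")
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        Resource().use { // `use` closes the resource even if the block exits with an exception
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```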
### Run non-cancellable block

Any attempt to use a suspending function in the `finally` block of the previous example causes
[CancellationException], because the coroutine running this code is cancelled. Usually, this is not a
problem, since all well-behaving closing operations (closing a file, cancelling a job, or closing any kind of a
communication channel) are usually non-blocking and do not involve any suspending functions. However, in the
rare case when you need to suspend in the cancelled coroutine, you can wrap the corresponding code in
`run(NonCancellable) {...}` using the [run] function and the [NonCancellable] context, as the following example shows:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            run(NonCancellable) {
                println("I'm running finally")
                delay(1000L)
                println("And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-05.kt)

<!--- TEST
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
And I've just delayed for 1 sec because I'm non-cancellable
main: Now I can quit.
-->

### Timeout

The most obvious reason to cancel coroutine execution in practice
is that its execution time has exceeded some timeout.
While you can manually track the reference to the corresponding [Job] and launch a separate coroutine to cancel
the tracked one after a delay, there is a ready-to-use [withTimeout] function that does it.
Look at the following example:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-06.kt)

It produces the following output:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" java.util.concurrent.CancellationException: Timed out waiting for 1300 MILLISECONDS
```

<!--- TEST STARTS_WITH -->

We have not seen the [CancellationException] stack trace printed on the console before. That is because
inside a cancelled coroutine `CancellationException` is considered to be a normal reason for coroutine completion.
However, in this example we have used `withTimeout` right inside the `main` function.

Because cancellation is just an exception, all the resources will be closed in the usual way.
You can wrap the code with timeout in a `try {...} catch (e: CancellationException) {...}` block if
you need to do some additional action specifically on timeout.

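Catching the timeout can be sketched as follows. This is a minimal sketch, assuming the same `kotlinx.coroutines.experimental` API as the example above and that `CancellationException` resolves through the library import. The message printed in the `catch` block is made up for illustration.

```kotlin
import kotlinx.coroutines.experimental.*

fun main(args: Array<String>) = runBlocking<Unit> {
    try {
        withTimeout(1300L) {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
    } catch (e: CancellationException) {
        // additional action performed specifically on timeout (illustrative message)
        println("main: timed out, cleaning up")
    }
}
```

With the exception handled inside `main`, the program should finish normally instead of printing a stack trace.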
## Composing suspending functions

This section covers various approaches to composition of suspending functions.

### Sequential by default

Assume that we have two suspending functions defined elsewhere that do something useful like some kind of
remote service call or computation. We just pretend they are useful, but actually each one just
delays for a second for the purpose of this example:

<!--- INCLUDE .*/example-compose-([0-9]+).kt
import kotlin.system.measureTimeMillis
-->

```kotlin
suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}
```

<!--- INCLUDE .*/example-compose-([0-9]+).kt -->

What do we do if we need to invoke them _sequentially_ -- first `doSomethingUsefulOne` _and then_
`doSomethingUsefulTwo` -- and compute the sum of their results?
In practice we do this if we use the result of the first function to decide whether we need
to invoke the second one or how to invoke it.

We just use a normal sequential invocation, because the code in the coroutine, just like in the regular
code, is _sequential_ by default. The following example demonstrates it by measuring the total
time it takes to execute both suspending functions:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-01.kt)

It produces something like this:

```text
The answer is 42
Completed in 2017 ms
```

<!--- TEST FLEXIBLE_TIME -->

### Concurrent using async

What if there are no dependencies between invocation of `doSomethingUsefulOne` and `doSomethingUsefulTwo` and
we want to get the answer faster, by doing both _concurrently_? This is where [async] comes to help.

Conceptually, [async] is just like [launch]. It starts a separate coroutine which is a light-weight thread
that works concurrently with all the other coroutines. The difference is that `launch` returns a [Job] and
does not carry any resulting value, while `async` returns a [Deferred] -- a light-weight non-blocking future
that represents a promise to provide a result later. You can use `.await()` on a deferred value to get its eventual result,
but `Deferred` is also a `Job`, so you can cancel it if needed.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool) { doSomethingUsefulOne() }
        val two = async(CommonPool) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-02.kt)

It produces something like this:

```text
The answer is 42
Completed in 1017 ms
```

<!--- TEST FLEXIBLE_TIME -->

This is twice as fast, because we have concurrent execution of two coroutines.
Note that concurrency with coroutines is always explicit.

### Lazily started async

[async] has a laziness option that is enabled with the `start = false` parameter.
It starts the coroutine only when its result is needed by some
[await][Deferred.await] or if a [start][Job.start] function
is invoked. Run the following example that differs from the previous one only by this option:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool, start = false) { doSomethingUsefulOne() }
        val two = async(CommonPool, start = false) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-03.kt)

It produces something like this:

```text
The answer is 42
Completed in 2017 ms
```

<!--- TEST FLEXIBLE_TIME -->

So, we are back to sequential execution, because we _first_ start and await `one`, _and then_ start and await
`two`. This is not the intended use-case for laziness. It is designed as a replacement for
the standard `lazy` function in cases when computation of the value involves suspending functions.

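The intended use can be sketched as follows, assuming a value that is expensive to compute and may not be needed at all.
`loadConfiguration` is a hypothetical suspending function invented for this illustration:

```kotlin
import kotlinx.coroutines.experimental.*

// Hypothetical suspending computation (an assumption made for this illustration)
suspend fun loadConfiguration(): String {
    delay(1000L) // pretend we are reading it from a remote service
    return "verbose=true"
}

fun main(args: Array<String>) = runBlocking<Unit> {
    // nothing is computed yet: the coroutine starts on the first await or start
    val configuration = async(CommonPool, start = false) { loadConfiguration() }
    if (args.isNotEmpty()) {
        // the computation starts here, only because its result is actually needed
        println("Configuration: ${configuration.await()}")
    } else {
        println("Configuration was not needed and was never computed")
    }
}
```

Unlike the standard `lazy` function, the initializer here is free to call suspending functions such as `delay`.
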
### Async-style functions

We can define async-style functions that invoke `doSomethingUsefulOne` and `doSomethingUsefulTwo`
_asynchronously_ using the [async] coroutine builder. It is good style to name such functions with
either an "async" prefix or an "Async" suffix to highlight the fact that they only start an asynchronous
computation and one needs to use the resulting deferred value to get the result.

```kotlin
// The result type of asyncSomethingUsefulOne is Deferred<Int>
fun asyncSomethingUsefulOne() = async(CommonPool) {
    doSomethingUsefulOne()
}

// The result type of asyncSomethingUsefulTwo is Deferred<Int>
fun asyncSomethingUsefulTwo() = async(CommonPool) {
    doSomethingUsefulTwo()
}
```

Note that these `asyncXXX` functions are **not** _suspending_ functions. They can be used from anywhere.
However, their use always implies asynchronous (here meaning _concurrent_) execution of their action
with the invoking code.

The following example shows their use outside of a coroutine:

```kotlin
// note, that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
    val time = measureTimeMillis {
        // we can initiate async actions outside of a coroutine
        val one = asyncSomethingUsefulOne()
        val two = asyncSomethingUsefulTwo()
        // but waiting for a result must involve either suspending or blocking.
        // here we use `runBlocking { ... }` to block the main thread while waiting for the result
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-04.kt)

<!--- TEST FLEXIBLE_TIME
The answer is 42
Completed in 1085 ms
-->

## Coroutine context and dispatchers

We've already seen `launch(CommonPool) {...}`, `async(CommonPool) {...}`, `run(NonCancellable) {...}`, etc.
In these code snippets [CommonPool] and [NonCancellable] are _coroutine contexts_.
This section covers other available choices.

### Dispatchers and threads

A coroutine context includes a [_coroutine dispatcher_][CoroutineDispatcher] which determines what thread or threads
the corresponding coroutine uses for its execution. A coroutine dispatcher can confine coroutine execution
to a specific thread, dispatch it to a thread pool, or let it run unconfined. Try the following example:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println(" 'context': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
        println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-01.kt)

It produces the following output (maybe in a different order):

```text
 'Unconfined': I'm working in thread main
 'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
 'newSTC': I'm working in thread MyOwnThread
 'context': I'm working in thread main
```

<!--- TEST LINES_START_UNORDERED -->

The difference between the parent [context][CoroutineScope.context] and the [Unconfined] context will be shown later.

### Unconfined vs confined dispatcher

The [Unconfined] coroutine dispatcher starts the coroutine in the caller thread, but only until the
first suspension point. After suspension it resumes in the thread that is fully determined by the
suspending function that was invoked. The unconfined dispatcher is appropriate when the coroutine neither
consumes CPU time nor updates any shared data (like UI) that is confined to a specific thread.

On the other hand, the [context][CoroutineScope.context] property that is available inside the block of any coroutine
via the [CoroutineScope] interface is a reference to the context of this particular coroutine.
This way, a parent context can be inherited. The default context of [runBlocking], in particular,
is confined to the invoker thread, so inheriting it has the effect of confining execution to
this thread with a predictable FIFO scheduling.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println(" 'context': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println(" 'context': After delay in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-02.kt)

It produces the output:

```text
 'Unconfined': I'm working in thread main
 'context': I'm working in thread main
 'Unconfined': After delay in thread kotlinx.coroutines.ScheduledExecutor
 'context': After delay in thread main
```

<!--- TEST LINES_START -->

So, the coroutine that inherited the `context` of `runBlocking {...}` continues to execute in the `main` thread,
while the unconfined one resumed in the scheduler thread that the [delay] function uses.

### Debugging coroutines and threads

Coroutines can suspend on one thread and resume on another thread with the [Unconfined] dispatcher or
with a multi-threaded dispatcher like [CommonPool]. Even with a single-threaded dispatcher it might be hard to
figure out what coroutine was doing what, where, and when. The common approach to debugging applications with
threads is to print the thread name in the log file on each log statement. This feature is universally supported
by logging frameworks. When using coroutines, the thread name alone does not give much context, so
`kotlinx.coroutines` includes debugging facilities to make it easier.

Run the following code with the `-Dkotlinx.coroutines.debug` JVM option:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking<Unit> {
    val a = async(context) {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async(context) {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-03.kt)

There are three coroutines. The main coroutine (#1) -- `runBlocking` one,
and two coroutines computing deferred values `a` (#2) and `b` (#3).
They are all executing in the context of `runBlocking` and are confined to the main thread.
The output of this code is:

```text
[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42
```

<!--- TEST -->

The `log` function prints the name of the thread in square brackets, and you can see that it is the `main`
thread, but the identifier of the currently executing coroutine is appended to it. This identifier
is consecutively assigned to all created coroutines when debugging mode is turned on.

You can read more about debugging facilities in the documentation for the [newCoroutineContext] function.

### Jumping between threads

Run the following code with the `-Dkotlinx.coroutines.debug` JVM option:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) {
    val ctx1 = newSingleThreadContext("Ctx1")
    val ctx2 = newSingleThreadContext("Ctx2")
    runBlocking(ctx1) {
        log("Started in ctx1")
        run(ctx2) {
            log("Working in ctx2")
        }
        log("Back to ctx1")
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt)

It demonstrates two new techniques. One is using [runBlocking] with an explicitly specified context, and
the other is using the [run] function to change the context of a coroutine while still staying in the
same coroutine, as you can see in the output below:

```text
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
```

<!--- TEST -->

### Job in the context

The coroutine [Job] is part of its context. The coroutine can retrieve it from its own context
using the `context[Job]` expression:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    println("My job is ${context[Job]}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-05.kt)

It produces something like:

```
My job is BlockingCoroutine{Active}@65ae6ba4
```

<!--- TEST lines.size == 1 && lines[0].startsWith("My job is BlockingCoroutine{Active}@") -->

So, [isActive][CoroutineScope.isActive] in [CoroutineScope] is just a convenient shortcut for `context[Job]!!.isActive`.

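The equivalence can be sketched with a computation loop that checks its own job explicitly. This is an illustrative
snippet, not one of the guide's generated examples, and it assumes only the `context` property and the `context[Job]`
lookup described above:

```kotlin
import kotlinx.coroutines.experimental.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var iterations = 0L
        // the same check as `while (isActive)`, spelled out via the context
        while (context[Job]!!.isActive) iterations++
        println("job: stopped after $iterations iterations")
    }
    delay(100L)
    job.cancel() // the loop observes this through its own job
    delay(100L) // give the loop a moment to notice the cancellation
}
```

The loop contains no suspending calls, so this explicit check is the only way it can react to cancellation.
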
### Children of a coroutine

When the [context][CoroutineScope.context] of a coroutine is used to launch another coroutine,
the [Job] of the new coroutine becomes
a _child_ of the parent coroutine's job. When the parent coroutine is cancelled, all its children
are recursively cancelled, too.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(CommonPool) {
        // it spawns two other jobs, one with its separate context
        val job1 = launch(CommonPool) {
            println("job1: I have my own context and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // and the other inherits the parent context
        val job2 = launch(context) {
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
        // request completes when both its sub-jobs complete:
        job1.join()
        job2.join()
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-06.kt)

The output of this code is:

```text
job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
```

<!--- TEST -->

### Combining contexts

Coroutine contexts can be combined using the `+` operator. The context on the right-hand side replaces relevant entries
of the context on the left-hand side. For example, a [Job] of the parent coroutine can be inherited, while
its dispatcher is replaced:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(context) { // use the context of `runBlocking`
        // spawns CPU-intensive child job in CommonPool !!!
        val job = launch(context + CommonPool) {
            println("job: I am a child of the request coroutine, but with a different dispatcher")
            delay(1000)
            println("job: I will not execute this line if my parent request is cancelled")
        }
        job.join() // request completes when its sub-job completes
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-07.kt)

The expected outcome of this code is:

```text
job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?
```

<!--- TEST -->

### Naming coroutines for debugging

Automatically assigned ids are good when coroutines log often and you just need to correlate log records
coming from the same coroutine. However, when a coroutine is tied to the processing of a specific request
or doing some specific background task, it is better to name it explicitly for debugging purposes.
[CoroutineName] serves the same function as a thread name. It is displayed in the name of the thread that
is executing this coroutine when debugging mode is turned on.

The following example demonstrates this concept:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
    log("Started main coroutine")
    // run two background value computations
    val v1 = async(CommonPool + CoroutineName("v1coroutine")) {
        log("Computing v1")
        delay(500)
        252
    }
    val v2 = async(CommonPool + CoroutineName("v2coroutine")) {
        log("Computing v2")
        delay(1000)
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt)

The output it produces with the `-Dkotlinx.coroutines.debug` JVM option is similar to:

```text
[main @main#1] Started main coroutine
[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42
```

<!--- TEST FLEXIBLE_THREAD -->

### Cancellation via explicit job

Let us put our knowledge about contexts, children and jobs together. Assume that our application has
an object with a lifecycle, but that object is not a coroutine. For example, we are writing an Android application
and launch various coroutines in the context of an Android activity to perform asynchronous operations to fetch
and update data, do animations, etc. All of these coroutines must be cancelled when the activity is destroyed
to avoid memory leaks.

We can manage the lifecycle of our coroutines by creating an instance of [Job] that is tied to
the lifecycle of our activity. A job instance is created using the [Job()][Job.invoke] factory function
as the following example shows. We need to make sure that all the coroutines are started
with this job in their context, and then a single invocation of [Job.cancel] terminates them all.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = Job() // create a job object to manage our lifecycle
    // now launch ten coroutines for a demo, each working for a different time
    val coroutines = List(10) { i ->
        // they are all children of our job object
        launch(context + job) { // we use the context of main runBlocking thread, but with our own job object
            delay(i * 200L) // variable delay 0ms, 200ms, 400ms, ... etc
            println("Coroutine $i is done")
        }
    }
    println("Launched ${coroutines.size} coroutines")
    delay(500L) // delay for half a second
    println("Cancelling job!")
    job.cancel() // cancel our job.. !!!
    delay(1000L) // delay for more to see if our coroutines are still working
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-09.kt)

The output of this example is:

```text
Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Cancelling job!
```

<!--- TEST -->

As you can see, only the first three coroutines printed a message and the others were cancelled
by a single invocation of `job.cancel()`. So all we need to do in our hypothetical Android
application is to create a parent job object when the activity is created, use it for child coroutines,
and cancel it when the activity is destroyed.

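That recipe can be sketched without any Android dependencies. The `Screen` class below is hypothetical: its `create`
and `destroy` methods merely stand in for activity lifecycle callbacks, and none of these names come from the Android
or `kotlinx.coroutines` API:

```kotlin
import kotlinx.coroutines.experimental.*

// Hypothetical object with a lifecycle (a stand-in for an Android activity)
class Screen {
    val job = Job() // parent job for everything this screen launches

    fun create() {
        // all coroutines of this screen are started with our job in their context
        repeat(3) { i ->
            launch(CommonPool + job) {
                while (true) {
                    delay(200L)
                    println("Screen worker $i is updating data")
                }
            }
        }
    }

    fun destroy() {
        job.cancel() // a single call cancels all coroutines of this screen
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val screen = Screen()
    screen.create()
    delay(500L)
    println("Destroying the screen")
    screen.destroy()
    delay(500L) // no more updates are printed during this delay
}
```

A cancelled job cannot be reused, so in this sketch every new `Screen` instance creates a fresh `Job`.
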
## Channels

Deferred values provide a convenient way to transfer a single value between coroutines.
Channels provide a way to transfer a stream of values.

<!--- INCLUDE .*/example-channel-([0-9]+).kt
import kotlinx.coroutines.experimental.channels.*
-->

### Channel basics

A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-01.kt)

The output of this code is:

```text
1
4
9
16
25
Done!
```

<!--- TEST -->

### Closing and iteration over channels

Unlike a queue, a channel can be closed to indicate that no more elements are coming.
On the receiver side it is convenient to use a regular `for` loop to receive elements
from the channel.

Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
The iteration stops as soon as this close token is received, so there is a guarantee
that all previously sent elements before the close are received:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-02.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

### Building channel producers

The pattern where a coroutine is producing a sequence of elements is quite common.
This is a part of the _producer-consumer_ pattern that is often found in concurrent code.
You could abstract such a producer into a function that takes a channel as its parameter, but this goes contrary
to the common-sense expectation that results must be returned from functions.

There is a convenience coroutine builder named [produce] that makes it easy to do it right on the producer side,
and an extension function [consumeEach] that can replace a `for` loop on the consumer side:

```kotlin
fun produceSquares() = produce<Int>(CommonPool) {
    for (x in 1..5) send(x * x)
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

### Pipelines

A pipeline is a pattern where one coroutine produces a possibly infinite stream of values:

```kotlin
fun produceNumbers() = produce<Int>(CommonPool) {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
```

And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results.
In the example below the numbers are just squared:

```kotlin
fun square(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
    for (x in numbers) send(x * x)
}
```

The main code starts and connects the whole pipeline:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    squares.cancel() // need to cancel these coroutines in a larger app
    numbers.cancel()
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-04.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

We don't have to cancel these coroutines in this example app, because
[coroutines are like daemon threads](#coroutines-are-like-daemon-threads),
but in a larger app we'll need to stop our pipeline if we don't need it anymore.
Alternatively, we could have run pipeline coroutines as
[children of a coroutine](#children-of-a-coroutine).

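One way to sketch that alternative is to pass a context into every pipeline stage and to include an explicit parent
[Job] in it, as described in [Cancellation via explicit job](#cancellation-via-explicit-job). The `context` parameters
added to `produceNumbers` and `square` below are an assumption of this sketch, not the signatures used above:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlin.coroutines.experimental.CoroutineContext

fun produceNumbers(context: CoroutineContext) = produce<Int>(context) {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun square(context: CoroutineContext, numbers: ReceiveChannel<Int>) = produce<Int>(context) {
    for (x in numbers) send(x * x)
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val pipeline = Job() // parent job for all the pipeline stages
    val numbers = produceNumbers(CommonPool + pipeline)
    val squares = square(CommonPool + pipeline, numbers)
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!")
    pipeline.cancel() // stops every stage of the pipeline at once
}
```

With this arrangement the number of `cancel` calls does not grow with the number of pipeline stages.
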
### Prime numbers with pipeline

Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
of coroutines. We start with an infinite sequence of numbers. This time we introduce an
explicit context parameter, so that the caller can control where our coroutines run:

<!--- INCLUDE kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
import kotlin.coroutines.experimental.CoroutineContext
-->

```kotlin
fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
```

The following pipeline stage filters an incoming stream of numbers, removing all the numbers
that are divisible by the given prime number:

```kotlin
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
    for (x in numbers) if (x % prime != 0) send(x)
}
```

Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
and launching a new pipeline stage for each prime number found:

```
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
```

The following example prints the first ten prime numbers,
running the whole pipeline in the context of the main thread:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    var cur = numbersFrom(context, 2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(context, cur, prime)
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt)

The output of this code is:

```text
2
3
5
7
11
13
17
19
23
29
```

<!--- TEST -->

Note that you can build the same pipeline using the `buildIterator` coroutine builder from the standard library.
Replace `produce` with `buildIterator`, `send` with `yield`, `receive` with `next`,
`ReceiveChannel` with `Iterator`, and get rid of the context. You will not need `runBlocking` either.
However, the benefit of a pipeline that uses channels as shown above is that it can actually use
multiple CPU cores if you run it in the [CommonPool] context.

Roman Elizarova5e653f2017-02-13 13:49:55 +03001341Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
Roman Elizarov62500ba2017-02-09 18:55:40 +03001342other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
1343built using `buildSeqeunce`/`buildIterator`, because they do not allow arbitrary suspension, unlike
Roman Elizarova5e653f2017-02-13 13:49:55 +03001344`produce` which is fully asynchronous.
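
The iterator-based variant described above can be sketched as follows. This is a minimal sketch, not part of the
guide's generated examples: it assumes a Kotlin version (1.3 or later) where the `buildIterator` builder is available
under the name `iterator`, and the helper `firstPrimes` is a hypothetical name introduced only for illustration.

```kotlin
// Sketch only: assumes Kotlin 1.3+, where `buildIterator` is spelled `iterator`.

// Infinite stream of integers starting from `start`.
fun numbersFrom(start: Int): Iterator<Int> = iterator {
    var x = start
    while (true) yield(x++)
}

// Pipeline stage that drops every number divisible by `prime`.
fun filter(numbers: Iterator<Int>, prime: Int): Iterator<Int> = iterator {
    for (x in numbers) if (x % prime != 0) yield(x)
}

// Hypothetical helper: collects the first `count` primes,
// stacking one filter stage on the pipeline per prime found.
fun firstPrimes(count: Int): List<Int> {
    var cur = numbersFrom(2)
    val primes = mutableListOf<Int>()
    repeat(count) {
        val prime = cur.next()
        primes += prime
        cur = filter(cur, prime)
    }
    return primes
}

fun main() {
    firstPrimes(10).forEach { println(it) }
}
```

Everything here runs lazily on the calling thread, which is why neither a context nor `runBlocking` is needed.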

### Fan-out

Multiple coroutines may receive from the same channel, distributing work between themselves.
Let us start with a producer coroutine that is periodically producing integers
(ten numbers per second):

```kotlin
fun produceNumbers() = produce<Int>(CommonPool) {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
```

Then we can have several processor coroutines. In this example, they just print their id and
received number:

```kotlin
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
    channel.consumeEach {
        println("Processor #$id received $it")
    }
}
```

Now let us launch five processors and let them work for a second. See what happens:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(1000)
    producer.cancel() // cancel producer coroutine and thus kill them all
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt)

The output will be similar to the following one, although the processor ids that receive
each specific integer may be different:

```
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
```

<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->

Note that cancelling a producer coroutine closes its channel, thus eventually terminating the iteration
over the channel that the processor coroutines are doing.

### Fan-in

Multiple coroutines may send to the same channel.
For example, let us have a channel of strings, and a suspending function that
repeatedly sends a specified string to this channel with a specified delay:

```kotlin
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

Now, let us see what happens if we launch a couple of coroutines sending strings
(in this example we launch them in the context of the main thread):

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>()
    launch(context) { sendString(channel, "foo", 200L) }
    launch(context) { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt)

The output is:

```text
foo
foo
BAR!
foo
foo
BAR!
```

<!--- TEST -->

### Buffered channels

The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked;
if receive is invoked first, it is suspended until send is invoked.

Both the [Channel()][Channel.invoke] factory function and the [produce] builder take an optional `capacity` parameter to
specify _buffer size_. A buffer allows senders to send multiple elements before suspending,
similar to a `BlockingQueue` with a specified capacity, which blocks when the buffer is full.

Take a look at the behavior of the following code:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>(4) // create buffered channel
    launch(context) { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt)

It prints "sending" _five_ times using a buffered channel with capacity of _four_:

```text
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
```

<!--- TEST -->

The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
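
The `BlockingQueue` comparison can be made concrete with a small JDK-only sketch. It is an analogy under stated
assumptions, not the channel implementation: `sendUntilFull` is a hypothetical helper, and it uses the non-blocking
`offer` call so that the example terminates instead of blocking the thread where a channel's `send` would suspend.

```kotlin
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical helper: tries to enqueue up to `attempts` elements with no consumer
// and returns the log lines written before the queue refused an element.
fun sendUntilFull(capacity: Int, attempts: Int): List<String> {
    val queue = ArrayBlockingQueue<Int>(capacity)
    val log = mutableListOf<String>()
    for (i in 0 until attempts) {
        log += "Sending $i"        // logged before the element is offered
        if (!queue.offer(i)) break // offer() returns false when the queue is full
    }
    return log
}

fun main() {
    sendUntilFull(capacity = 4, attempts = 10).forEach { println(it) }
}
```

As in the channel example, a capacity of four yields five log lines: four elements fit, and the fifth attempt is
logged before it is refused.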

### Channels are fair

Send and receive operations to channels are _fair_ with respect to the order of their invocation from
multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
gets the element. In the following example two coroutines "ping" and "pong" are
receiving the "ball" object from the shared "table" channel.

```kotlin
data class Ball(var hits: Int)

fun main(args: Array<String>) = runBlocking<Unit> {
    val table = Channel<Ball>() // a shared table
    launch(context) { player("ping", table) }
    launch(context) { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    table.receive() // game over, grab the ball
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-09.kt)

The "ping" coroutine is started first, so it is the first one to receive the ball. Even though "ping"
coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
received by the "pong" coroutine, because it was already waiting for it:

```text
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
ping Ball(hits=5)
```

<!--- TEST -->

## Shared mutable state and concurrency

Coroutines can be executed concurrently using a multi-threaded dispatcher like [CommonPool]. It presents
all the usual concurrency problems. The main problem is synchronization of access to **shared mutable state**.
Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world,
but others are unique.

### The problem

Let us launch a thousand coroutines all doing the same action a thousand times (for a total of a million executions).
We'll also measure their completion time for further comparisons:

<!--- INCLUDE .*/example-sync-([0-9]+).kt
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.system.measureTimeMillis
-->

<!--- INCLUDE .*/example-sync-03.kt
import java.util.concurrent.atomic.AtomicInteger
-->

<!--- INCLUDE .*/example-sync-06.kt
import kotlinx.coroutines.experimental.sync.Mutex
-->

<!--- INCLUDE .*/example-sync-07.kt
import kotlinx.coroutines.experimental.channels.*
-->

```kotlin
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
    val n = 1000 // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch(context) {
                repeat(k) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${n * k} actions in $time ms")
}
```

<!--- INCLUDE .*/example-sync-([0-9]+).kt -->

We start with a very simple action that increments a shared mutable variable using
the multi-threaded [CommonPool] context.

```kotlin
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-01.kt)

<!--- TEST LINES_START
Completed 1000000 actions in
Counter =
-->

What does it print at the end? It is highly unlikely to ever print "Counter = 1000000", because a thousand coroutines
increment the `counter` concurrently from multiple threads without any synchronization.

### Volatiles are of no help

There is a common misconception that making a variable `volatile` solves the concurrency problem. Let us try it:

```kotlin
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-02.kt)

<!--- TEST LINES_START
Completed 1000000 actions in
Counter =
-->

This code runs slower, but we still don't get "Counter = 1000000" at the end, because volatile variables guarantee
linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but
do not provide atomicity of larger actions (increment in our case).
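
To see why an increment is a "larger action", it helps to spell out its steps. The following single-threaded sketch
replays one unlucky interleaving by hand; the names `lostUpdate`, `readByA` and `readByB` are hypothetical and exist
only for this illustration. Each individual read and write below is atomic, yet one of the two increments is lost.

```kotlin
// Deterministic replay of a race: `counter++` is really read, add, write.
// Workers A and B both read before either one writes.
fun lostUpdate(): Int {
    var counter = 0
    val readByA = counter // A reads 0
    val readByB = counter // B reads 0, before A has written
    counter = readByA + 1 // A writes 1
    counter = readByB + 1 // B also writes 1, overwriting A's update
    return counter
}

fun main() {
    println("Counter = ${lostUpdate()}") // two increments were attempted
}
```

Marking `counter` as volatile would not change this outcome, because the problem is the gap between the read and the
write, not the visibility of either one.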

### Thread-safe data structures

The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized,
linearizable, or atomic) data structure that provides all the necessary synchronization for the corresponding
operations that need to be performed on a shared state.
In the case of a simple counter we can use the `AtomicInteger` class, which has an atomic `incrementAndGet` operation:

```kotlin
var counter = AtomicInteger()

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter.incrementAndGet()
    }
    println("Counter = ${counter.get()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-03.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other
standard data structures and basic operations on them. However, it does not easily scale to complex
state or to complex operations that do not have ready-to-use thread-safe implementations.
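
Since this solution works for threads as well as coroutines, it can be checked with plain JVM threads and no
coroutine library at all. The sketch below is such a check under that assumption; `atomicCount` is a hypothetical
helper name, and the thread and increment counts are arbitrary.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical helper: increments one shared AtomicInteger from several plain threads.
fun atomicCount(threads: Int, incrementsPerThread: Int): Int {
    val counter = AtomicInteger()
    val workers = List(threads) {
        thread { repeat(incrementsPerThread) { counter.incrementAndGet() } }
    }
    workers.forEach { it.join() } // wait for every worker to finish
    return counter.get()
}

fun main() {
    println("Counter = ${atomicCount(threads = 8, incrementsPerThread = 1000)}")
}
```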

### Thread confinement fine-grained

_Thread confinement_ is an approach to the problem of shared mutable state where all access to the particular shared
state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to
the single event-dispatch/application thread. It is easy to apply with coroutines by using a
single-threaded context:

```kotlin
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) { // run each coroutine in CommonPool
        run(counterContext) { // but confine each increment to the single-threaded context
            counter++
        }
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

This code works very slowly, because it does _fine-grained_ thread-confinement. Each individual increment switches
from the multi-threaded `CommonPool` context to the single-threaded context using the [run] block.

### Thread confinement coarse-grained

In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic
are confined to the single thread. The following example does it like that, running each coroutine in
the single-threaded context to start with.

```kotlin
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(counterContext) { // run each coroutine in the single-threaded context
        counter++
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

This now works much faster and produces the correct result.
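
The same coarse-grained idea exists outside of coroutines. As a rough analogy only, the sketch below confines a plain
counter to the single thread of a JDK executor; `confinedCount` is a hypothetical helper, and the executor stands in
for the single-threaded context used above.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical helper: every access to `counter` happens on the executor's only thread.
fun confinedCount(tasks: Int, incrementsPerTask: Int): Int {
    val executor = Executors.newSingleThreadExecutor()
    var counter = 0 // never touched from any other thread
    try {
        repeat(tasks) {
            // each task is a big chunk of work, not a single increment
            executor.execute { repeat(incrementsPerTask) { counter++ } }
        }
        // Tasks run in submission order, so this read sees all increments;
        // Future.get() hands the value over to the calling thread safely.
        return executor.submit(Callable { counter }).get()
    } finally {
        executor.shutdown()
    }
}

fun main() {
    println("Counter = ${confinedCount(tasks = 100, incrementsPerTask = 1000)}")
}
```

No locks or atomics are needed, because the unsynchronized variable is only ever read and written by one thread.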

### Mutual exclusion

The mutual exclusion solution to the problem is to protect all modifications of the shared state with a _critical section_
that is never executed concurrently. In a blocking world you'd typically use `synchronized` or `ReentrantLock` for that.
The coroutine alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to
delimit a critical section. The key difference is that `Mutex.lock` is a suspending function. It does not block a thread.

```kotlin
val mutex = Mutex()
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        mutex.lock()
        try { counter++ }
        finally { mutex.unlock() }
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-06.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

The locking in this example is fine-grained, so it pays the price. However, it is a good choice for some situations
where you absolutely must modify some shared state periodically, but there is no natural thread that this state
is confined to.
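
For comparison, here is what the blocking-world counterpart mentioned above might look like with `ReentrantLock` and
plain threads. It is a sketch for contrast, with `lockedCount` as a hypothetical helper name; unlike `Mutex.lock`,
acquiring this lock blocks the calling thread while it waits.

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread
import kotlin.concurrent.withLock

// Hypothetical helper: protects a plain counter with a blocking lock.
fun lockedCount(threads: Int, incrementsPerThread: Int): Int {
    val lock = ReentrantLock()
    var counter = 0
    val workers = List(threads) {
        thread {
            repeat(incrementsPerThread) {
                // withLock acquires the lock, runs the critical section,
                // and releases the lock in a finally block
                lock.withLock { counter++ }
            }
        }
    }
    workers.forEach { it.join() }
    return counter
}

fun main() {
    println("Counter = ${lockedCount(threads = 8, incrementsPerThread = 1000)}")
}
```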

### Actors

An actor is a combination of a coroutine, the state that is confined and is encapsulated into this coroutine,
and a channel to communicate with other coroutines. A simple actor can be written as a function,
but an actor with a complex state is better suited for a class.

There is an [actor] coroutine builder that conveniently combines the actor's mailbox channel into its
scope to receive messages from and combines the send channel into the resulting job object, so that a
single reference to the actor can be carried around as its handle.

```kotlin
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: SendChannel<Int>) : CounterMsg() // a request with reply

// This function launches a new counter actor
fun counterActor() = actor<CounterMsg>(CommonPool) {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.send(counter)
        }
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    massiveRun(CommonPool) {
        counter.send(IncCounter)
    }
    val response = Channel<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.receive()}")
    counter.close() // shutdown the actor
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-07.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

It does not matter (for correctness) what context the actor itself is executed in. An actor is
a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine
works as a solution to the problem of shared mutable state.

An actor is more efficient than locking under load, because in this case it always has work to do and it does not
have to switch to a different context at all.

> Note that the [actor] coroutine builder is a dual of the [produce] coroutine builder. An actor is associated
  with the channel that it receives messages from, while a producer is associated with the channel that it
  sends elements to.
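
The structure of an actor (confined state plus a mailbox) does not depend on coroutines. Purely as an analogy, the
sketch below builds the same counter from a thread and a JDK `LinkedBlockingQueue`. All names in it (`Msg`, `Inc`,
`Get`, `Stop`, `runCounterActor`) are hypothetical, and the explicit `Stop` message is an assumption of this sketch
that replaces closing the channel.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.LinkedBlockingQueue
import kotlin.concurrent.thread

// Hypothetical message types for the thread-based counter actor
sealed class Msg
object Inc : Msg()                                      // one-way increment
class Get(val response: CompletableFuture<Int>) : Msg() // request with reply
object Stop : Msg()                                     // asks the actor to finish

fun runCounterActor(senders: Int, messagesPerSender: Int): Int {
    val mailbox = LinkedBlockingQueue<Msg>()
    val actor = thread {
        var counter = 0 // actor state, confined to this thread
        loop@ while (true) {
            when (val msg = mailbox.take()) { // blocks until a message arrives
                is Inc -> counter++
                is Get -> msg.response.complete(counter)
                is Stop -> break@loop
            }
        }
    }
    val workers = List(senders) {
        thread { repeat(messagesPerSender) { mailbox.put(Inc) } }
    }
    workers.forEach { it.join() }
    // The mailbox is FIFO, so Get is handled after every Inc sent above.
    val response = CompletableFuture<Int>()
    mailbox.put(Get(response))
    val result = response.get()
    mailbox.put(Stop)
    actor.join()
    return result
}

fun main() {
    println("Counter = ${runCounterActor(senders = 8, messagesPerSender = 1000)}")
}
```

The coroutine version differs in that waiting for a message suspends the coroutine instead of blocking a thread.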

## Select expression

Select expression makes it possible to await multiple suspending functions simultaneously and _select_
the first one that becomes available.

<!--- INCLUDE .*/example-select-([0-9]+).kt
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*
-->

### Selecting from channels

Let us have two producers of strings: `fizz` and `buzz`. The `fizz` producer sends a "Fizz" string every 300 ms:

<!--- INCLUDE .*/example-select-01.kt
import kotlin.coroutines.experimental.CoroutineContext
-->

```kotlin
fun fizz(context: CoroutineContext) = produce<String>(context) {
    while (true) { // sends "Fizz" every 300 ms
        delay(300)
        send("Fizz")
    }
}
```

And the `buzz` producer sends a "Buzz!" string every 500 ms:

```kotlin
fun buzz(context: CoroutineContext) = produce<String>(context) {
    while (true) { // sends "Buzz!" every 500 ms
        delay(500)
        send("Buzz!")
    }
}
```

Using the [receive][ReceiveChannel.receive] suspending function we can receive _either_ from one channel or the
other. But the [select] expression allows us to receive from _both_ simultaneously using its
[onReceive][SelectBuilder.onReceive] clauses:

```kotlin
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> means that this select expression does not produce any result
        fizz.onReceive { value -> // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value -> // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}
```

Let us run it all seven times:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val fizz = fizz(context)
    val buzz = buzz(context)
    repeat(7) {
        selectFizzBuzz(fizz, buzz)
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-01.kt)

The result of this code is:

```text
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'
```

<!--- TEST -->

### Selecting on close

The [onReceive][SelectBuilder.onReceive] clause in `select` fails when the channel is closed, causing the corresponding
`select` to throw an exception. We can use the [onReceiveOrNull][SelectBuilder.onReceiveOrNull] clause to perform a
specific action when the channel is closed. The following example also shows that `select` is an expression that returns
the result of its selected clause:

```kotlin
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'a' is closed"
            else
                "a -> '$value'"
        }
        b.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'b' is closed"
            else
                "b -> '$value'"
        }
    }
```

Let's use it with the channel `a` that produces the "Hello" string four times and
the channel `b` that produces "World" four times:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // we are using the context of the main thread in this example for predictability ...
    val a = produce<String>(context) {
        repeat(4) { send("Hello $it") }
    }
    val b = produce<String>(context) {
        repeat(4) { send("World $it") }
    }
    repeat(8) { // print first eight results
        println(selectAorB(a, b))
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-02.kt)

The result of this code is quite interesting, so we'll analyze it in more detail:

```text
a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed
```

<!--- TEST -->

There are a couple of observations to make from it.

First of all, `select` is _biased_ to the first clause. When several clauses are selectable at the same time,
the first one among them gets selected. Here, both channels are constantly producing strings, so the `a` channel,
being the first clause in select, wins. However, because we are using an unbuffered channel, `a` gets suspended from
time to time on its [send][SendChannel.send] invocation and gives a chance for `b` to send, too.

The second observation is that [onReceiveOrNull][SelectBuilder.onReceiveOrNull] gets immediately selected when the
channel is already closed.

### Selecting to send

Select expression has an [onSend][SelectBuilder.onSend] clause that can be put to good use in combination
with the biased nature of selection.

Let us write an example of a producer of integers that sends its values to a `side` channel when
the consumers on its primary channel cannot keep up with it:

```kotlin
fun produceNumbers(side: SendChannel<Int>) = produce<Int>(CommonPool) {
    for (num in 1..10) { // produce 10 numbers from 1 to 10
        delay(100) // every 100 ms
        select<Unit> {
            onSend(num) {} // Send to the primary channel
            side.onSend(num) {} // or to the side channel
        }
    }
}
```

The consumer is going to be quite slow, taking 250 ms to process each number:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val side = Channel<Int>() // allocate side channel
    launch(context) { // this is a very fast consumer for the side channel
        side.consumeEach { println("Side channel has $it") }
    }
    produceNumbers(side).consumeEach {
        println("Consuming $it")
        delay(250) // let us digest the consumed number properly, do not hurry
    }
    println("Done consuming")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-03.kt)

So let us see what happens:

```text
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
```

<!--- TEST -->

### Selecting deferred values

Deferred values can be selected using the [onAwait][SelectBuilder.onAwait] clause.
Let us start with an async function that returns a deferred string value, which becomes
available after a given delay:

<!--- INCLUDE .*/example-select-04.kt
import java.util.*
-->

```kotlin
fun asyncString(time: Int) = async(CommonPool) {
    delay(time.toLong())
    "Waited for $time ms"
}
```

Let us start a dozen of them, each with a random delay:

```kotlin
fun asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return List(12) { asyncString(random.nextInt(1000)) }
}
```

Now the main function waits for the first of them to complete and counts the number of deferred values
that are still active. Note that here we rely on the fact that the `select` expression is a Kotlin DSL,
so we can provide clauses for it using arbitrary code. In this case we iterate over a list
of deferred values to provide an `onAwait` clause for each of them.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val list = asyncStringsList()
    val result = select<String> {
        list.withIndex().forEach { (index, deferred) ->
            deferred.onAwait { answer ->
                "Deferred $index produced answer '$answer'"
            }
        }
    }
    println(result)
    val countActive = list.count { it.isActive }
    println("$countActive coroutines are still active")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-04.kt)

The output is:

```text
Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active
```

<!--- TEST -->
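
A loop can contribute clauses because the lambda passed to `select` runs as ordinary code with the builder as its receiver. The following is a minimal sketch of that pattern in plain Kotlin. The names `ToySelectBuilder`, `onReady`, `toySelect` and `toySelectDemo` are hypothetical and invented for illustration, and the "smallest ready time wins" rule is only a stand-in for the real suspension logic of the library:

```kotlin
// Hypothetical toy builder; this is NOT the library's SelectBuilder.
class ToySelectBuilder<R> {
    private val clauses = mutableListOf<Pair<Long, () -> R>>()

    // Registers a clause that is assumed to become ready at the given time.
    fun onReady(readyAtMs: Long, block: () -> R) {
        clauses.add(readyAtMs to block)
    }

    // Runs the block of the clause with the smallest ready time (the first one wins ties).
    fun runEarliest(): R {
        check(clauses.isNotEmpty()) { "at least one clause is required" }
        var best = clauses[0]
        for (clause in clauses) {
            if (clause.first < best.first) best = clause
        }
        return best.second()
    }
}

fun <R> toySelect(build: ToySelectBuilder<R>.() -> Unit): R {
    val builder = ToySelectBuilder<R>()
    builder.build() // arbitrary code runs here: loops, conditions, helper calls
    return builder.runEarliest()
}

fun toySelectDemo(): String {
    val delays = listOf(700L, 128L, 450L) // made-up ready times for the demo
    return toySelect<String> {
        // clauses are registered from inside a loop, as in the example above
        delays.withIndex().forEach { (index, delay) ->
            onReady(delay) { "Clause $index was ready after $delay ms" }
        }
    }
}
```

Here `toySelectDemo()` returns the result of the clause at index 1, because 128 is the smallest of the three made-up ready times.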

### Switch over a channel of deferred values

Let us write a channel producer function that consumes a channel of deferred string values and waits for each received
deferred value, but only until the next deferred value arrives or the channel is closed. This example combines the
[onReceiveOrNull][SelectBuilder.onReceiveOrNull] and [onAwait][SelectBuilder.onAwait] clauses in the same `select`:

```kotlin
fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String>(CommonPool) {
    var current = input.receive() // start with first received deferred value
    while (isActive) { // loop while not cancelled/closed
        val next = select<Deferred<String>?> { // return next deferred value from this select or null
            input.onReceiveOrNull { update ->
                update // replaces the deferred value to wait for
            }
            current.onAwait { value ->
                send(value) // send value that current deferred has produced
                input.receiveOrNull() // and use the next deferred from the input channel
            }
        }
        if (next == null) {
            println("Channel was closed")
            break // out of loop
        } else {
            current = next
        }
    }
}
```

To test it, we'll use a simple async function that resolves to a specified string after a specified time:

```kotlin
fun asyncString(str: String, time: Long) = async(CommonPool) {
    delay(time)
    str
}
```

The main function just launches a coroutine to print the results of `switchMapDeferreds` and sends some test
data to it:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val chan = Channel<Deferred<String>>() // the channel for test
    launch(context) { // launch printing coroutine
        for (s in switchMapDeferreds(chan))
            println(s) // print each received string
    }
    chan.send(asyncString("BEGIN", 100))
    delay(200) // enough time for "BEGIN" to be produced
    chan.send(asyncString("Slow", 500))
    delay(100) // not enough time to produce "Slow"
    chan.send(asyncString("Replace", 100))
    delay(500) // give it time before the last one
    chan.send(asyncString("END", 500))
    delay(1000) // give it time to process
    chan.close() // close the channel ...
    delay(500) // and wait some time to let it finish
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-05.kt)

The result of this code:

```text
BEGIN
Replace
END
Channel was closed
```

<!--- TEST -->
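
This result can be cross-checked by replaying the test schedule by hand. The sketch below is a plain-Kotlin model with no coroutines. It assumes idealized timing and one simplified rule: a deferred value is emitted only if it completes strictly before the next deferred value arrives or the channel is closed. The `TimedDeferred` class and the `simulateSwitch` and `switchDemo` functions are hypothetical helpers, and the send times are derived from the delays in the main function above:

```kotlin
// Hypothetical model of a deferred value: when it was sent and how long it takes to complete.
data class TimedDeferred(val value: String, val sentAtMs: Long, val durationMs: Long)

fun simulateSwitch(inputs: List<TimedDeferred>, closedAtMs: Long): List<String> {
    val out = mutableListOf<String>()
    for ((i, current) in inputs.withIndex()) {
        // the current value is abandoned when the next one arrives or the channel closes
        val replacedAt = if (i + 1 < inputs.size) inputs[i + 1].sentAtMs else closedAtMs
        val completesAt = current.sentAtMs + current.durationMs
        if (completesAt < replacedAt) out += current.value
    }
    out += "Channel was closed"
    return out
}

// The schedule of the test above: sends at 0, 200, 300 and 800 ms, close at 1800 ms.
fun switchDemo(): List<String> = simulateSwitch(
    listOf(
        TimedDeferred("BEGIN", 0, 100),
        TimedDeferred("Slow", 200, 500),
        TimedDeferred("Replace", 300, 100),
        TimedDeferred("END", 800, 500)
    ),
    closedAtMs = 1800
)
```

In this model "Slow" would complete at 700 ms, but "Replace" arrives at 300 ms, so "Slow" is dropped, which agrees with the output above.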

## Further reading

* [Guide to UI programming with coroutines](ui/coroutines-guide-ui.md)
* [Guide to reactive streams with coroutines](reactive/coroutines-guide-reactive.md)
* [Coroutines design document (KEEP)](https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md)
* [Full kotlinx.coroutines API reference](http://kotlin.github.io/kotlinx.coroutines)

<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core -->
<!--- DOCS_ROOT kotlinx-coroutines-core/target/dokka/kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines.experimental -->
[launch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/launch.html
[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/delay.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run-blocking.html
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/index.html
[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-cancellation-exception.html
[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/yield.html
[CoroutineScope.isActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/is-active.html
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
[run]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run.html
[NonCancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-non-cancellable/index.html
[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
[async]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/async.html
[Deferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/index.html
[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/await.html
[Job.start]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/start.html
[CommonPool]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-common-pool/index.html
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-dispatcher/index.html
[CoroutineScope.context]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/context.html
[Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-unconfined/index.html
[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/new-coroutine-context.html
[CoroutineName]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-name/index.html
[Job.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/invoke.html
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/cancel.html
<!--- INDEX kotlinx.coroutines.experimental.sync -->
[Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/index.html
[Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/lock.html
[Mutex.unlock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/unlock.html
<!--- INDEX kotlinx.coroutines.experimental.channels -->
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html
[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/close.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/consume-each.html
[Channel.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/invoke.html
[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
<!--- INDEX kotlinx.coroutines.experimental.selects -->
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
[SelectBuilder.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-receive.html
[SelectBuilder.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-receive-or-null.html
[SelectBuilder.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-send.html
[SelectBuilder.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-await.html
<!--- END -->