<!--- INCLUDE .*/example-([a-z]+)-([0-9]+)\.kt
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package guide.$$1.example$$2

import kotlinx.coroutines.experimental.*
-->
<!--- KNIT kotlinx-coroutines-core/src/test/kotlin/guide/.*\.kt -->
<!--- TEST_OUT kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package guide.test

import org.junit.Test

class GuideTest {
-->

# Guide to kotlinx.coroutines by example

This is a short guide on core features of `kotlinx.coroutines` with a series of examples.

## Table of contents

<!--- TOC -->

* [Coroutine basics](#coroutine-basics)
  * [Your first coroutine](#your-first-coroutine)
  * [Bridging blocking and non-blocking worlds](#bridging-blocking-and-non-blocking-worlds)
  * [Waiting for a job](#waiting-for-a-job)
  * [Extract function refactoring](#extract-function-refactoring)
  * [Coroutines ARE light-weight](#coroutines-are-light-weight)
  * [Coroutines are like daemon threads](#coroutines-are-like-daemon-threads)
* [Cancellation and timeouts](#cancellation-and-timeouts)
  * [Cancelling coroutine execution](#cancelling-coroutine-execution)
  * [Cancellation is cooperative](#cancellation-is-cooperative)
  * [Making computation code cancellable](#making-computation-code-cancellable)
  * [Closing resources with finally](#closing-resources-with-finally)
  * [Run non-cancellable block](#run-non-cancellable-block)
  * [Timeout](#timeout)
* [Composing suspending functions](#composing-suspending-functions)
  * [Sequential by default](#sequential-by-default)
  * [Concurrent using async](#concurrent-using-async)
  * [Lazily started async](#lazily-started-async)
  * [Async-style functions](#async-style-functions)
* [Coroutine context and dispatchers](#coroutine-context-and-dispatchers)
  * [Dispatchers and threads](#dispatchers-and-threads)
  * [Unconfined vs confined dispatcher](#unconfined-vs-confined-dispatcher)
  * [Debugging coroutines and threads](#debugging-coroutines-and-threads)
  * [Jumping between threads](#jumping-between-threads)
  * [Job in the context](#job-in-the-context)
  * [Children of a coroutine](#children-of-a-coroutine)
  * [Combining contexts](#combining-contexts)
  * [Naming coroutines for debugging](#naming-coroutines-for-debugging)
  * [Cancellation via explicit job](#cancellation-via-explicit-job)
* [Channels](#channels)
  * [Channel basics](#channel-basics)
  * [Closing and iteration over channels](#closing-and-iteration-over-channels)
  * [Building channel producers](#building-channel-producers)
  * [Pipelines](#pipelines)
  * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
  * [Fan-out](#fan-out)
  * [Fan-in](#fan-in)
  * [Buffered channels](#buffered-channels)
* [Shared mutable state and concurrency](#shared-mutable-state-and-concurrency)
  * [The problem](#the-problem)
  * [Volatiles are of no help](#volatiles-are-of-no-help)
  * [Thread-safe data structures](#thread-safe-data-structures)
  * [Thread confinement fine-grained](#thread-confinement-fine-grained)
  * [Thread confinement coarse-grained](#thread-confinement-coarse-grained)
  * [Mutual exclusion](#mutual-exclusion)
  * [Actors](#actors)
* [Select expression](#select-expression)
  * [Selecting from channels](#selecting-from-channels)
  * [Selecting on close](#selecting-on-close)
  * [Selecting to send](#selecting-to-send)
  * [Selecting deferred values](#selecting-deferred-values)
  * [Switch over a channel of deferred values](#switch-over-a-channel-of-deferred-values)

<!--- END_TOC -->

## Coroutine basics

This section covers basic coroutine concepts.

### Your first coroutine

Run the following code:

```kotlin
fun main(args: Array<String>) {
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main function continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-01.kt)

Running it produces the following output:

```text
Hello,
World!
```

<!--- TEST -->

Essentially, coroutines are light-weight threads.
They are launched with the [launch] _coroutine builder_.
You can achieve the same result by replacing
`launch(CommonPool) { ... }` with `thread { ... }` and `delay(...)` with `Thread.sleep(...)`. Try it.

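The substitution described above can be sketched as follows. This is only an illustration of the thread-based variant, assuming the `thread` function from the Kotlin standard library (`kotlin.concurrent`); it does not use `kotlinx.coroutines` at all.

```kotlin
import kotlin.concurrent.thread

fun main(args: Array<String>) {
    thread { // start a new thread instead of a coroutine
        Thread.sleep(1000L) // blocking sleep for 1 second
        println("World!") // print after the sleep
    }
    println("Hello,") // main thread continues while the other thread sleeps
    Thread.sleep(2000L) // kept from the original example; blocks the main thread for 2 seconds
}
```

Unlike `delay`, `Thread.sleep` blocks the thread that calls it, which is why it can be called from ordinary, non-suspending code.
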
If you start by replacing `launch(CommonPool)` with `thread`, the compiler produces the following error:

```
Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
```

That is because [delay] is a special _suspending function_ that does not block a thread, but _suspends_
the coroutine, and it can only be used from a coroutine.

### Bridging blocking and non-blocking worlds

The first example mixes _non-blocking_ `delay(...)` and _blocking_ `Thread.sleep(...)` in the body
of the same `main` function. It is easy to get lost. Let's cleanly separate blocking and non-blocking
worlds by using [runBlocking]:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L)
        println("World!")
    }
    println("Hello,") // main coroutine continues while child is delayed
    delay(2000L) // non-blocking delay for 2 seconds to keep JVM alive
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-02.kt)

<!--- TEST
Hello,
World!
-->

The result is the same, but this code uses only non-blocking [delay].

`runBlocking { ... }` works as an adaptor that is used here to start the top-level main coroutine.
The regular code outside of `runBlocking` _blocks_ until the coroutine inside `runBlocking` completes.

This is also a way to write unit tests for suspending functions:

```kotlin
class MyTest {
    @Test
    fun testMySuspendingFunction() = runBlocking<Unit> {
        // here we can use suspending functions using any assertion style that we like
    }
}
```

<!--- CLEAR -->

### Waiting for a job

Delaying for a time while another coroutine is working is not a good approach. Let's explicitly
wait (in a non-blocking way) until the background [Job] that we have launched is complete:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { // create new coroutine and keep a reference to its Job
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join() // wait until child coroutine completes
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-03.kt)

<!--- TEST
Hello,
World!
-->

Now the result is still the same, but the code of the main coroutine is not tied to the duration of
the background job in any way. Much better.

### Extract function refactoring

Let's extract the block of code inside `launch(CommonPool) { ... }` into a separate function. When you
perform "Extract function" refactoring on this code you get a new function with the `suspend` modifier.
That is your first _suspending function_. Suspending functions can be used inside coroutines
just like regular functions, but their additional feature is that they can, in turn,
use other suspending functions, like `delay` in this example, to _suspend_ execution of a coroutine.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { doWorld() }
    println("Hello,")
    job.join()
}

// this is your first suspending function
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-04.kt)

<!--- TEST
Hello,
World!
-->

### Coroutines ARE light-weight

Run the following code:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = List(100_000) { // create a lot of coroutines and list their jobs
        launch(CommonPool) {
            delay(1000L)
            print(".")
        }
    }
    jobs.forEach { it.join() } // wait for all jobs to complete
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-05.kt)

<!--- TEST lines.size == 1 && lines[0] == ".".repeat(100_000) -->

It starts 100K coroutines and, after a second, each coroutine prints a dot.
Now, try that with threads. What would happen? (Most likely your code will produce some sort of out-of-memory error.)

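For comparison, a thread-based version of the same experiment might look like the sketch below. It assumes only `thread` from the Kotlin standard library; `dotsWithThreads` is a hypothetical helper name introduced here for illustration, not part of any library.

```kotlin
import kotlin.concurrent.thread

// Hypothetical helper: starts `count` threads that each sleep for a second and
// print a dot, waits for all of them, and returns how many threads were started.
fun dotsWithThreads(count: Int): Int {
    val threads = List(count) {
        thread {
            Thread.sleep(1000L) // blocking sleep, one dedicated thread per dot
            print(".")
        }
    }
    threads.forEach { it.join() } // wait for all threads to complete
    return threads.size
}

fun main(args: Array<String>) {
    // With 100K threads this may fail with an OutOfMemoryError,
    // depending on the JVM settings and operating system limits.
    dotsWithThreads(100_000)
}
```

Whether and where this fails depends on the platform; try smaller values of `count` first to see how far your machine gets.
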
### Coroutines are like daemon threads

The following code launches a long-running coroutine that prints "I'm sleeping" twice a second and then
returns from the main function after some delay:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // just quit after delay
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-06.kt)

You can run and see that it prints three lines and terminates:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
```

<!--- TEST -->

Active coroutines do not keep the process alive. They are like daemon threads.

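To make the daemon-thread analogy concrete, here is a rough thread-based counterpart of the example above. It is a sketch that assumes the `isDaemon` parameter of the standard library `thread` function; `startSleeper` is a hypothetical name used only for this illustration.

```kotlin
import kotlin.concurrent.thread

// Hypothetical helper: starts a daemon thread that prints twice a second.
fun startSleeper(): Thread = thread(isDaemon = true) {
    repeat(1000) { i ->
        println("I'm sleeping $i ...")
        Thread.sleep(500L)
    }
}

fun main(args: Array<String>) {
    startSleeper()
    Thread.sleep(1300L) // just quit after a delay; the daemon thread does not keep the JVM alive
}
```

If the thread were started without `isDaemon = true`, the JVM would wait for it to finish all 1000 iterations before exiting.
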
## Cancellation and timeouts

This section covers coroutine cancellation and timeouts.

### Cancelling coroutine execution

In a small application, returning from the `main` method might sound like a good way to get all coroutines
implicitly terminated. In a larger, long-running application, you need finer-grained control.
The [launch] function returns a [Job] that can be used to cancel a running coroutine:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-01.kt)

It produces the following output:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
```

<!--- TEST -->

As soon as main invokes `job.cancel`, we don't see any output from the other coroutine because it was cancelled.

### Cancellation is cooperative

Coroutine cancellation is _cooperative_. Coroutine code has to cooperate to be cancellable.
All the suspending functions in `kotlinx.coroutines` are _cancellable_. They check for cancellation of
the coroutine and throw [CancellationException] when cancelled. However, if a coroutine is working in
a computation and does not check for cancellation, then it cannot be cancelled, as the following
example shows:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (i < 10) { // computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-02.kt)

Run it to see that it continues to print "I'm sleeping" even after cancellation.

<!--- TEST
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm sleeping 3 ...
I'm sleeping 4 ...
I'm sleeping 5 ...
main: Now I can quit.
-->

### Making computation code cancellable

There are two approaches to making computation code cancellable. The first one is to periodically
invoke a suspending function. There is a [yield] function that is a good choice for that purpose.
The other one is to explicitly check the cancellation status. Let us try the latter approach.

Replace `while (i < 10)` in the previous example with `while (isActive)` and rerun it.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (isActive) { // cancellable computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-03.kt)

As you can see, now this loop can be cancelled. [isActive][CoroutineScope.isActive] is a property that is available inside
the code of coroutines via the [CoroutineScope] object.

<!--- TEST
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
-->

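The first approach mentioned above, periodically invoking a suspending function, could look roughly like this. It is a sketch written against the same experimental API as the rest of this guide, and it assumes that [yield] resumes with [CancellationException] once the job has been cancelled, as the other cancellable suspending functions do.

```kotlin
import kotlinx.coroutines.experimental.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (i < 10) { // same computation loop as in the non-cancellable example
            yield() // suspension point: gives cancellation a chance to be noticed
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled
    println("main: Now I can quit.")
}
```

Calling `yield` on every iteration keeps the sketch short; in a real computation loop you would probably call it less often to limit the overhead.
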
### Closing resources with finally

Cancellable suspending functions throw [CancellationException] on cancellation, which can be handled in
all the usual ways. For example, the `try {...} finally {...}` expression and the Kotlin `use` function execute their
finalization actions normally when a coroutine is cancelled:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("I'm running finally")
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-04.kt)

The example above produces the following output:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.
```

<!--- TEST -->

### Run non-cancellable block

Any attempt to use a suspending function in the `finally` block of the previous example will cause
a [CancellationException], because the coroutine running this code is cancelled. Usually, this is not a
problem, since all well-behaving closing operations (closing a file, cancelling a job, or closing any kind of
communication channel) are usually non-blocking and do not involve any suspending functions. However, in the
rare case when you need to suspend in the cancelled coroutine, you can wrap the corresponding code in
`run(NonCancellable) {...}` using the [run] function and the [NonCancellable] context, as the following example shows:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            run(NonCancellable) {
                println("I'm running finally")
                delay(1000L)
                println("And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-05.kt)

<!--- TEST
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
And I've just delayed for 1 sec because I'm non-cancellable
main: Now I can quit.
-->

### Timeout

The most obvious reason to cancel coroutine execution in practice
is that its execution time has exceeded some timeout.
While you can manually track the reference to the corresponding [Job] and launch a separate coroutine to cancel
the tracked one after a delay, there is a ready-to-use [withTimeout] function that does it.
Look at the following example:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-06.kt)

It produces the following output:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" java.util.concurrent.CancellationException: Timed out waiting for 1300 MILLISECONDS
```

<!--- TEST STARTS_WITH -->

We have not seen the [CancellationException] stack trace printed on the console before. That is because
inside a cancelled coroutine `CancellationException` is considered to be a normal reason for coroutine completion.
However, in this example we have used `withTimeout` right inside the `main` function.

Because cancellation is just an exception, all the resources will be closed in the usual way.
You can wrap the code with timeout in a `try {...} catch (e: CancellationException) {...}` block if
you need to do some additional action specifically on timeout.

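The `try`/`catch` wrapping described above might be sketched like this. The example assumes the same experimental API as the rest of this guide and imports `java.util.concurrent.CancellationException`, the exception type shown in the output above; `sleepWithTimeout` is a hypothetical helper name used only for illustration.

```kotlin
import kotlinx.coroutines.experimental.*
import java.util.concurrent.CancellationException

// Hypothetical helper: returns true if the block below was cut short by the timeout.
suspend fun sleepWithTimeout(): Boolean =
    try {
        withTimeout(1300L) {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
        false // the block completed within the timeout
    } catch (e: CancellationException) {
        println("Timed out: ${e.message}") // additional action specific to the timeout
        true
    }

fun main(args: Array<String>) = runBlocking<Unit> {
    println("timed out = ${sleepWithTimeout()}")
}
```

Keep in mind that this `catch` clause also intercepts cancellation that comes from outside the timeout, so the handler should be safe to run in both cases.
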
## Composing suspending functions

This section covers various approaches to composition of suspending functions.

### Sequential by default

Assume that we have two suspending functions defined elsewhere that do something useful, like some kind of
remote service call or computation. We just pretend they are useful, but actually each one just
delays for a second for the purpose of this example:

<!--- INCLUDE .*/example-compose-([0-9]+).kt
import kotlin.system.measureTimeMillis
-->

```kotlin
suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}
```

<!--- INCLUDE .*/example-compose-([0-9]+).kt -->

What do we do if we need to invoke them _sequentially_ -- first `doSomethingUsefulOne` _and then_
`doSomethingUsefulTwo` and compute the sum of their results?
In practice we do this if we use the results of the first function to make a decision on whether we need
to invoke the second one or to decide on how to invoke it.

We just use a normal sequential invocation, because the code in the coroutine, just like in the regular
code, is _sequential_ by default. The following example demonstrates it by measuring the total
time it takes to execute both suspending functions:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-01.kt)

It produces something like this:

```text
The answer is 42
Completed in 2017 ms
```

<!--- TEST FLEXIBLE_TIME -->

### Concurrent using async

What if there are no dependencies between invocation of `doSomethingUsefulOne` and `doSomethingUsefulTwo` and
we want to get the answer faster, by doing both _concurrently_? This is where [async] comes to help.

Conceptually, [async] is just like [launch]. It starts a separate coroutine which is a light-weight thread
that works concurrently with all the other coroutines. The difference is that `launch` returns a [Job] and
does not carry any resulting value, while `async` returns a [Deferred] -- a light-weight non-blocking future
that represents a promise to provide a result later. You can use `.await()` on a deferred value to get its eventual result,
but `Deferred` is also a `Job`, so you can cancel it if needed.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool) { doSomethingUsefulOne() }
        val two = async(CommonPool) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-02.kt)

It produces something like this:

```text
The answer is 42
Completed in 1017 ms
```

<!--- TEST FLEXIBLE_TIME -->

This is twice as fast, because we have concurrent execution of two coroutines.
Note that concurrency with coroutines is always explicit.

Roman Elizarov32d95322017-02-09 15:57:31 +0300640### Lazily started async
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300641
Roman Elizarov419a6c82017-02-09 18:36:22 +0300642There is a laziness option to [async] with `start = false` parameter.
643It starts coroutine only when its result is needed by some
644[await][Deferred.await] or if a [start][Job.start] function
Roman Elizarov32d95322017-02-09 15:57:31 +0300645is invoked. Run the following example that differs from the previous one only by this option:
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300646
647```kotlin
648fun main(args: Array<String>) = runBlocking<Unit> {
649 val time = measureTimeMillis {
Roman Elizarov32d95322017-02-09 15:57:31 +0300650 val one = async(CommonPool, start = false) { doSomethingUsefulOne() }
651 val two = async(CommonPool, start = false) { doSomethingUsefulTwo() }
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300652 println("The answer is ${one.await() + two.await()}")
653 }
654 println("Completed in $time ms")
655}
656```
657
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300658> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-03.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300659
660It produces something like this:
661
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300662```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300663The answer is 42
664Completed in 2017 ms
665```
666
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300667<!--- TEST FLEXIBLE_TIME -->
668
Roman Elizarov32d95322017-02-09 15:57:31 +0300669So, we are back to sequential execution, because we _first_ start and await for `one`, _and then_ start and await
670for `two`. It is not the intended use-case for laziness. It is designed as a replacement for
671the standard `lazy` function in cases when computation of the value involves suspending functions.
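
When lazy start and concurrency are both wanted, each deferred value can be started explicitly with
[start][Job.start] before the first `await`. The following is a minimal sketch, not one of the guide's
generated examples. `doSomethingUsefulOne` and `doSomethingUsefulTwo` are stand-ins that are assumed
to delay for one second and return 13 and 29:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlin.system.measureTimeMillis

// Stand-ins for the guide's functions (assumption: one second of "work" each)
suspend fun doSomethingUsefulOne(): Int {
    delay(1000L)
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L)
    return 29
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool, start = false) { doSomethingUsefulOne() }
        val two = async(CommonPool, start = false) { doSomethingUsefulTwo() }
        // start both coroutines before awaiting either of them
        one.start()
        two.start()
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

Under these assumptions the total time should be close to one second rather than two, because both
coroutines are already running by the time the first `await` suspends.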

### Async-style functions

We can define async-style functions that invoke `doSomethingUsefulOne` and `doSomethingUsefulTwo`
_asynchronously_ using the [async] coroutine builder. It is good style to name such functions with
either an "async" prefix or an "Async" suffix to highlight the fact that they only start an asynchronous
computation and one needs to use the resulting deferred value to get the result.

```kotlin
// The result type of asyncSomethingUsefulOne is Deferred<Int>
fun asyncSomethingUsefulOne() = async(CommonPool) {
    doSomethingUsefulOne()
}

// The result type of asyncSomethingUsefulTwo is Deferred<Int>
fun asyncSomethingUsefulTwo() = async(CommonPool) {
    doSomethingUsefulTwo()
}
```

Note that these `asyncXXX` functions are **not** _suspending_ functions. They can be used from anywhere.
However, their use always implies asynchronous (here meaning _concurrent_) execution of their action
with the invoking code.

The following example shows their use outside of a coroutine:

```kotlin
// note that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
    val time = measureTimeMillis {
        // we can initiate async actions outside of a coroutine
        val one = asyncSomethingUsefulOne()
        val two = asyncSomethingUsefulTwo()
        // but waiting for a result must involve either suspending or blocking.
        // here we use `runBlocking { ... }` to block the main thread while waiting for the result
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-04.kt)

<!--- TEST FLEXIBLE_TIME
The answer is 42
Completed in 1085 ms
-->

## Coroutine context and dispatchers

We've already seen `launch(CommonPool) {...}`, `async(CommonPool) {...}`, `run(NonCancellable) {...}`, etc.
In these code snippets [CommonPool] and [NonCancellable] are _coroutine contexts_.
This section covers other available choices.

### Dispatchers and threads

A coroutine context includes a [_coroutine dispatcher_][CoroutineDispatcher] which determines what thread or threads
the corresponding coroutine uses for its execution. A coroutine dispatcher can confine coroutine execution
to a specific thread, dispatch it to a thread pool, or let it run unconfined. Try the following example:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println(" 'context': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
        println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-01.kt)

It produces the following output (possibly in a different order):

```text
 'Unconfined': I'm working in thread main
 'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
 'newSTC': I'm working in thread MyOwnThread
 'context': I'm working in thread main
```

<!--- TEST LINES_START_UNORDERED -->

The difference between the parent [context][CoroutineScope.context] and the [Unconfined] context will be shown later.

### Unconfined vs confined dispatcher

The [Unconfined] coroutine dispatcher starts a coroutine in the caller thread, but only until the
first suspension point. After suspension it resumes in the thread that is fully determined by the
suspending function that was invoked. The unconfined dispatcher is appropriate when the coroutine neither
consumes CPU time nor updates any shared data (like UI) that is confined to a specific thread.

On the other hand, the [context][CoroutineScope.context] property, which is available inside the block of any coroutine
via the [CoroutineScope] interface, is a reference to the context of this particular coroutine.
This way, a parent context can be inherited. The default context of [runBlocking], in particular,
is confined to the invoker thread, so inheriting it has the effect of confining execution to
this thread with predictable FIFO scheduling.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println(" 'context': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println(" 'context': After delay in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-02.kt)

It produces the following output:

```text
 'Unconfined': I'm working in thread main
 'context': I'm working in thread main
 'Unconfined': After delay in thread kotlinx.coroutines.ScheduledExecutor
 'context': After delay in thread main
```

<!--- TEST LINES_START -->

So, the coroutine that inherited the `context` of `runBlocking {...}` continues to execute in the `main` thread,
while the unconfined one resumed in the scheduler thread that the [delay] function is using.

### Debugging coroutines and threads

Coroutines can suspend on one thread and resume on another thread with the [Unconfined] dispatcher or
with a multi-threaded dispatcher like [CommonPool]. Even with a single-threaded dispatcher it might be hard to
figure out what coroutine was doing what, where, and when. The common approach to debugging applications with
threads is to print the thread name in the log file on each log statement. This feature is universally supported
by logging frameworks. When using coroutines, the thread name alone does not give much context, so
`kotlinx.coroutines` includes debugging facilities to make it easier.

Run the following code with the `-Dkotlinx.coroutines.debug` JVM option:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking<Unit> {
    val a = async(context) {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async(context) {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-03.kt)

There are three coroutines: the main coroutine (#1), which is the `runBlocking` one,
and two coroutines computing deferred values `a` (#2) and `b` (#3).
They are all executing in the context of `runBlocking` and are confined to the main thread.
The output of this code is:

```text
[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42
```

<!--- TEST -->

The `log` function prints the name of the thread in square brackets, and you can see that it is the `main`
thread, but the identifier of the currently executing coroutine is appended to it. This identifier
is consecutively assigned to all created coroutines when debugging mode is turned on.

You can read more about debugging facilities in the documentation for the [newCoroutineContext] function.

### Jumping between threads

Run the following code with the `-Dkotlinx.coroutines.debug` JVM option:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) {
    val ctx1 = newSingleThreadContext("Ctx1")
    val ctx2 = newSingleThreadContext("Ctx2")
    runBlocking(ctx1) {
        log("Started in ctx1")
        run(ctx2) {
            log("Working in ctx2")
        }
        log("Back to ctx1")
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt)

It demonstrates two new techniques. One is using [runBlocking] with an explicitly specified context, and
the second one is using the [run] function to change the context of a coroutine while still staying in the
same coroutine, as you can see in the output below:

```text
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
```

<!--- TEST -->

### Job in the context

The coroutine [Job] is part of its context. The coroutine can retrieve it from its own context
using the `context[Job]` expression:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    println("My job is ${context[Job]}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-05.kt)

It produces something like:

```text
My job is BlockingCoroutine{Active}@65ae6ba4
```

<!--- TEST lines.size == 1 && lines[0].startsWith("My job is BlockingCoroutine{Active}@") -->

So, [isActive][CoroutineScope.isActive] in [CoroutineScope] is just a convenient shortcut for `context[Job]!!.isActive`.
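
The equivalence can be checked directly. This is a small sketch that only uses the `context` and `isActive`
properties described above; it is not one of the guide's generated examples:

```kotlin
import kotlinx.coroutines.experimental.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = context[Job]!! // the job of this runBlocking coroutine
    // both expressions read the state of the same job
    println("isActive = $isActive, context[Job].isActive = ${job.isActive}")
}
```

Both values are expected to be `true` while the body of the coroutine is still running.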

### Children of a coroutine

When the [context][CoroutineScope.context] of a coroutine is used to launch another coroutine,
the [Job] of the new coroutine becomes
a _child_ of the parent coroutine's job. When the parent coroutine is cancelled, all its children
are recursively cancelled, too.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(CommonPool) {
        // it spawns two other jobs, one with its separate context
        val job1 = launch(CommonPool) {
            println("job1: I have my own context and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // and the other inherits the parent context
        val job2 = launch(context) {
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
        // request completes when both its sub-jobs complete:
        job1.join()
        job2.join()
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-06.kt)

The output of this code is:

```text
job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
```

<!--- TEST -->

### Combining contexts

Coroutine contexts can be combined using the `+` operator. The context on the right-hand side replaces relevant entries
of the context on the left-hand side. For example, the [Job] of the parent coroutine can be inherited, while
its dispatcher is replaced:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(context) { // use the context of `runBlocking`
        // spawns CPU-intensive child job in CommonPool !!!
        val job = launch(context + CommonPool) {
            println("job: I am a child of the request coroutine, but with a different dispatcher")
            delay(1000)
            println("job: I will not execute this line if my parent request is cancelled")
        }
        job.join() // request completes when its sub-job completes
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-07.kt)

The expected outcome of this code is:

```text
job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?
```

<!--- TEST -->
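
The replacement rule of the `+` operator can also be observed without starting any coroutine.
The sketch below is illustrative only. It assumes that `CoroutineName` exposes its `name` property and that
the dispatcher is stored under the standard `ContinuationInterceptor` key:

```kotlin
import kotlin.coroutines.experimental.ContinuationInterceptor
import kotlinx.coroutines.experimental.*

fun main(args: Array<String>) {
    // two names and one dispatcher; the name added later replaces the earlier one
    val combined = CoroutineName("first") + CommonPool + CoroutineName("second")
    println(combined[CoroutineName]?.name)                    // second
    println(combined[ContinuationInterceptor] === CommonPool) // true
}
```

Entries with different keys, such as the name and the dispatcher here, coexist in the combined context,
while entries with the same key are replaced by the right-most one.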

### Naming coroutines for debugging

Automatically assigned ids are good when coroutines log often and you just need to correlate log records
coming from the same coroutine. However, when a coroutine is tied to the processing of a specific request
or is doing some specific background task, it is better to name it explicitly for debugging purposes.
[CoroutineName] serves the same function as a thread name. It is displayed in the name of the thread that
is executing this coroutine when debugging mode is turned on.

The following example demonstrates this concept:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
    log("Started main coroutine")
    // run two background value computations
    val v1 = async(CommonPool + CoroutineName("v1coroutine")) {
        log("Computing v1")
        delay(500)
        252
    }
    val v2 = async(CommonPool + CoroutineName("v2coroutine")) {
        log("Computing v2")
        delay(1000)
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt)

The output it produces with the `-Dkotlinx.coroutines.debug` JVM option is similar to:

```text
[main @main#1] Started main coroutine
[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42
```

<!--- TEST FLEXIBLE_THREAD -->

### Cancellation via explicit job

Let us put our knowledge about contexts, children and jobs together. Assume that our application has
an object with a lifecycle, but that object is not a coroutine. For example, we are writing an Android application
and launch various coroutines in the context of an Android activity to perform asynchronous operations to fetch
and update data, do animations, etc. All of these coroutines must be cancelled when the activity is destroyed
to avoid memory leaks.

We can manage the lifecycle of our coroutines by creating an instance of [Job] that is tied to
the lifecycle of our activity. A job instance is created using the [Job()][Job.invoke] factory function,
as the following example shows. We need to make sure that all the coroutines are started
with this job in their context, and then a single invocation of [Job.cancel] terminates them all.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = Job() // create a job object to manage our lifecycle
    // now launch ten coroutines for a demo, each working for a different time
    val coroutines = List(10) { i ->
        // they are all children of our job object
        launch(context + job) { // we use the context of main runBlocking thread, but with our own job object
            delay(i * 200L) // variable delay 0ms, 200ms, 400ms, ... etc
            println("Coroutine $i is done")
        }
    }
    println("Launched ${coroutines.size} coroutines")
    delay(500L) // delay for half a second
    println("Cancelling job!")
    job.cancel() // cancel our job.. !!!
    delay(1000L) // delay for more to see if our coroutines are still working
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-09.kt)

The output of this example is:

```text
Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Cancelling job!
```

<!--- TEST -->

As you can see, only the first three coroutines printed a message and the others were cancelled
by a single invocation of `job.cancel()`. So all we need to do in our hypothetical Android
application is to create a parent job object when the activity is created, use it for child coroutines,
and cancel it when the activity is destroyed.
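
The lifecycle pattern can be sketched with a plain class. `DemoActivity` and its `onCreate`/`onDestroy`
methods are hypothetical stand-ins written for this illustration; they are not real Android classes
or callbacks, and the choice of [CommonPool] as the dispatcher is an assumption:

```kotlin
import kotlinx.coroutines.experimental.*

// Hypothetical stand-in for an Android activity; not a real Android class
class DemoActivity {
    var job: Job? = null
        private set

    fun onCreate() {
        val parent = Job() // parent job tied to the lifecycle of this object
        job = parent
        repeat(10) { i ->
            // every coroutine started here gets the parent job in its context
            launch(CommonPool + parent) {
                delay(i * 200L)
                println("Coroutine $i is done")
            }
        }
    }

    fun onDestroy() {
        job?.cancel() // a single call cancels all coroutines started in onCreate
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val activity = DemoActivity()
    activity.onCreate()
    delay(500L)
    activity.onDestroy()
    delay(1000L) // coroutines that were still delayed should print nothing after cancellation
}
```

Keeping the job inside the object means that callers never have to track individual coroutines.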

## Channels

Deferred values provide a convenient way to transfer a single value between coroutines.
Channels provide a way to transfer a stream of values.

<!--- INCLUDE .*/example-channel-([0-9]+).kt
import kotlinx.coroutines.experimental.channels.*
-->

### Channel basics

A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-01.kt)

The output of this code is:

```text
1
4
9
16
25
Done!
```

<!--- TEST -->

### Closing and iteration over channels

Unlike a queue, a channel can be closed to indicate that no more elements are coming.
On the receiver side it is convenient to use a regular `for` loop to receive elements
from the channel.

Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
The iteration stops as soon as this close token is received, so there is a guarantee
that all elements sent before the close are received:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-02.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

### Building channel producers

The pattern where a coroutine is producing a sequence of elements is quite common.
This is a part of the _producer-consumer_ pattern that is often found in concurrent code.
You could abstract such a producer into a function that takes a channel as its parameter, but this goes against
the common-sense expectation that results are returned from functions.

There is a convenience coroutine builder named [produce] that makes it easy to do it right:

```kotlin
fun produceSquares() = produce<Int>(CommonPool) {
    for (x in 1..5) send(x * x)
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val squares = produceSquares()
    for (y in squares) println(y)
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

### Pipelines

A pipeline is a pattern where one coroutine produces a possibly infinite stream of values:

```kotlin
fun produceNumbers() = produce<Int>(CommonPool) {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
```

Another coroutine or coroutines consume that stream, do some processing, and produce some other results.
In the example below the numbers are just squared:

```kotlin
fun square(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
    for (x in numbers) send(x * x)
}
```

The main code starts and connects the whole pipeline:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    squares.cancel() // need to cancel these coroutines in a larger app
    numbers.cancel()
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-04.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

We don't have to cancel these coroutines in this example app, because
[coroutines are like daemon threads](#coroutines-are-like-daemon-threads),
but in a larger app we'll need to stop our pipeline if we don't need it anymore.
Alternatively, we could have run pipeline coroutines as
[children of a coroutine](#children-of-a-coroutine).
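
One way to stop a whole pipeline with a single call is sketched below. Instead of a parent coroutine it uses
an explicit parent job, as in [Cancellation via explicit job](#cancellation-via-explicit-job). The extra
`context` parameters and the `pipelineJob` and `pipelineContext` names are assumptions introduced for this illustration:

```kotlin
import kotlin.coroutines.experimental.CoroutineContext
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

fun produceNumbers(context: CoroutineContext) = produce<Int>(context) {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun square(context: CoroutineContext, numbers: ReceiveChannel<Int>) = produce<Int>(context) {
    for (x in numbers) send(x * x)
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val pipelineJob = Job() // parent job of every pipeline stage
    val pipelineContext = CommonPool + pipelineJob
    val numbers = produceNumbers(pipelineContext)
    val squares = square(pipelineContext, numbers)
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!")
    pipelineJob.cancel() // stops both stages at once
}
```

With this arrangement, adding another stage does not require another `cancel` call, as long as the stage
is started with the same context.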

### Prime numbers with pipeline

Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
of coroutines. We start with an infinite sequence of numbers. This time we introduce an
explicit context parameter, so that the caller can control where our coroutines run:

<!--- INCLUDE kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
import kotlin.coroutines.experimental.CoroutineContext
-->

```kotlin
fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
```

The following pipeline stage filters an incoming stream of numbers, removing all the numbers
that are divisible by the given prime number:

```kotlin
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
    for (x in numbers) if (x % prime != 0) send(x)
}
```

Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
and launching a new pipeline stage for each prime number found:

```
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
```

The following example prints the first ten prime numbers,
running the whole pipeline in the context of the main thread:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    var cur = numbersFrom(context, 2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(context, cur, prime)
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt)

The output of this code is:

```text
2
3
5
7
11
13
17
19
23
29
```

<!--- TEST -->

Note that you can build the same pipeline using the `buildIterator` coroutine builder from the standard library.
Replace `produce` with `buildIterator`, `send` with `yield`, `receive` with `next`,
`ReceiveChannel` with `Iterator`, and get rid of the context. You will not need `runBlocking` either.
However, the benefit of a pipeline that uses channels as shown above is that it can actually use
multiple CPU cores if you run it in the [CommonPool] context.

Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
built using `buildSequence`/`buildIterator`, because they do not allow arbitrary suspension, unlike
`produce`, which is fully asynchronous.
Roman Elizarov62500ba2017-02-09 18:55:40 +03001331
### Fan-out

Multiple coroutines may receive from the same channel, distributing work between themselves.
Let us start with a producer coroutine that is periodically producing integers
(ten numbers per second):

```kotlin
fun produceNumbers() = produce<Int>(CommonPool) {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
```

Then we can have several processor coroutines. In this example, they just print their id and
received number:

```kotlin
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
    while (true) {
        val x = channel.receive()
        println("Processor #$id received $x")
    }
}
```

Now let us launch five processors and let them work for a second. See what happens:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(1000)
    producer.cancel() // cancel producer coroutine and thus kill them all
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt)

The output will be similar to the following one, albeit the processor ids that receive
each specific integer may be different:

```
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
```

<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->

Note that cancelling a producer coroutine closes its channel, thus eventually terminating the receive loops
that the processor coroutines are running.

### Fan-in

Multiple coroutines may send to the same channel.
For example, let us have a channel of strings, and a suspending function that
repeatedly sends a specified string to this channel with a specified delay:

```kotlin
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

Now, let us see what happens if we launch a couple of coroutines sending strings
(in this example we launch them in the context of the main thread):

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>()
    launch(context) { sendString(channel, "foo", 200L) }
    launch(context) { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt)

The output is:

```text
foo
foo
BAR!
foo
foo
BAR!
```

<!--- TEST -->

### Buffered channels

The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked;
if receive is invoked first, it is suspended until send is invoked.

Both the [Channel()][Channel.invoke] factory function and the [produce] builder take an optional `capacity` parameter to
specify the _buffer size_. A buffer allows senders to send multiple elements before suspending,
similar to a `BlockingQueue` with a specified capacity, which blocks when the buffer is full.

Take a look at the behavior of the following code:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>(4) // create buffered channel
    launch(context) { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt)

It prints "sending" _five_ times using a buffered channel with capacity of _four_:

```text
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
```

<!--- TEST -->

The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
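
For comparison, the `BlockingQueue` analogy can be sketched with `java.util.concurrent.ArrayBlockingQueue`.
This is only an illustration in the blocking world, not part of the coroutine example: `offer` is used instead of
`put`, so the full buffer is reported as `false` rather than blocking the thread, and the
`acceptedWithoutBlocking` helper is hypothetical:

```kotlin
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical helper: counts how many elements fit before the queue would block.
fun acceptedWithoutBlocking(capacity: Int, attempts: Int): Int {
    val queue = ArrayBlockingQueue<Int>(capacity)
    var accepted = 0
    for (i in 0 until attempts) {
        // offer() returns false exactly where put() would block the calling thread
        if (queue.offer(i)) accepted++ else break
    }
    return accepted
}

fun main() {
    // nobody takes from the queue, just like nobody receives from the channel above
    println(acceptedWithoutBlocking(capacity = 4, attempts = 10)) // prints "4"
}
```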

## Shared mutable state and concurrency

Coroutines can be executed concurrently using a multi-threaded dispatcher like [CommonPool]. This presents
all the usual concurrency problems, the main one being synchronization of access to **shared mutable state**.
Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world,
but others are unique.

### The problem

Let us launch a thousand coroutines all doing the same action a thousand times (for a total of a million executions).
We'll also measure their completion time for further comparisons:

<!--- INCLUDE .*/example-sync-([0-9]+).kt
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.system.measureTimeMillis
-->

<!--- INCLUDE .*/example-sync-03.kt
import java.util.concurrent.atomic.AtomicInteger
-->

<!--- INCLUDE .*/example-sync-06.kt
import kotlinx.coroutines.experimental.sync.Mutex
-->

<!--- INCLUDE .*/example-sync-07.kt
import kotlinx.coroutines.experimental.channels.*
-->

```kotlin
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
    val n = 1000 // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch(context) {
                repeat(k) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${n * k} actions in $time ms")
}
```

<!--- INCLUDE .*/example-sync-([0-9]+).kt -->

We start with a very simple action that increments a shared mutable variable using the
multi-threaded [CommonPool] context.

```kotlin
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-01.kt)

<!--- TEST LINES_START
Completed 1000000 actions in
Counter =
-->

What does it print at the end? It is highly unlikely to ever print "Counter = 1000000", because a thousand coroutines
increment the `counter` concurrently from multiple threads without any synchronization.

### Volatiles are of no help

There is a common misconception that making a variable `volatile` solves the concurrency problem. Let us try it:

```kotlin
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-02.kt)

<!--- TEST LINES_START
Completed 1000000 actions in
Counter =
-->

This code works slower, but we still don't get "Counter = 1000000" at the end, because volatile variables guarantee
linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but
do not provide atomicity of larger actions (increment in our case).
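
To see why atomic reads and writes are not enough, recall that `counter++` is a read followed by a write.
The sketch below is a deterministic, single-threaded replay of one unlucky interleaving of two imaginary threads,
A and B. It is an illustration only; the `lostUpdate` helper is hypothetical, and each individual read and write
in it is atomic, just as with a volatile variable:

```kotlin
// Hypothetical helper: replays one unlucky interleaving of two `counter++` operations.
fun lostUpdate(): Int {
    var counter = 0
    val readByA = counter   // thread A reads 0
    val readByB = counter   // thread B reads 0 before A has written anything
    counter = readByA + 1   // thread A writes 1
    counter = readByB + 1   // thread B also writes 1 and overwrites A's update
    return counter
}

fun main() {
    println("Counter = ${lostUpdate()} after two increments") // prints "Counter = 1 after two increments"
}
```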

### Thread-safe data structures

The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized,
linearizable, or atomic) data structure that provides all the necessary synchronization for the corresponding
operations that need to be performed on a shared state.
In the case of a simple counter we can use the `AtomicInteger` class, which has an atomic `incrementAndGet` operation:

```kotlin
var counter = AtomicInteger()

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter.incrementAndGet()
    }
    println("Counter = ${counter.get()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-03.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other
standard data structures and basic operations on them. However, it does not easily scale to complex
state or to complex operations that do not have ready-to-use thread-safe implementations.
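
Some slightly larger operations are still covered by ready-made atomic classes. As a minimal sketch using plain
threads rather than coroutines, the code below performs a check-then-increment as one atomic step with
`AtomicInteger.updateAndGet` (available since Java 8); the `cappedCount` helper is hypothetical. Once an operation
has to touch several variables at the same time, no such ready-made method exists, which is the limit
mentioned above:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical helper: increments a shared counter from several threads, but never past `limit`.
fun cappedCount(threads: Int, attemptsPerThread: Int, limit: Int): Int {
    val counter = AtomicInteger()
    val workers = List(threads) {
        thread {
            repeat(attemptsPerThread) {
                // the check and the increment form a single atomic update
                counter.updateAndGet { current -> if (current < limit) current + 1 else current }
            }
        }
    }
    workers.forEach { it.join() }
    return counter.get()
}

fun main() {
    // 8000 attempts in total, so the counter stops exactly at the limit
    println("Counter = ${cappedCount(threads = 8, attemptsPerThread = 1000, limit = 5000)}") // prints "Counter = 5000"
}
```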

### Thread confinement fine-grained

_Thread confinement_ is an approach to the problem of shared mutable state where all access to the particular shared
state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to
the single event-dispatch/application thread. It is easy to apply with coroutines by using a
single-threaded context:

```kotlin
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) { // run each coroutine in CommonPool
        run(counterContext) { // but confine each increment to the single-threaded context
            counter++
        }
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

This code works very slowly, because it does _fine-grained_ thread-confinement. Each individual increment switches
from the multi-threaded `CommonPool` context to the single-threaded context using the [run] block.

### Thread confinement coarse-grained

In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic
are confined to the single thread. The following example does it like that, running each coroutine in
the single-threaded context to start with.

```kotlin
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(counterContext) { // run each coroutine in the single-threaded context
        counter++
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

This now works much faster and produces a correct result.

### Mutual exclusion

The mutual exclusion solution to the problem is to protect all modifications of the shared state with a _critical section_
that is never executed concurrently. In a blocking world you'd typically use `synchronized` or `ReentrantLock` for that.
The coroutine alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to
delimit a critical section. The key difference is that `Mutex.lock` is a suspending function. It does not block a thread.

```kotlin
val mutex = Mutex()
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        mutex.lock()
        try { counter++ }
        finally { mutex.unlock() }
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-06.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

The locking in this example is fine-grained, so it pays the price. However, it is a good choice for some situations
where you absolutely must modify some shared state periodically, but there is no natural thread that this state
is confined to.
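
For comparison, here is a minimal sketch of the blocking-world counterpart mentioned above, using plain threads
and `ReentrantLock` with the standard library's `withLock` extension. It is not part of the coroutine example, and
the `countWithLock` helper is hypothetical. Unlike `Mutex.lock`, acquiring this lock blocks the calling thread
while another thread holds it:

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread
import kotlin.concurrent.withLock

// Hypothetical helper: several threads increment one counter inside a critical section.
fun countWithLock(threads: Int, perThread: Int): Int {
    val lock = ReentrantLock()
    var counter = 0
    val workers = List(threads) {
        thread {
            repeat(perThread) {
                // withLock() locks, runs the block, and unlocks in a finally clause
                lock.withLock { counter++ }
            }
        }
    }
    workers.forEach { it.join() }
    return counter
}

fun main() {
    println("Counter = ${countWithLock(threads = 4, perThread = 10_000)}") // prints "Counter = 40000"
}
```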

### Actors

An actor is a combination of a coroutine, the state that is confined and encapsulated in this coroutine,
and a channel to communicate with other coroutines. A simple actor can be written as a function,
but an actor with a complex state is better suited for a class.

There is an [actor] coroutine builder that conveniently combines the actor's mailbox channel into its
scope to receive messages from and combines the send channel into the resulting job object, so that a
single reference to the actor can be carried around as its handle.

```kotlin
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: SendChannel<Int>) : CounterMsg() // a request with reply

// This function launches a new counter actor
fun counterActor() = actor<CounterMsg>(CommonPool) {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.send(counter)
        }
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    massiveRun(CommonPool) {
        counter.send(IncCounter)
    }
    val response = Channel<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.receive()}")
    counter.close() // shutdown the actor
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-07.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

It does not matter (for correctness) what context the actor itself is executed in. An actor is
a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine
works as a solution to the problem of shared mutable state.

An actor is more efficient than locking under load, because in this case it always has work to do and it does not
have to switch to a different context at all.

> Note that the [actor] coroutine builder is a dual of the [produce] coroutine builder. An actor is associated
  with the channel that it receives messages from, while a producer is associated with the channel that it
  sends elements to.
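
The idea behind an actor (state confined to one sequential consumer, with all access going through messages) does
not depend on coroutines. As a rough analogue in the blocking world, and not the `actor` builder itself, the sketch
below uses a dedicated thread with a `LinkedBlockingQueue` as its mailbox. All names in it (`Msg`, `Inc`, `Get`,
`Stop`, `CounterThreadActor`, `countViaActor`) are hypothetical:

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.LinkedBlockingQueue
import kotlin.concurrent.thread

// Hypothetical message types for the thread-based counter
sealed class Msg
object Inc : Msg()                                     // one-way message to increment counter
class Get(val reply: CompletableFuture<Int>) : Msg()   // a request with reply
object Stop : Msg()                                    // asks the worker thread to finish

class CounterThreadActor {
    private val mailbox = LinkedBlockingQueue<Msg>()
    private val worker = thread {
        var counter = 0 // state is touched by this thread only
        loop@ while (true) {
            when (val msg = mailbox.take()) { // blocks until a message arrives
                is Inc -> counter++
                is Get -> msg.reply.complete(counter)
                is Stop -> break@loop
            }
        }
    }

    fun send(msg: Msg) = mailbox.put(msg)
    fun join() = worker.join()
}

// Sends increments from several threads, then asks the actor for the total.
fun countViaActor(senders: Int, perSender: Int): Int {
    val actor = CounterThreadActor()
    val workers = List(senders) { thread { repeat(perSender) { actor.send(Inc) } } }
    workers.forEach { it.join() }
    val reply = CompletableFuture<Int>()
    actor.send(Get(reply))
    val result = reply.get()
    actor.send(Stop)
    actor.join()
    return result
}

fun main() {
    println("Counter = ${countViaActor(senders = 4, perSender = 1000)}") // prints "Counter = 4000"
}
```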

## Select expression

Select expression makes it possible to await multiple suspending functions simultaneously and _select_
the first one that becomes available.

<!--- INCLUDE .*/example-select-([0-9]+).kt
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*
-->

### Selecting from channels

Let us have two channels of strings `fizz` and `buzz`. The `fizz` channel produces "Fizz" string every 300 ms:

```kotlin
val fizz = produce<String>(CommonPool) { // produce using common thread pool
    while (true) {
        delay(300)
        send("Fizz")
    }
}
```

And the `buzz` channel produces "Buzz!" string every 500 ms:

```kotlin
val buzz = produce<String>(CommonPool) {
    while (true) {
        delay(500)
        send("Buzz!")
    }
}
```

Using [receive][ReceiveChannel.receive] suspending function we can receive _either_ from one channel or the
other. But [select] expression allows us to receive from _both_ simultaneously using its
[onReceive][SelectBuilder.onReceive] clauses:

```kotlin
suspend fun selectFizzBuzz() {
    select<Unit> { // <Unit> means that this select expression does not produce any result
        fizz.onReceive { value -> // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value -> // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}
```

Let us run it seven times:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    repeat(7) {
        selectFizzBuzz()
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-01.kt)

The result of this code is:

```text
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'
```

<!--- TEST -->

### Selecting on close

The [onReceive][SelectBuilder.onReceive] clause in `select` fails when the channel is closed and the corresponding
`select` throws an exception. We can use the [onReceiveOrNull][SelectBuilder.onReceiveOrNull] clause to perform a
specific action when the channel is closed. The following example also shows that `select` is an expression that returns
the result of its selected clause:

```kotlin
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'a' is closed"
            else
                "a -> '$value'"
        }
        b.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'b' is closed"
            else
                "b -> '$value'"
        }
    }
```

Let's use it with channel `a` that produces "Hello" string four times and
channel `b` that produces "World" four times:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // we are using the context of the main thread in this example for predictability ...
    val a = produce<String>(context) {
        repeat(4) { send("Hello $it") }
    }
    val b = produce<String>(context) {
        repeat(4) { send("World $it") }
    }
    repeat(8) { // print first eight results
        println(selectAorB(a, b))
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-02.kt)

The result of this code is quite interesting, so we'll analyze it in more detail:

```text
a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed
```

<!--- TEST -->

There are a couple of observations to make out of it.

First of all, `select` is _biased_ to the first clause. When several clauses are selectable at the same time,
the first one among them gets selected. Here, both channels are constantly producing strings, so the `a` channel,
being the first clause in select, wins. However, because we are using an unbuffered channel, `a` gets suspended from
time to time on its [send][SendChannel.send] invocation and gives a chance for `b` to send, too.

The second observation is that [onReceiveOrNull][SelectBuilder.onReceiveOrNull] gets immediately selected when the
channel is already closed.

### Selecting to send

Select expression has an [onSend][SelectBuilder.onSend] clause that can be put to good use in combination
with the biased nature of selection.

Let us write an example of a producer of integers that sends its values to a `side` channel when
the consumers on its primary channel cannot keep up with it:

```kotlin
fun produceNumbers(side: SendChannel<Int>) = produce<Int>(CommonPool) {
    for (num in 1..10) { // produce 10 numbers from 1 to 10
        delay(100) // every 100 ms
        select<Unit> {
            onSend(num) {} // Send to the primary channel
            side.onSend(num) {} // or to the side channel
        }
    }
}
```

The consumer is going to be quite slow, taking 250 ms to process each number:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val side = Channel<Int>() // allocate side channel
    launch(context) { // this is a very fast consumer for the side channel
        for (num in side) println("Side channel has $num")
    }
    for (num in produceNumbers(side)) {
        println("Consuming $num")
        delay(250) // let us digest the consumed number properly, do not hurry
    }
    println("Done consuming")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-03.kt)

So let us see what happens:

```text
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
```

<!--- TEST -->

### Selecting deferred values

Deferred values can be selected using the [onAwait][SelectBuilder.onAwait] clause.
Let us start with an async function that returns a deferred string value after
a random delay:

<!--- INCLUDE .*/example-select-04.kt
import java.util.*
-->

```kotlin
fun asyncString(time: Int) = async(CommonPool) {
    delay(time.toLong())
    "Waited for $time ms"
}
```

Let us start a dozen of them with a random delay.

```kotlin
fun asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return List(12) { asyncString(random.nextInt(1000)) }
}
```

Now the main function waits for the first of them to complete and counts the number of deferred values
that are still active. Note that we've used here the fact that the `select` expression is a Kotlin DSL,
so we can provide clauses for it using arbitrary code. In this case we iterate over a list
of deferred values to provide an `onAwait` clause for each deferred value.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val list = asyncStringsList()
    val result = select<String> {
        list.withIndex().forEach { (index, deferred) ->
            deferred.onAwait { answer ->
                "Deferred $index produced answer '$answer'"
            }
        }
    }
    println(result)
    val countActive = list.count { it.isActive }
    println("$countActive coroutines are still active")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-04.kt)

The output is:

```text
Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active
```

<!--- TEST -->
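
Which deferred value wins is not arbitrary here: `select` resumes with the first one that completes, which is
presumably the one that was given the shortest delay. Because the random generator is seeded, the delays can be
regenerated without any coroutines. The sketch below does just that, so its output can be compared with the
output shown above; the helper names `delaysFor` and `shortest` are hypothetical:

```kotlin
import java.util.Random

// Hypothetical helper: regenerates the delays that asyncStringsList() passes to asyncString().
fun delaysFor(seed: Long, count: Int): List<Int> {
    val random = Random(seed)
    return List(count) { random.nextInt(1000) }
}

// Hypothetical helper: index and value of the shortest delay in the list.
fun shortest(delays: List<Int>): IndexedValue<Int> =
    delays.withIndex().sortedBy { it.value }.first()

fun main() {
    // same seed and count as in asyncStringsList()
    val winner = shortest(delaysFor(seed = 3, count = 12))
    println("Shortest delay is ${winner.value} ms at index ${winner.index}")
}
```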
2014
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002015### Switch over a channel of deferred values
2016
Roman Elizarova84730b2017-02-22 11:58:50 +03002017Let us write a channel producer function that consumes a channel of deferred string values, waits for each received
2018deferred value, but only until the next deferred value comes over or the channel is closed. This example puts together
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002019[onReceiveOrNull][SelectBuilder.onReceiveOrNull] and [onAwait][SelectBuilder.onAwait] clauses in the same `select`:
2020
2021```kotlin
2022fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String>(CommonPool) {
Roman Elizarova84730b2017-02-22 11:58:50 +03002023 var current = input.receive() // start with first received deferred value
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002024 while (isActive) { // loop while not cancelled/closed
2025 val next = select<Deferred<String>?> { // return next deferred value from this select or null
2026 input.onReceiveOrNull { update ->
2027 update // replaces next value to wait
2028 }
2029 current.onAwait { value ->
2030 send(value) // send value that current deferred has produced
2031 input.receiveOrNull() // and use the next deferred from the input channel
2032 }
2033 }
2034 if (next == null) {
2035 println("Channel was closed")
2036 break // out of loop
2037 } else {
2038 current = next
2039 }
2040 }
2041}
2042```
2043
2044To test it, we'll use a simple async function that resolves to a specified string after a specified time:
2045
2046```kotlin
2047fun asyncString(str: String, time: Long) = async(CommonPool) {
2048 delay(time)
2049 str
2050}
2051```

The main function just launches a coroutine to print the results of `switchMapDeferreds` and sends some test
data to it:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val chan = Channel<Deferred<String>>() // the channel for test
    launch(context) { // launch printing coroutine
        for (s in switchMapDeferreds(chan))
            println(s) // print each received string
    }
    chan.send(asyncString("BEGIN", 100))
    delay(200) // enough time for "BEGIN" to be produced
    chan.send(asyncString("Slow", 500))
    delay(100) // not enough time to produce "Slow"
    chan.send(asyncString("Replace", 100))
    delay(500) // give it time before the last one
    chan.send(asyncString("END", 500))
    delay(1000) // give it time to process
    chan.close() // close the channel ...
    delay(500) // and wait some time to let it finish
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-05.kt)

The result of this code:

```text
BEGIN
Replace
END
Channel was closed
```

<!--- TEST -->

<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core -->
<!--- DOCS_ROOT kotlinx-coroutines-core/target/dokka/kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines.experimental -->
[launch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/launch.html
[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/delay.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run-blocking.html
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/index.html
[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-cancellation-exception.html
[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/yield.html
[CoroutineScope.isActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/is-active.html
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
[run]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run.html
[NonCancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-non-cancellable/index.html
[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
[async]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/async.html
[Deferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/index.html
[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/await.html
[Job.start]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/start.html
[CommonPool]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-common-pool/index.html
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-dispatcher/index.html
[CoroutineScope.context]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/context.html
[Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-unconfined/index.html
[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/new-coroutine-context.html
[CoroutineName]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-name/index.html
[Job.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/invoke.html
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/cancel.html
<!--- INDEX kotlinx.coroutines.experimental.sync -->
[Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/index.html
[Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/lock.html
[Mutex.unlock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/unlock.html
<!--- INDEX kotlinx.coroutines.experimental.channels -->
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/send.html
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/receive.html
[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/close.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
[Channel.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/invoke.html
[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
<!--- INDEX kotlinx.coroutines.experimental.selects -->
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
[SelectBuilder.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/on-receive.html
[SelectBuilder.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/on-receive-or-null.html
[SelectBuilder.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/on-send.html
[SelectBuilder.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/on-await.html
<!--- END -->