<!--- INCLUDE .*/example-([a-z]+)-([0-9]+)\.kt
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package guide.$$1.example$$2

import kotlinx.coroutines.experimental.*
-->
<!--- KNIT kotlinx-coroutines-core/src/test/kotlin/guide/.*\.kt -->

# Guide to kotlinx.coroutines by example

This is a short guide on core features of `kotlinx.coroutines` with a series of examples.

## Table of contents

<!--- TOC -->

* [Coroutine basics](#coroutine-basics)
  * [Your first coroutine](#your-first-coroutine)
  * [Bridging blocking and non-blocking worlds](#bridging-blocking-and-non-blocking-worlds)
  * [Waiting for a job](#waiting-for-a-job)
  * [Extract function refactoring](#extract-function-refactoring)
  * [Coroutines ARE light-weight](#coroutines-are-light-weight)
  * [Coroutines are like daemon threads](#coroutines-are-like-daemon-threads)
* [Cancellation and timeouts](#cancellation-and-timeouts)
  * [Cancelling coroutine execution](#cancelling-coroutine-execution)
  * [Cancellation is cooperative](#cancellation-is-cooperative)
  * [Making computation code cancellable](#making-computation-code-cancellable)
  * [Closing resources with finally](#closing-resources-with-finally)
  * [Run non-cancellable block](#run-non-cancellable-block)
  * [Timeout](#timeout)
* [Composing suspending functions](#composing-suspending-functions)
  * [Sequential by default](#sequential-by-default)
  * [Concurrent using async](#concurrent-using-async)
  * [Lazily started async](#lazily-started-async)
  * [Async-style functions](#async-style-functions)
* [Coroutine context and dispatchers](#coroutine-context-and-dispatchers)
  * [Dispatchers and threads](#dispatchers-and-threads)
  * [Unconfined vs confined dispatcher](#unconfined-vs-confined-dispatcher)
  * [Debugging coroutines and threads](#debugging-coroutines-and-threads)
  * [Jumping between threads](#jumping-between-threads)
  * [Job in the context](#job-in-the-context)
  * [Children of a coroutine](#children-of-a-coroutine)
  * [Combining contexts](#combining-contexts)
  * [Naming coroutines for debugging](#naming-coroutines-for-debugging)
  * [Cancellation via explicit job](#cancellation-via-explicit-job)
* [Channels](#channels)
  * [Channel basics](#channel-basics)
  * [Closing and iteration over channels](#closing-and-iteration-over-channels)
  * [Building channel producers](#building-channel-producers)
  * [Pipelines](#pipelines)
  * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
  * [Fan-out](#fan-out)
  * [Fan-in](#fan-in)
  * [Buffered channels](#buffered-channels)
* [Shared mutable state and concurrency](#shared-mutable-state-and-concurrency)
  * [The problem](#the-problem)
  * [Thread-safe data structures](#thread-safe-data-structures)
  * [Thread confinement](#thread-confinement)
  * [Mutual exclusion](#mutual-exclusion)
  * [Actors](#actors)
* [Select expression](#select-expression)
  * [Selecting from channels](#selecting-from-channels)
  * [Selecting on close](#selecting-on-close)
  * [Selecting to send](#selecting-to-send)
  * [Selecting deferred values](#selecting-deferred-values)
  * [Switch over a channel of deferred values](#switch-over-a-channel-of-deferred-values)

<!--- END_TOC -->

## Coroutine basics

This section covers basic coroutine concepts.

### Your first coroutine

Run the following code:

```kotlin
fun main(args: Array<String>) {
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main function continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-01.kt)

It produces the following output:

```
Hello,
World!
```

Essentially, coroutines are light-weight threads.
They are launched with the [launch] _coroutine builder_.
You can achieve the same result by replacing
`launch(CommonPool) { ... }` with `thread { ... }` and `delay(...)` with `Thread.sleep(...)`. Try it.

If you start by replacing `launch(CommonPool)` with `thread`, the compiler produces the following error:

```
Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
```

That is because [delay] is a special _suspending function_ that does not block a thread, but _suspends_
the coroutine, and it can only be used from a coroutine.
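
For comparison, here is a minimal sketch of the thread-based variant described above. It assumes only the Kotlin standard library (`kotlin.concurrent.thread`) and is meant as an illustration, not as part of the generated guide examples:

```kotlin
import kotlin.concurrent.thread

fun main(args: Array<String>) {
    thread { // start a regular thread instead of a coroutine
        Thread.sleep(1000L) // blocking sleep for 1 second
        println("World!") // print after the sleep
    }
    println("Hello,") // main thread continues while the other thread sleeps
    Thread.sleep(2000L) // kept for parity with the coroutine version
}
```

Note that `thread { ... }` starts a non-daemon thread by default, so the JVM would wait for it even without the final `Thread.sleep`.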

### Bridging blocking and non-blocking worlds

The first example mixes _non-blocking_ `delay(...)` and _blocking_ `Thread.sleep(...)` in the body of the same
`main` function. It is easy to get lost. Let's cleanly separate blocking and non-blocking
worlds by using [runBlocking]:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L)
        println("World!")
    }
    println("Hello,") // main coroutine continues while child is delayed
    delay(2000L) // non-blocking delay for 2 seconds to keep JVM alive
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-02.kt)

The result is the same, but this code uses only non-blocking [delay].

`runBlocking { ... }` works as an adaptor that is used here to start the top-level main coroutine.
The regular code outside of `runBlocking` _blocks_ until the coroutine inside `runBlocking` completes.

This is also a way to write unit tests for suspending functions:

```kotlin
class MyTest {
    @Test
    fun testMySuspendingFunction() = runBlocking<Unit> {
        // here we can use suspending functions using any assertion style that we like
    }
}
```

<!--- CLEAR -->

### Waiting for a job

Delaying for a time while another coroutine is working is not a good approach. Let's explicitly
wait (in a non-blocking way) until the background [Job] that we have launched is complete:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { // create new coroutine and keep a reference to its Job
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join() // wait until child coroutine completes
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-03.kt)

Now the result is still the same, but the code of the main coroutine is not tied to the duration of
the background job in any way. Much better.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300185
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300186### Extract function refactoring
Roman Elizarov7deefb82017-01-31 10:33:17 +0300187
188Let's extract the block of code inside `launch(CommonPool} { ... }` into a separate function. When you
189perform "Extract function" refactoring on this code you get a new function with `suspend` modifier.
190That is your first _suspending function_. Suspending functions can be used inside coroutines
191just like regular functions, but their additional feature is that they can, in turn,
192use other suspending functions, like `delay` in this example, to _suspend_ execution of a coroutine.
193
194```kotlin
195fun main(args: Array<String>) = runBlocking<Unit> {
196 val job = launch(CommonPool) { doWorld() }
197 println("Hello,")
198 job.join()
199}
200
201// this is your first suspending function
202suspend fun doWorld() {
203 delay(1000L)
204 println("World!")
205}
206```
207
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300208> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-04.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300209
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300210### Coroutines ARE light-weight
Roman Elizarov7deefb82017-01-31 10:33:17 +0300211
212Run the following code:
213
214```kotlin
215fun main(args: Array<String>) = runBlocking<Unit> {
216 val jobs = List(100_000) { // create a lot of coroutines and list their jobs
217 launch(CommonPool) {
218 delay(1000L)
219 print(".")
220 }
221 }
222 jobs.forEach { it.join() } // wait for all jobs to complete
223}
224```
225
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300226> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-05.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300227
228It starts 100K coroutines and, after a second, each coroutine prints a dot.
229Now, try that with threads. What would happen? (Most likely your code will produce some sort of out-of-memory error)
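
If you want to try the thread-based experiment, the following sketch may help. The function name `runWithThreads` and its `count` parameter are made up for this illustration; only the Kotlin standard library and `java.util.concurrent.atomic.AtomicInteger` are assumed:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical helper: starts `count` threads that each sleep for a second and print a dot.
// Returns the number of threads that ran to completion.
fun runWithThreads(count: Int): Int {
    val finished = AtomicInteger()
    val threads = List(count) { // create a lot of threads and list them
        thread {
            Thread.sleep(1000L) // blocking sleep, each thread holds its own stack
            print(".")
            finished.incrementAndGet()
        }
    }
    threads.forEach { it.join() } // wait for all threads to complete
    return finished.get()
}

fun main(args: Array<String>) {
    // A modest count usually works; raise it towards 100_000 to see where your system gives up.
    println("\nfinished: ${runWithThreads(1_000)}")
}
```

Whether and where this fails depends on the JVM settings and operating system limits, so treat the threshold as something to measure rather than a fixed number.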

### Coroutines are like daemon threads

The following code launches a long-running coroutine that prints "I'm sleeping" twice a second and then
returns from the main function after some delay:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // just quit after delay
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-06.kt)

You can run and see that it prints three lines and terminates:

```
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
```

Active coroutines do not keep the process alive. They are like daemon threads.

## Cancellation and timeouts

This section covers coroutine cancellation and timeouts.

### Cancelling coroutine execution

In a small application, returning from the `main` function might sound like a good way to get all coroutines
implicitly terminated. In a larger, long-running application, you need finer-grained control.
The [launch] function returns a [Job] that can be used to cancel a running coroutine:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-01.kt)

It produces the following output:

```
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
```

As soon as main invokes `job.cancel`, we don't see any output from the other coroutine because it was cancelled.

### Cancellation is cooperative

Coroutine cancellation is _cooperative_. Coroutine code has to cooperate to be cancellable.
All the suspending functions in `kotlinx.coroutines` are _cancellable_. They check for cancellation of
the coroutine and throw [CancellationException] when cancelled. However, if a coroutine is working in
a computation and does not check for cancellation, then it cannot be cancelled, as the following
example shows:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (true) { // computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-02.kt)

Run it to see that it continues to print "I'm sleeping" even after cancellation.

### Making computation code cancellable

There are two approaches to making computation code cancellable. The first one is to periodically
invoke a suspending function. There is a [yield] function that is a good choice for that purpose.
The other one is to explicitly check the cancellation status. Let us try the latter approach.

Replace `while (true)` in the previous example with `while (isActive)` and rerun it.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (isActive) { // cancellable computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-03.kt)

As you can see, now this loop can be cancelled. [isActive][CoroutineScope.isActive] is a property that is available inside
the code of coroutines via the [CoroutineScope] object.
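
The first approach, periodically invoking a suspending function, could look like the sketch below. It assumes the same `kotlinx.coroutines.experimental` version used throughout this guide and relies on [yield] being cancellable like the other suspending functions of the library:

```kotlin
import kotlinx.coroutines.experimental.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (true) { // computation loop
            yield() // suspension point: throws CancellationException once the job is cancelled
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled
    println("main: Now I can quit.")
}
```

Calling `yield()` on every iteration keeps the sketch short. In a real computation you would probably call it less often to limit the rescheduling overhead.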

### Closing resources with finally

Cancellable suspending functions throw [CancellationException] on cancellation, which can be handled in
all the usual ways. For example, the `try {...} finally {...}` expression and the Kotlin `use` function execute their
finalization actions normally when a coroutine is cancelled:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("I'm running finally")
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-04.kt)

The example above produces the following output:

```
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.
```

### Run non-cancellable block

Any attempt to use a suspending function in the `finally` block of the previous example will cause
[CancellationException], because the coroutine running this code is cancelled. Usually, this is not a
problem, since all well-behaving closing operations (closing a file, cancelling a job, or closing any kind of
communication channel) are usually non-blocking and do not involve any suspending functions. However, in the
rare case when you need to suspend in the cancelled coroutine, you can wrap the corresponding code in
`run(NonCancellable) {...}` using the [run] function and the [NonCancellable] context, as the following example shows:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            run(NonCancellable) {
                println("I'm running finally")
                delay(1000L)
                println("And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-05.kt)

### Timeout

The most obvious reason to cancel coroutine execution in practice
is that its execution time has exceeded some timeout.
While you can manually track the reference to the corresponding [Job] and launch a separate coroutine to cancel
the tracked one after a delay, there is a ready-to-use [withTimeout] function that does it.
Look at the following example:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-06.kt)

It produces the following output:

```
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" java.util.concurrent.CancellationException: Timed out waiting for 1300 MILLISECONDS
```

We have not seen the [CancellationException] stack trace printed on the console before. That is because
inside a cancelled coroutine `CancellationException` is considered a normal reason for coroutine completion.
However, in this example we have used `withTimeout` right inside the `main` function.

Because cancellation is just an exception, all the resources will be closed in the usual way.
You can wrap the code with timeout in a `try {...} catch (e: CancellationException) {...}` block if
you need to do some additional action specifically on timeout.
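
A minimal sketch of such a wrapper is shown below. It assumes the same `kotlinx.coroutines.experimental` version as the rest of the guide and imports `java.util.concurrent.CancellationException` explicitly, since that is the exception type shown in the output above. The message printed in the `catch` block is made up for the illustration:

```kotlin
import java.util.concurrent.CancellationException
import kotlinx.coroutines.experimental.*

fun main(args: Array<String>) = runBlocking<Unit> {
    try {
        withTimeout(1300L) {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
    } catch (e: CancellationException) {
        // additional action that runs only when the block above was cancelled by the timeout
        println("main: timed out, giving up")
    }
}
```

With the exception handled inside `main`, the program should finish normally instead of printing a stack trace.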

## Composing suspending functions

This section covers various approaches to composition of suspending functions.

### Sequential by default

Assume that we have two suspending functions defined elsewhere that do something useful, like some kind of
remote service call or computation. We just pretend they are useful, but actually each one just
delays for a second for the purpose of this example:

<!--- INCLUDE .*/example-compose-([0-9]+).kt
import kotlin.system.measureTimeMillis
-->

```kotlin
suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}
```

<!--- INCLUDE .*/example-compose-([0-9]+).kt -->

What do we do if we need to invoke them _sequentially_ -- first `doSomethingUsefulOne` _and then_
`doSomethingUsefulTwo` -- and compute the sum of their results?
In practice we do this if we use the result of the first function to decide whether we need
to invoke the second one or how to invoke it.

We just use a normal sequential invocation, because the code in the coroutine, just like in the regular
code, is _sequential_ by default. The following example demonstrates it by measuring the total
time it takes to execute both suspending functions:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-01.kt)

It produces something like this:

```
The answer is 42
Completed in 2017 ms
```

### Concurrent using async

What if there are no dependencies between the invocations of `doSomethingUsefulOne` and `doSomethingUsefulTwo` and
we want to get the answer faster, by doing both _concurrently_? This is where [async] comes to help.

Conceptually, [async] is just like [launch]. It starts a separate coroutine which is a light-weight thread
that works concurrently with all the other coroutines. The difference is that `launch` returns a [Job] and
does not carry any resulting value, while `async` returns a [Deferred] -- a light-weight non-blocking future
that represents a promise to provide a result later. You can use `.await()` on a deferred value to get its eventual result,
but `Deferred` is also a `Job`, so you can cancel it if needed.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool) { doSomethingUsefulOne() }
        val two = async(CommonPool) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-02.kt)

It produces something like this:

```
The answer is 42
Completed in 1017 ms
```

This is twice as fast, because we have concurrent execution of two coroutines.
Note that concurrency with coroutines is always explicit.

### Lazily started async

There is a laziness option to [async] with the `start = false` parameter.
It starts the coroutine only when its result is needed by some
[await][Deferred.await] or if a [start][Job.start] function
is invoked. Run the following example that differs from the previous one only by this option:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool, start = false) { doSomethingUsefulOne() }
        val two = async(CommonPool, start = false) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-03.kt)

It produces something like this:

```
The answer is 42
Completed in 2017 ms
```

So, we are back to sequential execution, because we _first_ start and await `one`, _and then_ start and await
`two`. It is not the intended use-case for laziness. It is designed as a replacement for
the standard `lazy` function in cases when computation of the value involves suspending functions.
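
To illustrate the role of [start][Job.start] mentioned above, here is a hedged sketch that starts both lazily created coroutines explicitly before awaiting them. It assumes the same `kotlinx.coroutines.experimental` version and the `start = false` parameter used in this section, and it repeats the two helper functions so that it is self-contained:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlin.system.measureTimeMillis

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool, start = false) { doSomethingUsefulOne() }
        val two = async(CommonPool, start = false) { doSomethingUsefulTwo() }
        one.start() // explicitly start the first lazy coroutine
        two.start() // explicitly start the second one before waiting for anything
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

Because both coroutines are already running when `await` is first called, the total time should be close to one second rather than two.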

### Async-style functions

We can define async-style functions that invoke `doSomethingUsefulOne` and `doSomethingUsefulTwo`
_asynchronously_ using the [async] coroutine builder. It is good style to name such functions with
either an "async" prefix or an "Async" suffix to highlight the fact that they only start an asynchronous
computation and one needs to use the resulting deferred value to get the result.

```kotlin
// The result type of asyncSomethingUsefulOne is Deferred<Int>
fun asyncSomethingUsefulOne() = async(CommonPool) {
    doSomethingUsefulOne()
}

// The result type of asyncSomethingUsefulTwo is Deferred<Int>
fun asyncSomethingUsefulTwo() = async(CommonPool) {
    doSomethingUsefulTwo()
}
```

Note that these `asyncXXX` functions are **not** _suspending_ functions. They can be used from anywhere.
However, their use always implies asynchronous (here meaning _concurrent_) execution of their action
with the invoking code.

The following example shows their use outside of a coroutine:

```kotlin
// note that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
    val time = measureTimeMillis {
        // we can initiate async actions outside of a coroutine
        val one = asyncSomethingUsefulOne()
        val two = asyncSomethingUsefulTwo()
        // but waiting for a result must involve either suspending or blocking.
        // here we use `runBlocking { ... }` to block the main thread while waiting for the result
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-04.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300644
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300645## Coroutine context and dispatchers
646
Roman Elizarov32d95322017-02-09 15:57:31 +0300647We've already seen `launch(CommonPool) {...}`, `async(CommonPool) {...}`, `run(NonCancellable) {...}`, etc.
Roman Elizarov419a6c82017-02-09 18:36:22 +0300648In these code snippets [CommonPool] and [NonCancellable] are _coroutine contexts_.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300649This section covers other available choices.
650
651### Dispatchers and threads
652
Roman Elizarov419a6c82017-02-09 18:36:22 +0300653Coroutine context includes a [_coroutine dispatcher_][CoroutineDispatcher] which determines what thread or threads
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300654the corresponding coroutine uses for its execution. Coroutine dispatcher can confine coroutine execution
655to a specific thread, dispatch it to a thread pool, or let it run unconfined. Try the following example:
656
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println(" 'context': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
        println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-01.kt)

It produces the following output (possibly in a different order):

```
 'Unconfined': I'm working in thread main
 'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
 'newSTC': I'm working in thread MyOwnThread
 'context': I'm working in thread main
```

The difference between the parent [context][CoroutineScope.context] and the [Unconfined] context will be shown later.

### Unconfined vs confined dispatcher

The [Unconfined] coroutine dispatcher starts a coroutine in the caller thread, but only until the
first suspension point. After suspension it resumes in the thread that is fully determined by the
suspending function that was invoked. The unconfined dispatcher is appropriate when a coroutine neither
consumes CPU time nor updates any shared data (like UI) that is confined to a specific thread.

On the other hand, the [context][CoroutineScope.context] property, which is available inside the block of any coroutine
via the [CoroutineScope] interface, is a reference to the context of this particular coroutine.
This way, a parent context can be inherited. The default context of [runBlocking], in particular,
is confined to the invoker thread, so inheriting it has the effect of confining execution to
this thread with predictable FIFO scheduling.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println(" 'context': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println(" 'context': After delay in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-contest-02.kt)

It produces the following output:

```
 'Unconfined': I'm working in thread main
 'context': I'm working in thread main
 'Unconfined': After delay in thread kotlinx.coroutines.ScheduledExecutor
 'context': After delay in thread main
```

So, the coroutine that inherited the `context` of `runBlocking {...}` continues to execute in the `main` thread,
while the unconfined one resumed in the scheduler thread that the [delay] function uses.

### Debugging coroutines and threads

Coroutines can suspend on one thread and resume on another thread with the [Unconfined] dispatcher or
with a multi-threaded dispatcher like [CommonPool]. Even with a single-threaded dispatcher it might be hard to
figure out what a coroutine was doing, where, and when. The common approach to debugging applications with
threads is to print the thread name in the log file on each log statement. This feature is universally supported
by logging frameworks. When using coroutines, the thread name alone does not give much context, so
`kotlinx.coroutines` includes debugging facilities to make it easier.

Run the following code with the `-Dkotlinx.coroutines.debug` JVM option:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking<Unit> {
    val a = async(context) {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async(context) {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-03.kt)

There are three coroutines: the main coroutine (#1), which is the `runBlocking` one,
and two coroutines computing the deferred values `a` (#2) and `b` (#3).
They all execute in the context of `runBlocking` and are confined to the main thread.
The output of this code is:

```
[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42
```

The `log` function prints the name of the thread in square brackets, and you can see that it is the `main`
thread, but the identifier of the currently executing coroutine is appended to it. This identifier
is consecutively assigned to all created coroutines when debugging mode is turned on.

You can read more about debugging facilities in the documentation for the [newCoroutineContext] function.

### Jumping between threads

Run the following code with the `-Dkotlinx.coroutines.debug` JVM option:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) {
    val ctx1 = newSingleThreadContext("Ctx1")
    val ctx2 = newSingleThreadContext("Ctx2")
    runBlocking(ctx1) {
        log("Started in ctx1")
        run(ctx2) {
            log("Working in ctx2")
        }
        log("Back to ctx1")
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt)

It demonstrates two new techniques. One is using [runBlocking] with an explicitly specified context, and
the second one is using the [run] function to change the context of a coroutine while still staying in the
same coroutine, as you can see in the output below:

```
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
```

### Job in the context

The coroutine's [Job] is part of its context. The coroutine can retrieve it from its own context
using the `context[Job]` expression:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    println("My job is ${context[Job]}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-05.kt)

It produces the following output:

```
My job is BlockingCoroutine{isActive=true}
```

So, [isActive][CoroutineScope.isActive] in [CoroutineScope] is just a convenient shortcut for `context[Job]!!.isActive`.

### Children of a coroutine

When the [context][CoroutineScope.context] of a coroutine is used to launch another coroutine,
the [Job] of the new coroutine becomes
a _child_ of the parent coroutine's job. When the parent coroutine is cancelled, all its children
are recursively cancelled, too.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(CommonPool) {
        // it spawns two other jobs, one with its separate context
        val job1 = launch(CommonPool) {
            println("job1: I have my own context and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // and the other inherits the parent context
        val job2 = launch(context) {
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
        // request completes when both its sub-jobs complete:
        job1.join()
        job2.join()
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-06.kt)

The output of this code is:

```
job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
```

### Combining contexts

Coroutine contexts can be combined using the `+` operator. The context on the right-hand side replaces relevant entries
of the context on the left-hand side. For example, a [Job] of the parent coroutine can be inherited, while
its dispatcher is replaced:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(context) { // use the context of `runBlocking`
        // spawns CPU-intensive child job in CommonPool !!!
        val job = launch(context + CommonPool) {
            println("job: I am a child of the request coroutine, but with a different dispatcher")
            delay(1000)
            println("job: I will not execute this line if my parent request is cancelled")
        }
        job.join() // request completes when its sub-job completes
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-07.kt)

The expected outcome of this code is:

```
job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?
```

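The replacement rule of the `+` operator can also be checked in isolation. The following is a minimal sketch, assuming the `kotlinx.coroutines.experimental` package used throughout this guide and assuming that [CoroutineName] exposes its value through a `name` property. The `describe` helper is hypothetical and exists only for this illustration:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlin.coroutines.experimental.CoroutineContext

// hypothetical helper: reads the CoroutineName entry of a context, if there is one
// (assumes CoroutineName has a `name` property)
fun describe(context: CoroutineContext): String? = context[CoroutineName]?.name

fun main(args: Array<String>) {
    val left = CoroutineName("left")
    val right = CoroutineName("right")
    // both sides carry an entry with the same key, so the right-hand one replaces the left-hand one
    println(describe(left + right))
    // entries with different keys (a dispatcher and a name) are kept side by side
    println(describe(CommonPool + right))
}
```

Under these assumptions both lines should print `right`: in the first case the right-hand name replaces the left-hand one, and in the second case the dispatcher and the name do not conflict.
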
### Naming coroutines for debugging

Automatically assigned ids are good when coroutines log often and you just need to correlate log records
coming from the same coroutine. However, when a coroutine is tied to the processing of a specific request
or doing some specific background task, it is better to name it explicitly for debugging purposes.
[CoroutineName] serves the same function as a thread name. It is displayed in the name of the thread that
is executing this coroutine when debugging mode is turned on.

The following example demonstrates this concept:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
    log("Started main coroutine")
    // run two background value computations
    val v1 = async(CommonPool + CoroutineName("v1coroutine")) {
        log("Computing v1")
        delay(500)
        252
    }
    val v2 = async(CommonPool + CoroutineName("v2coroutine")) {
        log("Computing v2")
        delay(1000)
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt)

The output it produces with the `-Dkotlinx.coroutines.debug` JVM option is similar to:

```
[main @main#1] Started main coroutine
[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42
```

### Cancellation via explicit job

Let us put our knowledge about contexts, children, and jobs together. Assume that our application has
an object with a lifecycle, but that object is not a coroutine. For example, we are writing an Android application
and launch various coroutines in the context of an Android activity to perform asynchronous operations to fetch
and update data, do animations, etc. All of these coroutines must be cancelled when the activity is destroyed
to avoid memory leaks.

We can manage the lifecycle of our coroutines by creating an instance of [Job] that is tied to
the lifecycle of our activity. A job instance is created using the [Job()][Job.invoke] factory function,
as the following example shows. We need to make sure that all the coroutines are started
with this job in their context, and then a single invocation of [Job.cancel] terminates them all.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = Job() // create a job object to manage our lifecycle
    // now launch ten coroutines for a demo, each working for a different time
    val coroutines = List(10) { i ->
        // they are all children of our job object
        launch(context + job) { // we use the context of main runBlocking thread, but with our own job object
            delay(i * 200L) // variable delay 0ms, 200ms, 400ms, ... etc
            println("Coroutine $i is done")
        }
    }
    println("Launched ${coroutines.size} coroutines")
    delay(500L) // delay for half a second
    println("Cancelling job!")
    job.cancel() // cancel our job.. !!!
    delay(1000L) // delay for more to see if our coroutines are still working
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-09.kt)

The output of this example is:

```
Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Cancelling job!
```

As you can see, only the first three coroutines printed a message and the others were cancelled
by a single invocation of `job.cancel()`. So all we need to do in our hypothetical Android
application is to create a parent job object when the activity is created, use it for child coroutines,
and cancel it when the activity is destroyed.

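The lifecycle idea can be sketched without any Android dependency. In the sketch below, `DemoActivity`, `startWork`, and `onDestroy` are hypothetical names that only stand in for a real activity and its callbacks. The coroutine API is assumed to be the `kotlinx.coroutines.experimental` one used in the rest of this guide:

```kotlin
import kotlinx.coroutines.experimental.*

// hypothetical stand-in for an Android activity; only the lifecycle hooks matter here
class DemoActivity {
    private val job = Job() // parent job, created together with the activity

    // every coroutine started on behalf of the activity gets the activity's job in its context
    fun startWork(id: Int) = launch(CommonPool + job) {
        delay(id * 200L)
        println("Work item $id is done")
    }

    // a single call cancels whatever is still running
    fun onDestroy() {
        job.cancel()
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val activity = DemoActivity()
    val work = List(10) { activity.startWork(it) }
    delay(500L) // let the first few work items finish
    activity.onDestroy()
    work.forEach { it.join() } // wait until every coroutine has finished or has been cancelled
    println("Still active: ${work.count { it.isActive }}")
}
```

Keeping the job private to the class means callers cannot forget to attach it: every coroutine goes through `startWork`, so `onDestroy` reaches all of them. After `onDestroy`, the last line should report that none of the coroutines is still active.
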
## Channels

Deferred values provide a convenient way to transfer a single value between coroutines.
Channels provide a way to transfer a stream of values.

<!--- INCLUDE .*/example-channel-([0-9]+).kt
import kotlinx.coroutines.experimental.channels.*
-->

### Channel basics

A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-01.kt)

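For comparison, here is a sketch of the same exchange written with a `BlockingQueue` and plain threads, using only the JDK and the Kotlin standard library. The function name `squaresViaBlockingQueue` is made up for this illustration. Both `put` and `take` block their threads while waiting, which is exactly what the suspending `send` and `receive` avoid:

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// hypothetical helper: passes `count` squares from a producer thread to the calling thread
fun squaresViaBlockingQueue(count: Int): List<Int> {
    val queue = ArrayBlockingQueue<Int>(1)
    val producer = thread {
        for (x in 1..count) queue.put(x * x) // blocks the producer thread while the queue is full
    }
    val received = List(count) { queue.take() } // blocks the calling thread while the queue is empty
    producer.join()
    return received
}

fun main(args: Array<String>) {
    println(squaresViaBlockingQueue(5)) // prints [1, 4, 9, 16, 25]
}
```
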
### Closing and iteration over channels

Unlike a queue, a channel can be closed to indicate that no more elements are coming.
On the receiver side it is convenient to use a regular `for` loop to receive elements
from the channel.

Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
The iteration stops as soon as this close token is received, so there is a guarantee
that all elements sent before the close are received:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-02.kt)

### Building channel producers

The pattern where a coroutine is producing a sequence of elements is quite common.
This is a part of the _producer-consumer_ pattern that is often found in concurrent code.
You could abstract such a producer into a function that takes a channel as its parameter, but this goes contrary
to the common-sense expectation that results must be returned from functions.

There is a convenience coroutine builder named [produce] that makes it easy to do it right:

```kotlin
fun produceSquares() = produce<Int>(CommonPool) {
    for (x in 1..5) send(x * x)
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val squares = produceSquares()
    for (y in squares) println(y)
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt)

### Pipelines

A pipeline is a pattern where one coroutine produces a possibly infinite stream of values:

```kotlin
fun produceNumbers() = produce<Int>(CommonPool) {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
```

And another coroutine or coroutines consume that stream, do some processing, and produce some other results.
In the example below, the numbers are just squared:

```kotlin
fun square(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
    for (x in numbers) send(x * x)
}
```

The main code starts and connects the whole pipeline:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    squares.cancel() // need to cancel these coroutines in a larger app
    numbers.cancel()
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-04.kt)

We don't have to cancel these coroutines in this example app, because
[coroutines are like daemon threads](#coroutines-are-like-daemon-threads),
but in a larger app we'll need to stop our pipeline if we don't need it anymore.
Alternatively, we could have run pipeline coroutines as
[children of a coroutine](#children-of-a-coroutine).

### Prime numbers with pipeline

Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
of coroutines. We start with an infinite sequence of numbers. This time we introduce an
explicit context parameter, so that the caller can control where our coroutines run:

<!--- INCLUDE kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
import kotlin.coroutines.experimental.CoroutineContext
-->

```kotlin
fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
```

The following pipeline stage filters an incoming stream of numbers, removing all the numbers
that are divisible by the given prime number:

```kotlin
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
    for (x in numbers) if (x % prime != 0) send(x)
}
```

Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
and launching a new pipeline stage for each prime number found:

```
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
```

The following example prints the first ten prime numbers,
running the whole pipeline in the context of the main thread:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    var cur = numbersFrom(context, 2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(context, cur, prime)
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt)

The output of this code is:

```
2
3
5
7
11
13
17
19
23
29
```

Note that you can build the same pipeline using the `buildIterator` coroutine builder from the standard library.
Replace `produce` with `buildIterator`, `send` with `yield`, `receive` with `next`,
`ReceiveChannel` with `Iterator`, and get rid of the context. You will not need `runBlocking` either.
However, the benefit of a pipeline that uses channels as shown above is that it can actually use
multiple CPU cores if you run it in the [CommonPool] context.

Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
built using `buildSequence`/`buildIterator`, because they do not allow arbitrary suspension, unlike
`produce`, which is fully asynchronous.

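Applying the replacements listed above gives roughly the following. This is a sketch that assumes `buildIterator` is available from the `kotlin.coroutines.experimental` package of the Kotlin 1.1 standard library. The `firstPrimes` helper is hypothetical and only collects the results so that they are easy to inspect:

```kotlin
import kotlin.coroutines.experimental.buildIterator

fun numbersFrom(start: Int): Iterator<Int> = buildIterator<Int> {
    var x = start
    while (true) yield(x++) // infinite stream of integers from start, computed lazily
}

fun filter(numbers: Iterator<Int>, prime: Int): Iterator<Int> = buildIterator<Int> {
    for (x in numbers) if (x % prime != 0) yield(x)
}

// hypothetical helper: collects the first `count` primes instead of printing them one by one
fun firstPrimes(count: Int): List<Int> {
    var cur = numbersFrom(2)
    val primes = ArrayList<Int>()
    repeat(count) {
        val prime = cur.next()     // `receive` becomes `next`
        primes.add(prime)
        cur = filter(cur, prime)   // add one more stage to the pipeline
    }
    return primes
}

fun main(args: Array<String>) {
    firstPrimes(10).forEach { println(it) } // prints the same ten primes as the channel version
}
```

Everything here runs synchronously in the calling thread: each call to `next` pulls one value through all the stages, so no context and no `runBlocking` are involved.
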
### Fan-out

Multiple coroutines may receive from the same channel, distributing work between themselves.
Let us start with a producer coroutine that is periodically producing integers
(ten numbers per second):

```kotlin
fun produceNumbers() = produce<Int>(CommonPool) {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
```

Then we can have several processor coroutines. In this example, they just print their id and
received number:

```kotlin
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
    while (true) {
        val x = channel.receive()
        println("Processor #$id received $x")
    }
}
```

Now let us launch five processors and let them work for a second. See what happens:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(1000)
    producer.cancel() // cancel producer coroutine and thus kill them all
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt)

The output will be similar to the following, although the processor ids that receive
each specific integer may be different:

```
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
```

Note that cancelling a producer coroutine closes its channel, thus eventually terminating the iteration
over the channel that the processor coroutines are doing.

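The processors in this section call `receive` in an endless loop. A processor can also iterate over the channel with a `for` loop, as shown earlier for closed channels. The following sketch assumes the same `kotlinx.coroutines.experimental` API as the rest of this guide and closes the channel explicitly with `close()`. The name `launchIteratingProcessor` is hypothetical:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

// hypothetical variant of launchProcessor that iterates over the channel
fun launchIteratingProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
    for (x in channel) {
        println("Processor #$id received $x")
    }
    println("Processor #$id is done") // reached once the channel has been closed
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    val processor = launchIteratingProcessor(0, channel)
    for (x in 1..3) channel.send(x)
    channel.close()  // no more elements are coming
    processor.join() // returns because the loop in the processor has ended
}
```
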
### Fan-in

Multiple coroutines may send to the same channel.
For example, let us have a channel of strings, and a suspending function that
repeatedly sends a specified string to this channel with a specified delay:

```kotlin
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

Now, let us see what happens if we launch a couple of coroutines sending strings
(in this example we launch them in the context of the main thread):

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>()
    launch(context) { sendString(channel, "foo", 200L) }
    launch(context) { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt)

The output is:

```
foo
foo
BAR!
foo
foo
BAR!
```

### Buffered channels

The channels shown so far had no buffer. Unbuffered channels transfer elements when the sender and receiver
meet each other (aka rendezvous). If send is invoked first, it is suspended until receive is invoked;
if receive is invoked first, it is suspended until send is invoked.

Both the [Channel()][Channel.invoke] factory function and the [produce] builder take an optional `capacity` parameter to
specify the _buffer size_. A buffer allows senders to send multiple elements before suspending,
similar to a `BlockingQueue` with a specified capacity, which blocks when the buffer is full.

Take a look at the behavior of the following code:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>(4) // create buffered channel
    launch(context) { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt)

It prints "Sending" _five_ times using a buffered channel with a capacity of _four_:

```
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
```

The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.

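The comparison with a bounded `BlockingQueue` can be made concrete with the JDK alone. This sketch uses a made-up helper name, `acceptedWithoutWaiting`, and counts how many elements a queue of a given capacity accepts before it is full. It uses the non-blocking `offer` so that the example terminates; a `put` at the same point would block the thread, just as `send` suspends the coroutine:

```kotlin
import java.util.concurrent.ArrayBlockingQueue

// hypothetical helper: counts how many elements fit before a bounded queue reports that it is full
fun acceptedWithoutWaiting(capacity: Int, attempts: Int): Int {
    val queue = ArrayBlockingQueue<Int>(capacity)
    var accepted = 0
    for (x in 0 until attempts) {
        // offer() returns false instead of blocking when the queue is full
        if (queue.offer(x)) accepted++
    }
    return accepted
}

fun main(args: Array<String>) {
    println(acceptedWithoutWaiting(4, 10)) // prints 4
}
```

The count here is four, not five, because it records accepted elements. The channel example prints its message before each `send`, so the fifth message appears just before the sender suspends.
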
## Shared mutable state and concurrency

Coroutines can be executed concurrently using a multi-threaded dispatcher like [CommonPool]. This presents
all the usual concurrency problems. The main problem is synchronization of access to **shared mutable state**.
Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world,
but others are unique.

### The problem

Let us launch 100k coroutines all doing the same action. We'll also measure their completion time for
further comparisons:

<!--- INCLUDE .*/example-sync-([0-9]+).kt
import kotlin.system.measureTimeMillis
-->

<!--- INCLUDE .*/example-sync-02.kt
import java.util.concurrent.atomic.AtomicInteger
-->

<!--- INCLUDE .*/example-sync-04.kt
import kotlinx.coroutines.experimental.sync.Mutex
-->

<!--- INCLUDE .*/example-sync-05.kt
import kotlinx.coroutines.experimental.channels.*
-->

```kotlin
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100_000
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch(CommonPool) {
                action()
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed in $time ms")
}
```

<!--- INCLUDE .*/example-sync-([0-9]+).kt -->

We start with a very simple action that increments a shared mutable variable.

1384```kotlin
1385var counter = 0
1386
1387fun main(args: Array<String>) = runBlocking<Unit> {
1388 massiveRun {
1389 counter++
1390 }
1391 println("Counter = $counter")
1392}
1393```
1394
1395> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-01.kt)
1396
1397What does it print at the end? It is highly unlikely to ever print "100000", because all the
1398100k coroutines increment the `counter` concurrently without any synchronization.
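To see why updates get lost, note that `counter++` is not a single step: it reads the current value, adds one,
and writes the result back. The following sketch is not one of the guide's examples. It replays one unlucky
interleaving of two workers by hand on a single thread, and the names `read`, `write`, and `lostUpdateDemo`
are made up for this illustration:

```kotlin
var counter = 0

// counter++ split into its two halves so that an interleaving can be replayed by hand
fun read(): Int = counter
fun write(observed: Int) { counter = observed + 1 }

fun lostUpdateDemo(): Int {
    counter = 0
    val seenByA = read() // worker A reads 0
    val seenByB = read() // worker B also reads 0, before A has written
    write(seenByA)       // A writes 1
    write(seenByB)       // B writes 1 as well, overwriting A's update
    return counter
}

fun main(args: Array<String>) {
    println("Counter = ${lostUpdateDemo()}") // prints "Counter = 1" after two increments
}
```

Both workers performed an increment, yet the counter advanced by only one. With 100k coroutines on a thread pool
such interleavings happen at unpredictable moments, which is why the final number varies from run to run.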

### Thread-safe data structures

The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized,
linearizable, or atomic) data structure that provides all the necessary synchronization for the corresponding
operations that need to be performed on a shared state.
In the case of a simple counter we can use the `AtomicInteger` class:

```kotlin
var counter = AtomicInteger()

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun {
        counter.incrementAndGet()
    }
    println("Counter = ${counter.get()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-02.kt)

This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other
standard data structures and basic operations on them. However, it does not easily scale to complex
state or to complex operations that do not have ready-to-use thread-safe implementations.
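As a small illustration of how quickly the effort grows, consider incrementing a counter only while it stays
below a limit. The check and the increment must happen as one atomic step. A minimal sketch, assuming plain JVM
threads instead of coroutines and using the hypothetical names `incrementIfBelow` and `cappedCount`, can do it
with a `compareAndSet` retry loop:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical compound operation: increment only while the value is below the limit
fun incrementIfBelow(counter: AtomicInteger, limit: Int): Boolean {
    while (true) {
        val current = counter.get()
        if (current >= limit) return false
        // succeeds only if nobody changed the value since we read it; otherwise retry
        if (counter.compareAndSet(current, current + 1)) return true
    }
}

fun cappedCount(threads: Int, attemptsPerThread: Int, limit: Int): Int {
    val counter = AtomicInteger()
    val workers = List(threads) {
        thread { repeat(attemptsPerThread) { incrementIfBelow(counter, limit) } }
    }
    workers.forEach { it.join() }
    return counter.get()
}

fun main(args: Array<String>) {
    // 8000 attempts against a limit of 5000
    println("Counter = ${cappedCount(threads = 8, attemptsPerThread = 1000, limit = 5000)}")
}
```

The loop is correct, but it has to be written and reasoned about by hand for every such operation, which is the
scaling problem mentioned above.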

### Thread confinement

Thread confinement is an approach to the problem of shared mutable state where all access to the particular shared
state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to
the single event-dispatch/application thread. It is easy to apply with coroutines by using a
single-threaded context:

```kotlin
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun {
        run(counterContext) {
            counter++
        }
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-03.kt)
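In the example above every single increment switches from the thread pool to `counterContext` and back. A possible
variation, sketched here under the assumption of the same `kotlinx.coroutines.experimental` API that this guide
uses, is to switch once per batch of work. The helper `confinedCount` and its parameters are hypothetical:

```kotlin
import kotlinx.coroutines.experimental.*

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

// Hypothetical helper: `batches` coroutines, each doing `perBatch` increments
fun confinedCount(batches: Int, perBatch: Int): Int = runBlocking<Int> {
    run(counterContext) { counter = 0 }
    val jobs = List(batches) {
        launch(CommonPool) {
            // one switch to the confined thread per batch, not per increment
            run(counterContext) {
                repeat(perBatch) { counter++ }
            }
        }
    }
    jobs.forEach { it.join() }
    run(counterContext) { counter } // read on the confined thread as well
}

fun main(args: Array<String>) {
    println("Counter = ${confinedCount(batches = 100, perBatch = 1000)}")
}
```

All reads and writes of `counter` still happen on the one thread behind `counterContext`, so the confinement
argument is unchanged. Only the number of context switches differs.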

### Mutual exclusion

The mutual exclusion solution to the problem is to protect all modifications of the shared state with a _critical section_
that is never executed concurrently. In a blocking world you'd typically use `synchronized` or `ReentrantLock` for that.
The coroutine alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to
delimit a critical section. The key difference is that `Mutex.lock` is a suspending function. It does not block a thread.

```kotlin
val mutex = Mutex()
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun {
        mutex.lock()
        try { counter++ }
        finally { mutex.unlock() }
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt)
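The `lock`/`try`/`finally`/`unlock` sequence is easy to get wrong when it is repeated in many places. One way to
keep it in a single spot is a small extension function. The sketch below uses only `lock` and `unlock` as
described above. The names `inCriticalSection` and `lockedCount` are hypothetical and written for this
illustration, not taken from the library:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.sync.Mutex

// Hypothetical helper: runs `action` inside the critical section and always unlocks
suspend fun <T> Mutex.inCriticalSection(action: suspend () -> T): T {
    lock() // suspends, but does not block the thread, while the mutex is held elsewhere
    try {
        return action()
    } finally {
        unlock()
    }
}

val mutex = Mutex()
var counter = 0

fun lockedCount(n: Int): Int = runBlocking<Int> {
    counter = 0
    val jobs = List(n) {
        launch(CommonPool) {
            mutex.inCriticalSection { counter++ }
        }
    }
    jobs.forEach { it.join() }
    counter
}

fun main(args: Array<String>) {
    println("Counter = ${lockedCount(1000)}")
}
```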

### Actors

An actor is a combination of a coroutine, the state that is confined to and encapsulated in this coroutine,
and a channel to communicate with other coroutines. A simple actor can be written as a function,
but an actor with a complex state is better suited for a class.

```kotlin
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: SendChannel<Int>) : CounterMsg() // a request with reply

// This function launches a new counter actor
fun counterActor(request: ReceiveChannel<CounterMsg>) = launch(CommonPool) {
    var counter = 0 // actor state
    while (true) { // main loop of the actor
        val msg = request.receive()
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.send(counter)
        }
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val request = Channel<CounterMsg>()
    counterActor(request)
    massiveRun {
        request.send(IncCounter)
    }
    val response = Channel<Int>()
    request.send(GetCounter(response))
    println("Counter = ${response.receive()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt)

Notice that it does not matter (for correctness) what context the actor itself is executed in. An actor is
a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine
works as a solution to the problem of shared mutable state.
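The request-with-reply message can be hidden behind an ordinary suspending function, so that callers do not
have to manage the response channel themselves. The following is a minimal sketch that repeats the actor from
above to stay self-contained. The helper `currentCount` and the wrapper `countAfter` are hypothetical names
introduced here:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: SendChannel<Int>) : CounterMsg()

fun counterActor(request: ReceiveChannel<CounterMsg>) = launch(CommonPool) {
    var counter = 0 // actor state, touched only by this coroutine
    while (true) {
        val msg = request.receive()
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.send(counter)
        }
    }
}

// Hypothetical helper: sends a GetCounter request and suspends until the actor replies
suspend fun currentCount(request: SendChannel<CounterMsg>): Int {
    val response = Channel<Int>()
    request.send(GetCounter(response))
    return response.receive()
}

fun countAfter(increments: Int): Int = runBlocking<Int> {
    val request = Channel<CounterMsg>()
    counterActor(request)
    repeat(increments) { request.send(IncCounter) }
    currentCount(request)
}

fun main(args: Array<String>) {
    println("Counter = ${countAfter(3)}")
}
```

Messages sent by one coroutine arrive in order, so the reply reflects all three increments that were sent
before the request.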

## Select expression

A select expression makes it possible to await multiple suspending functions simultaneously and _select_
the first one that becomes available.

<!--- INCLUDE .*/example-select-([0-9]+).kt
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*
-->

### Selecting from channels

Let us have two channels of strings, `fizz` and `buzz`. The `fizz` channel produces the "Fizz" string every 300 ms:

```kotlin
val fizz = produce<String>(CommonPool) { // produce using common thread pool
    while (true) {
        delay(300)
        send("Fizz")
    }
}
```

And the `buzz` channel produces the "Buzz!" string every 500 ms:

```kotlin
val buzz = produce<String>(CommonPool) {
    while (true) {
        delay(500)
        send("Buzz!")
    }
}
```

Using the [receive][ReceiveChannel.receive] suspending function we can receive _either_ from one channel or the
other. But the [select] expression allows us to receive from _both_ simultaneously using its
[onReceive][SelectBuilder.onReceive] clauses:

```kotlin
suspend fun selectFizzBuzz() {
    select<Unit> { // <Unit> means that this select expression does not produce any result
        fizz.onReceive { value -> // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value -> // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}
```

Let us run it seven times:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    repeat(7) {
        selectFizzBuzz()
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-01.kt)

The result of this code is:

```
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'
```

### Selecting on close

The [onReceive][SelectBuilder.onReceive] clause in `select` fails when the channel is closed and the corresponding
`select` throws an exception. We can use the [onReceiveOrNull][SelectBuilder.onReceiveOrNull] clause to perform a
specific action when the channel is closed. This example also shows that `select` is an expression that returns
the result of its selected clause:

```kotlin
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'a' is closed"
            else
                "a -> '$value'"
        }
        b.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'b' is closed"
            else
                "b -> '$value'"
        }
    }
```

Let us have channel `a` that produces the "Hello" string four times and channel `b` that produces "World"
four times for this example:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // we are using the context of the main thread in this example for predictability ...
    val a = produce<String>(context) {
        repeat(4) {
            send("Hello $it")
        }
    }
    val b = produce<String>(context) {
        repeat(4) {
            send("World $it")
        }
    }
    repeat(8) { // print first eight results
        println(selectAorB(a, b))
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-02.kt)

The result of this code is quite interesting, so we'll analyze it in more detail:

```
a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed
```

There are a couple of observations to make out of it.

First of all, `select` is _biased_ to the first clause. When several clauses are selectable at the same time,
the first one among them gets selected. Here, both channels are constantly producing strings, so the `a` channel,
being the first clause in select, wins. However, because we are using unbuffered channels, `a` gets suspended from
time to time on its [send][SendChannel.send] invocation and gives `b` a chance to send, too.

The second observation is that [onReceiveOrNull][SelectBuilder.onReceiveOrNull] gets immediately selected when the
channel is already closed.
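The bias can be observed in isolation when both channels already hold values, so that both clauses are selectable
at the same moment. The sketch below assumes buffered channels created with `Channel<String>(2)`, like the
buffered channel used earlier in this guide. The function name `drainOrder` is hypothetical:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*

// Hypothetical experiment: fill both buffers first, then select four times
fun drainOrder(): List<String> = runBlocking<List<String>> {
    val a = Channel<String>(2) // buffered, so the sends below do not suspend
    val b = Channel<String>(2)
    a.send("a0"); a.send("a1")
    b.send("b0"); b.send("b1")
    val order = mutableListOf<String>()
    repeat(4) {
        order += select<String> {
            a.onReceive { it } // first clause: preferred whenever `a` has a value
            b.onReceive { it } // second clause: chosen only when `a` is empty
        }
    }
    order
}

fun main(args: Array<String>) {
    println(drainOrder())
}
```

Because of the bias, the values from `a` are expected to come out first, followed by the values from `b`, even
though all four were available from the start.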

### Selecting to send

The select expression has an [onSend][SelectBuilder.onSend] clause that can be put to good use in combination
with the biased nature of selection.

Let us write an example of a producer of integer numbers that sends its values to a `side` channel when
the consumers on its primary channel cannot keep up with it:

```kotlin
fun produceNumbers(side: SendChannel<Int>) = produce<Int>(CommonPool) {
    for (num in 1..10) { // produce 10 numbers from 1 to 10
        delay(100) // every 100 ms
        select<Unit> {
            onSend(num) { } // Send to the primary channel
            side.onSend(num) { } // or to the side channel
        }
    }
}
```

The consumer is going to be quite slow, taking 250 ms to process each number:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val side = Channel<Int>() // allocate side channel
    launch(context) { // this is a very fast consumer for the side channel
        for (num in side) println("Side channel has $num")
    }
    for (num in produceNumbers(side)) {
        println("Consuming $num")
        delay(250) // let us digest the consumed number properly, do not hurry
    }
    println("Done consuming")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-03.kt)

So let us see what happens:

```
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
```
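The pattern in this output follows from the two timings. A number is offered every 100 ms, while the primary
consumer is busy for 250 ms after each number it takes, so the next two numbers find it busy and go to the side
channel. The following sketch is an idealized model of that arithmetic in plain Kotlin, with no coroutines and
no real delays. The function `route` and its parameters are hypothetical:

```kotlin
// Idealized model: number `num` is offered at time num * periodMs
fun route(numbers: Int, periodMs: Int, consumeMs: Int): List<String> {
    var busyUntil = 0 // time at which the primary consumer is ready again
    return (1..numbers).map { num ->
        val now = num * periodMs
        if (now >= busyUntil) {
            busyUntil = now + consumeMs // primary consumer takes it and gets busy
            "Consuming $num"
        } else {
            "Side channel has $num" // primary consumer is busy, use the side channel
        }
    }
}

fun main(args: Array<String>) {
    route(numbers = 10, periodMs = 100, consumeMs = 250).forEach { println(it) }
}
```

The model reproduces the ten lines above: number 1 is consumed at 100 ms, the consumer is busy until 350 ms,
so 2 and 3 go to the side channel and 4 is consumed at 400 ms, and so on.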

### Selecting deferred values

Deferred values can be selected using the [onAwait][SelectBuilder.onAwait] clause, which enables "wait first"
type of logic. Let us start with an async-style function that returns a deferred string value after
the specified delay:

<!--- INCLUDE .*/example-select-04.kt
import java.util.*
-->

```kotlin
fun asyncString(time: Int) = async(CommonPool) {
    delay(time.toLong())
    "Waited for $time ms"
}
```

Let us start a dozen of them with random delays using the following function that returns a
collection of deferred values:

```kotlin
fun asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return (1..12).map { asyncString(random.nextInt(1000)) }
}
```

Now the main function awaits the first of them to complete and counts the number of deferred values
that are still active. Note that we rely here on the fact that the `select` expression is a Kotlin DSL,
so we can provide clauses for it using arbitrary code. In this case we iterate over a list
of deferred values to produce an `onAwait` clause for each one of them.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val list = asyncStringsList()
    val result = select<String> {
        list.withIndex().forEach { (index, deferred) ->
            deferred.onAwait { answer ->
                "Deferred $index produced answer '$answer'"
            }
        }
    }
    println(result)
    val countActive = list.sumBy { deferred -> if (deferred.isActive) 1 else 0 }
    println("$countActive coroutines are still active")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-04.kt)

The output is:

```
Deferred 4 produced answer 'Waited for 254 ms'
11 coroutines are still active
```
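The same loop over `onAwait` clauses can be packaged as a reusable function. This is a minimal sketch built only
from the `select` and `onAwait` constructs shown above. The names `awaitFirst` and `firstOfTwo` are hypothetical
and not part of the library as described in this guide:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.selects.*

// Hypothetical helper: suspends until the first of the given deferred values completes
suspend fun <T> awaitFirst(deferreds: List<Deferred<T>>): T =
    select<T> {
        deferreds.forEach { deferred ->
            deferred.onAwait { it } // one clause per deferred value
        }
    }

fun firstOfTwo(): String = runBlocking<String> {
    val slow = async(CommonPool) { delay(500); "slow" }
    val fast = async(CommonPool) { delay(100); "fast" }
    awaitFirst(listOf(slow, fast))
}

fun main(args: Array<String>) {
    println(firstOfTwo())
}
```

Note that the remaining deferred values keep running after the first one is selected, just like the 11
coroutines that are still active in the output above.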

### Switch over a channel of deferred values

Let us write a channel producer function that consumes a channel of deferred string values, awaits each received
deferred value, but only until the next deferred value comes over or the channel is closed. This example puts together
[onReceiveOrNull][SelectBuilder.onReceiveOrNull] and [onAwait][SelectBuilder.onAwait] clauses in the same `select`:

```kotlin
fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String>(CommonPool) {
    var current = input.receive() // will start with first received deferred value
    while (isActive) { // loop while not cancelled/closed
        val next = select<Deferred<String>?> { // return next deferred value from this select or null
            input.onReceiveOrNull { update ->
                update // replaces next value to wait
            }
            current.onAwait { value ->
                send(value) // send value that current deferred has produced
                input.receiveOrNull() // and use the next deferred from the input channel
            }
        }
        if (next == null) {
            println("Channel was closed")
            break // out of loop
        } else {
            current = next
        }
    }
}
```

To test it, we'll use a simple async function that resolves to a specified string after a specified time:

```kotlin
fun asyncString(str: String, time: Long) = async(CommonPool) {
    delay(time)
    str
}
```

The main function just launches a coroutine to print results of `switchMapDeferreds` and sends some test
data to it:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val chan = Channel<Deferred<String>>() // the channel for test
    launch(context) { // launch printing coroutine
        for (s in switchMapDeferreds(chan))
            println(s) // print each received string
    }
    chan.send(asyncString("BEGIN", 100))
    delay(200) // enough time for "BEGIN" to be produced
    chan.send(asyncString("Slow", 500))
    delay(100) // not enough time for slow
    chan.send(asyncString("Replace", 100))
    delay(500) // will give it time before the last one
    chan.send(asyncString("END", 500))
    delay(1000) // give it time to process
    chan.close() // and close the channel immediately
    delay(500) // and wait some time to let it finish
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-05.kt)

The result of this code:

```
BEGIN
Replace
END
Channel was closed
```
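"Slow" never appears because its replacement arrives before it completes. The rule can be checked with a little
arithmetic on the test data: a deferred value is printed only if it finishes before the next one is sent or the
channel is closed. The sketch below is an idealized model in plain Kotlin with the send times read off the
`delay` calls in the main function. The class `Sent` and the function `survivors` are hypothetical:

```kotlin
// One test input: its name, when it was sent, and how long it takes to complete (in ms)
data class Sent(val name: String, val sentAt: Long, val takes: Long)

// Keeps the inputs that complete strictly before they are replaced or the channel is closed
fun survivors(inputs: List<Sent>, closedAt: Long): List<String> =
    inputs.filterIndexed { i, s ->
        val replacedAt = if (i + 1 < inputs.size) inputs[i + 1].sentAt else closedAt
        s.sentAt + s.takes < replacedAt
    }.map { it.name }

fun main(args: Array<String>) {
    val inputs = listOf(
        Sent("BEGIN", 0, 100),     // done at 100, replaced at 200
        Sent("Slow", 200, 500),    // would be done at 700, but replaced at 300
        Sent("Replace", 300, 100), // done at 400, replaced at 800
        Sent("END", 800, 500)      // done at 1300, channel closed at 1800
    )
    println(survivors(inputs, closedAt = 1800)) // prints "[BEGIN, Replace, END]"
}
```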

<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core -->
<!--- DOCS_ROOT kotlinx-coroutines-core/target/dokka/kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines.experimental -->
[launch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/launch.html
[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/delay.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run-blocking.html
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/index.html
[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-cancellation-exception.html
[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/yield.html
[CoroutineScope.isActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/is-active.html
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
[run]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run.html
[NonCancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-non-cancellable/index.html
[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
[async]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/async.html
[Deferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/index.html
[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/await.html
[Job.start]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/start.html
[CommonPool]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-common-pool/index.html
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-dispatcher/index.html
[CoroutineScope.context]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/context.html
[Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-unconfined/index.html
[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/new-coroutine-context.html
[CoroutineName]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-name/index.html
[Job.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/invoke.html
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/cancel.html
<!--- INDEX kotlinx.coroutines.experimental.sync -->
[Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/index.html
[Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/lock.html
[Mutex.unlock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/unlock.html
<!--- INDEX kotlinx.coroutines.experimental.channels -->
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/send.html
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/receive.html
[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/close.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
[Channel.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/invoke.html
<!--- INDEX kotlinx.coroutines.experimental.selects -->
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
[SelectBuilder.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/on-receive.html
[SelectBuilder.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/on-receive-or-null.html
[SelectBuilder.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/on-send.html
[SelectBuilder.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/on-await.html
<!--- END -->