<!--- INCLUDE .*/example-([a-z]+)-([0-9]+)\.kt
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package guide.$$1.example$$2

import kotlinx.coroutines.experimental.*
-->
<!--- KNIT kotlinx-coroutines-core/src/test/kotlin/guide/.*\.kt -->

# Guide to kotlinx.coroutines by example

This is a short guide on core features of `kotlinx.coroutines` with a series of examples.

## Table of contents

<!--- TOC -->

* [Coroutine basics](#coroutine-basics)
  * [Your first coroutine](#your-first-coroutine)
  * [Bridging blocking and non-blocking worlds](#bridging-blocking-and-non-blocking-worlds)
  * [Waiting for a job](#waiting-for-a-job)
  * [Extract function refactoring](#extract-function-refactoring)
  * [Coroutines ARE light-weight](#coroutines-are-light-weight)
  * [Coroutines are like daemon threads](#coroutines-are-like-daemon-threads)
* [Cancellation and timeouts](#cancellation-and-timeouts)
  * [Cancelling coroutine execution](#cancelling-coroutine-execution)
  * [Cancellation is cooperative](#cancellation-is-cooperative)
  * [Making computation code cancellable](#making-computation-code-cancellable)
  * [Closing resources with finally](#closing-resources-with-finally)
  * [Run non-cancellable block](#run-non-cancellable-block)
  * [Timeout](#timeout)
* [Composing suspending functions](#composing-suspending-functions)
  * [Sequential by default](#sequential-by-default)
  * [Concurrent using async](#concurrent-using-async)
  * [Lazily started async](#lazily-started-async)
  * [Async-style functions](#async-style-functions)
* [Coroutine context and dispatchers](#coroutine-context-and-dispatchers)
  * [Dispatchers and threads](#dispatchers-and-threads)
  * [Unconfined vs confined dispatcher](#unconfined-vs-confined-dispatcher)
  * [Debugging coroutines and threads](#debugging-coroutines-and-threads)
  * [Jumping between threads](#jumping-between-threads)
  * [Job in the context](#job-in-the-context)
  * [Children of a coroutine](#children-of-a-coroutine)
  * [Combining contexts](#combining-contexts)
  * [Naming coroutines for debugging](#naming-coroutines-for-debugging)
  * [Cancellation via explicit job](#cancellation-via-explicit-job)
* [Channels](#channels)
  * [Channel basics](#channel-basics)
  * [Closing and iteration over channels](#closing-and-iteration-over-channels)
  * [Building channel producers](#building-channel-producers)
  * [Pipelines](#pipelines)
  * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
  * [Fan-out](#fan-out)
  * [Fan-in](#fan-in)
  * [Buffered channels](#buffered-channels)
* [Select expression](#select-expression)
  * [Selecting from channels](#selecting-from-channels)
  * [Selecting on close](#selecting-on-close)
  * [Selecting to send](#selecting-to-send)
  * [Selecting deferred values](#selecting-deferred-values)
  * [Switch over a channel of deferred values](#switch-over-a-channel-of-deferred-values)

<!--- END_TOC -->

## Coroutine basics

This section covers basic coroutine concepts.

### Your first coroutine

Run the following code:

```kotlin
fun main(args: Array<String>) {
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main function continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-01.kt)

Running this code prints:

```
Hello,
World!
```

Essentially, coroutines are light-weight threads.
They are launched with the [launch] _coroutine builder_.
You can achieve the same result by replacing
`launch(CommonPool) { ... }` with `thread { ... }` and `delay(...)` with `Thread.sleep(...)`. Try it.

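As a sketch of that substitution, the fully thread-based version might look like the following. It relies only on `thread` from `kotlin.concurrent` in the Kotlin standard library; everything else mirrors the coroutine example.

```kotlin
import kotlin.concurrent.thread

fun main(args: Array<String>) {
    thread { // start a new thread instead of a coroutine
        Thread.sleep(1000L) // blocking sleep for 1 second replaces the non-blocking delay
        println("World!") // print after sleep
    }
    println("Hello,") // main thread continues while the other thread sleeps
    Thread.sleep(2000L) // kept only for parity with the coroutine version
}
```

A thread started this way is not a daemon thread by default, so the JVM would wait for it even without the final `Thread.sleep(2000L)`.
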
If you start by replacing `launch(CommonPool)` with `thread`, the compiler produces the following error:

```
Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
```

That is because [delay] is a special _suspending function_ that does not block a thread, but _suspends_
the coroutine, and it can only be used from a coroutine.

### Bridging blocking and non-blocking worlds

The first example mixes _non-blocking_ `delay(...)` and _blocking_ `Thread.sleep(...)` in the same
code of the `main` function. It is easy to get lost. Let's cleanly separate blocking and non-blocking
worlds by using [runBlocking]:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L)
        println("World!")
    }
    println("Hello,") // main coroutine continues while child is delayed
    delay(2000L) // non-blocking delay for 2 seconds to keep JVM alive
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-02.kt)

The result is the same, but this code uses only non-blocking [delay].

`runBlocking { ... }` works as an adaptor that is used here to start the top-level main coroutine.
The regular code outside of `runBlocking` _blocks_ for as long as the coroutine inside `runBlocking` is active.

This is also a way to write unit-tests for suspending functions:

```kotlin
class MyTest {
    @Test
    fun testMySuspendingFunction() = runBlocking<Unit> {
        // here we can use suspending functions using any assertion style that we like
    }
}
```

<!--- CLEAR -->

### Waiting for a job

Delaying for a time while another coroutine is working is not a good approach. Let's explicitly
wait (in a non-blocking way) until the background [Job] that we have launched is complete:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { // create new coroutine and keep a reference to its Job
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join() // wait until child coroutine completes
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-03.kt)

Now the result is still the same, but the code of the main coroutine is not tied to the duration of
the background job in any way. Much better.

### Extract function refactoring

Let's extract the block of code inside `launch(CommonPool) { ... }` into a separate function. When you
perform the "Extract function" refactoring on this code, you get a new function with the `suspend` modifier.
That is your first _suspending function_. Suspending functions can be used inside coroutines
just like regular functions, but their additional feature is that they can, in turn,
use other suspending functions, like `delay` in this example, to _suspend_ execution of a coroutine.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { doWorld() }
    println("Hello,")
    job.join()
}

// this is your first suspending function
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-04.kt)

### Coroutines ARE light-weight

Run the following code:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = List(100_000) { // create a lot of coroutines and list their jobs
        launch(CommonPool) {
            delay(1000L)
            print(".")
        }
    }
    jobs.forEach { it.join() } // wait for all jobs to complete
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-05.kt)

It starts 100K coroutines and, after a second, each coroutine prints a dot.
Now, try that with threads. What would happen? (Most likely your code will produce some sort of out-of-memory error.)

### Coroutines are like daemon threads

The following code launches a long-running coroutine that prints "I'm sleeping" twice a second and then
returns from the main function after some delay:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // just quit after delay
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-06.kt)

You can run and see that it prints three lines and terminates:

```
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
```

Active coroutines do not keep the process alive. They are like daemon threads.

## Cancellation and timeouts

This section covers coroutine cancellation and timeouts.

### Cancelling coroutine execution

In a small application, returning from the `main` method might sound like a good way to get all coroutines
implicitly terminated. In a larger, long-running application, you need finer-grained control.
The [launch] function returns a [Job] that can be used to cancel a running coroutine:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-01.kt)

It produces the following output:

```
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
```

As soon as main invokes `job.cancel`, we don't see any output from the other coroutine because it was cancelled.

### Cancellation is cooperative

Coroutine cancellation is _cooperative_. Coroutine code has to cooperate to be cancellable.
All the suspending functions in `kotlinx.coroutines` are _cancellable_. They check for cancellation of
the coroutine and throw [CancellationException] when cancelled. However, if a coroutine is working in
a computation and does not check for cancellation, then it cannot be cancelled, as the following
example shows:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (true) { // computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-02.kt)

Run it to see that it continues to print "I'm sleeping" even after cancellation.

### Making computation code cancellable

There are two approaches to making computation code cancellable. The first one is to periodically
invoke a suspending function. There is a [yield] function that is a good choice for that purpose.
The other one is to explicitly check the cancellation status. Let us try the latter approach.

Replace `while (true)` in the previous example with `while (isActive)` and rerun it.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (isActive) { // cancellable computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-03.kt)

As you can see, now this loop can be cancelled. [isActive][CoroutineScope.isActive] is a property that is available inside
the code of coroutines via the [CoroutineScope] object.

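The first approach, periodically invoking a suspending function, could be sketched as follows. This is an illustration rather than one of the guide's generated examples: it keeps the `while (true)` loop and adds a [yield] call, on the assumption that `yield` throws [CancellationException] once the job has been cancelled, as the cancellable suspending functions described above do.

```kotlin
import kotlinx.coroutines.experimental.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (true) { // the loop condition itself never checks for cancellation
            yield() // suspending call: this is where cancellation gets noticed
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

Calling `yield` on every iteration keeps the sketch short. A real computation would more likely call it once per batch of work to limit the overhead.
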
### Closing resources with finally

Cancellable suspending functions throw [CancellationException] on cancellation, which can be handled in
the usual way. For example, the `try {...} finally {...}` and Kotlin `use` function execute their
finalization actions normally when the coroutine is cancelled:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("I'm running finally")
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-04.kt)

The example above produces the following output:

```
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.
```

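The same idea with the standard `use` function might look like the sketch below. `SleepLog` is a hypothetical `Closeable` invented for this illustration; it is not part of `kotlinx.coroutines`. Because `use` is an inline function, suspending calls such as `delay` are allowed inside its block.

```kotlin
import kotlinx.coroutines.experimental.*
import java.io.Closeable

// Hypothetical resource used only for this illustration
class SleepLog : Closeable {
    override fun close() {
        println("I'm closing the resource")
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        SleepLog().use { // `use` closes the resource even when the block exits with an exception
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```
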
### Run non-cancellable block

Any attempt to use a suspending function in the `finally` block of the previous example will cause
[CancellationException], because the coroutine running this code is cancelled. Usually, this is not a
problem, since all well-behaving closing operations (closing a file, cancelling a job, or closing any kind of a
communication channel) are usually non-blocking and do not involve any suspending functions. However, in the
rare case when you need to suspend in the cancelled coroutine, you can wrap the corresponding code in
`run(NonCancellable) {...}` using the [run] function and [NonCancellable] context, as the following example shows:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            run(NonCancellable) {
                println("I'm running finally")
                delay(1000L)
                println("And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-05.kt)

### Timeout

The most obvious reason to cancel coroutine execution in practice
is that its execution time has exceeded some timeout.
While you can manually track the reference to the corresponding [Job] and launch a separate coroutine to cancel
the tracked one after a delay, there is a ready-to-use [withTimeout] function that does it.
Look at the following example:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-06.kt)

It produces the following output:

```
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" java.util.concurrent.CancellationException: Timed out waiting for 1300 MILLISECONDS
```

We have not seen the [CancellationException] stack trace printed on the console before. That is because
inside a cancelled coroutine `CancellationException` is considered a normal reason for coroutine completion.
However, in this example we have used `withTimeout` right inside the `main` function.

Because cancellation is just an exception, all the resources will be closed in the usual way.
You can wrap the code with timeout in a `try {...} catch (e: CancellationException) {...}` block if
you need to do some additional action specifically on timeout.

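A minimal sketch of that wrapping is shown below. The exception type is imported explicitly as `java.util.concurrent.CancellationException`, matching the class name printed in the output above. The text printed in the `catch` block is made up for this illustration.

```kotlin
import kotlinx.coroutines.experimental.*
import java.util.concurrent.CancellationException

fun main(args: Array<String>) = runBlocking<Unit> {
    try {
        withTimeout(1300L) {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
    } catch (e: CancellationException) {
        // additional action performed specifically on timeout
        println("main: timed out (${e.message})")
    }
}
```

The `catch` block here calls only the non-suspending `println`, so it does not depend on whether suspending is still possible at that point.
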
## Composing suspending functions

This section covers various approaches to composition of suspending functions.

### Sequential by default

Assume that we have two suspending functions defined elsewhere that do something useful like some kind of
remote service call or computation. We just pretend they are useful, but actually each one just
delays for a second for the purpose of this example:

<!--- INCLUDE .*/example-compose-([0-9]+).kt
import kotlin.system.measureTimeMillis
-->

```kotlin
suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}
```

<!--- INCLUDE .*/example-compose-([0-9]+).kt -->

What do we do if we need to invoke them _sequentially_ -- first `doSomethingUsefulOne` _and then_
`doSomethingUsefulTwo` and compute the sum of their results?
In practice we do this if we use the results of the first function to make a decision on whether we need
to invoke the second one or to decide on how to invoke it.

We just use a normal sequential invocation, because the code in the coroutine, just like in the regular
code, is _sequential_ by default. The following example demonstrates it by measuring the total
time it takes to execute both suspending functions:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-01.kt)

It produces something like this:

```
The answer is 42
Completed in 2017 ms
```

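The case where the first result decides whether the second call happens at all can be sketched like this. `computeAnswer` and its threshold of `10` are hypothetical, chosen only to show that ordinary control flow works between suspending calls.

```kotlin
import kotlinx.coroutines.experimental.*

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

// Hypothetical helper: the second function is invoked only if the first result exceeds 10
suspend fun computeAnswer(): Int {
    val one = doSomethingUsefulOne()
    val two = if (one > 10) doSomethingUsefulTwo() else 0
    return one + two
}

fun main(args: Array<String>) = runBlocking<Unit> {
    println("The answer is ${computeAnswer()}")
}
```

Since `doSomethingUsefulOne` returns 13, the second call is made and the sum is 42, just as in the unconditional version.
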
### Concurrent using async

What if there are no dependencies between invocation of `doSomethingUsefulOne` and `doSomethingUsefulTwo` and
we want to get the answer faster, by doing both _concurrently_? This is where [async] comes to help.

Conceptually, [async] is just like [launch]. It starts a separate coroutine which is a light-weight thread
that works concurrently with all the other coroutines. The difference is that `launch` returns a [Job] and
does not carry any resulting value, while `async` returns a [Deferred] -- a light-weight non-blocking future
that represents a promise to provide a result later. You can use `.await()` on a deferred value to get its eventual result,
but `Deferred` is also a `Job`, so you can cancel it if needed.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool) { doSomethingUsefulOne() }
        val two = async(CommonPool) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-02.kt)

It produces something like this:

```
The answer is 42
Completed in 1017 ms
```

This is twice as fast, because we have concurrent execution of two coroutines.
Note that concurrency with coroutines is always explicit.

### Lazily started async

There is a laziness option to [async] with the `start = false` parameter.
It starts the coroutine only when its result is needed by some
[await][Deferred.await] or if a [start][Job.start] function
is invoked. Run the following example that differs from the previous one only by this option:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool, start = false) { doSomethingUsefulOne() }
        val two = async(CommonPool, start = false) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-03.kt)

It produces something like this:

```
The answer is 42
Completed in 2017 ms
```

So, we are back to sequential execution, because we _first_ start and await `one`, _and then_ start and await
`two`. It is not the intended use-case for laziness. It is designed as a replacement for
the standard `lazy` function in cases when computation of the value involves suspending functions.

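To illustrate the [start][Job.start] function mentioned above, the sketch below starts both lazy coroutines explicitly before awaiting either of them. It is not one of the guide's generated examples, and it assumes that `start` begins execution without waiting for the result. Under that assumption the two computations should overlap again.

```kotlin
import kotlinx.coroutines.experimental.*
import kotlin.system.measureTimeMillis

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool, start = false) { doSomethingUsefulOne() }
        val two = async(CommonPool, start = false) { doSomethingUsefulTwo() }
        one.start() // explicitly start both computations...
        two.start()
        // ...and only then wait for their results
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```
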
### Async-style functions

We can define async-style functions that invoke `doSomethingUsefulOne` and `doSomethingUsefulTwo`
_asynchronously_ using the [async] coroutine builder. It is a good style to name such functions with
either an "async" prefix or an "Async" suffix to highlight the fact that they only start asynchronous
computation and one needs to use the resulting deferred value to get the result.

```kotlin
// The result type of asyncSomethingUsefulOne is Deferred<Int>
fun asyncSomethingUsefulOne() = async(CommonPool) {
    doSomethingUsefulOne()
}

// The result type of asyncSomethingUsefulTwo is Deferred<Int>
fun asyncSomethingUsefulTwo() = async(CommonPool) {
    doSomethingUsefulTwo()
}
```

Note that these `asyncXXX` functions are **not** _suspending_ functions. They can be used from anywhere.
However, their use always implies asynchronous (here meaning _concurrent_) execution of their action
with the invoking code.

The following example shows their use outside of a coroutine:

```kotlin
// note, that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
    val time = measureTimeMillis {
        // we can initiate async actions outside of a coroutine
        val one = asyncSomethingUsefulOne()
        val two = asyncSomethingUsefulTwo()
        // but waiting for a result must involve either suspending or blocking.
        // here we use `runBlocking { ... }` to block the main thread while waiting for the result
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-04.kt)

Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300639## Coroutine context and dispatchers
640
Roman Elizarov32d95322017-02-09 15:57:31 +0300641We've already seen `launch(CommonPool) {...}`, `async(CommonPool) {...}`, `run(NonCancellable) {...}`, etc.
Roman Elizarov419a6c82017-02-09 18:36:22 +0300642In these code snippets [CommonPool] and [NonCancellable] are _coroutine contexts_.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300643This section covers other available choices.
644
645### Dispatchers and threads
646
Roman Elizarov419a6c82017-02-09 18:36:22 +0300647Coroutine context includes a [_coroutine dispatcher_][CoroutineDispatcher] which determines what thread or threads
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300648the corresponding coroutine uses for its execution. Coroutine dispatcher can confine coroutine execution
649to a specific thread, dispatch it to a thread pool, or let it run unconfined. Try the following example:
650
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println(" 'context': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
        println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-01.kt)

It produces the following output (maybe in a different order):

```
 'Unconfined': I'm working in thread main
 'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
 'newSTC': I'm working in thread MyOwnThread
 'context': I'm working in thread main
```

The difference between the parent [context][CoroutineScope.context] and the [Unconfined] context will be shown later.

### Unconfined vs confined dispatcher

The [Unconfined] coroutine dispatcher starts a coroutine in the caller thread, but only until the
first suspension point. After suspension it resumes in the thread that is fully determined by the
suspending function that was invoked. The unconfined dispatcher is appropriate when a coroutine neither
consumes CPU time nor updates any shared data (like UI) that is confined to a specific thread.

On the other hand, the [context][CoroutineScope.context] property, which is available inside the block of any coroutine
via the [CoroutineScope] interface, is a reference to the context of this particular coroutine.
This way, a parent context can be inherited. The default context of [runBlocking], in particular,
is confined to the invoker thread, so inheriting it has the effect of confining execution to
this thread with predictable FIFO scheduling.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println(" 'context': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println(" 'context': After delay in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-02.kt)

It produces the following output:

```
 'Unconfined': I'm working in thread main
 'context': I'm working in thread main
 'Unconfined': After delay in thread kotlinx.coroutines.ScheduledExecutor
 'context': After delay in thread main
```

So, the coroutine that had inherited the `context` of `runBlocking {...}` continues to execute in the `main` thread,
while the unconfined one had resumed in the scheduler thread that the [delay] function is using.

### Debugging coroutines and threads

Coroutines can suspend on one thread and resume on another thread with the [Unconfined] dispatcher or
with a multi-threaded dispatcher like [CommonPool]. Even with a single-threaded dispatcher it might be hard to
figure out what coroutine was doing what, where, and when. The common approach to debugging applications with
threads is to print the thread name in the log file on each log statement. This feature is universally supported
by logging frameworks. When using coroutines, the thread name alone does not give much context, so
`kotlinx.coroutines` includes debugging facilities to make it easier.

Run the following code with the `-Dkotlinx.coroutines.debug` JVM option:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking<Unit> {
    val a = async(context) {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async(context) {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-03.kt)

There are three coroutines: the main coroutine (#1), which is the `runBlocking` one,
and two coroutines computing the deferred values `a` (#2) and `b` (#3).
They are all executing in the context of `runBlocking` and are confined to the main thread.
The output of this code is:

```
[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42
```

The `log` function prints the name of the thread in square brackets. You can see that it is the `main`
thread, but the identifier of the currently executing coroutine is appended to it. This identifier
is consecutively assigned to all created coroutines when debugging mode is turned on.

You can read more about debugging facilities in the documentation for the [newCoroutineContext] function.

### Jumping between threads

Run the following code with the `-Dkotlinx.coroutines.debug` JVM option:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) {
    val ctx1 = newSingleThreadContext("Ctx1")
    val ctx2 = newSingleThreadContext("Ctx2")
    runBlocking(ctx1) {
        log("Started in ctx1")
        run(ctx2) {
            log("Working in ctx2")
        }
        log("Back to ctx1")
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt)

It demonstrates two new techniques. One is using [runBlocking] with an explicitly specified context, and
the second one is using the [run] function to change the context of a coroutine while still staying in the
same coroutine, as you can see in the output below:

```
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
```

### Job in the context

The coroutine [Job] is part of its context. The coroutine can retrieve it from its own context
using the `context[Job]` expression:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    println("My job is ${context[Job]}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-05.kt)

It produces:

```
My job is BlockingCoroutine{isActive=true}
```

So, [isActive][CoroutineScope.isActive] in [CoroutineScope] is just a convenient shortcut for `context[Job]!!.isActive`.
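
As a minimal sketch of that equivalence, the following program reads the flag both ways from inside `runBlocking`.
The helper name `readBothFlags` is made up for this illustration, and the snippet assumes the same
`kotlinx.coroutines.experimental` API that the rest of this guide uses:

```kotlin
import kotlinx.coroutines.experimental.*

// Hypothetical helper (not part of the library): reads the "active" flag in both ways.
fun readBothFlags(): Pair<Boolean, Boolean> = runBlocking {
    val viaScope = isActive                  // shortcut property of CoroutineScope
    val viaContext = context[Job]!!.isActive // explicit lookup of the Job in the context
    Pair(viaScope, viaContext)
}

fun main(args: Array<String>) {
    val (viaScope, viaContext) = readBothFlags()
    println("isActive = $viaScope, context[Job]!!.isActive = $viaContext")
}
```

Both values are read while the `runBlocking` coroutine is still running, so both should be `true`.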

### Children of a coroutine

When the [context][CoroutineScope.context] of a coroutine is used to launch another coroutine,
the [Job] of the new coroutine becomes
a _child_ of the parent coroutine's job. When the parent coroutine is cancelled, all its children
are recursively cancelled, too.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(CommonPool) {
        // it spawns two other jobs, one with its separate context
        val job1 = launch(CommonPool) {
            println("job1: I have my own context and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // and the other inherits the parent context
        val job2 = launch(context) {
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
        // request completes when both its sub-jobs complete:
        job1.join()
        job2.join()
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-06.kt)

The output of this code is:

```
job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
```

### Combining contexts

Coroutine contexts can be combined using the `+` operator. The context on the right-hand side replaces relevant entries
of the context on the left-hand side. For example, a [Job] of the parent coroutine can be inherited, while
its dispatcher is replaced:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(context) { // use the context of `runBlocking`
        // spawns CPU-intensive child job in CommonPool !!!
        val job = launch(context + CommonPool) {
            println("job: I am a child of the request coroutine, but with a different dispatcher")
            delay(1000)
            println("job: I will not execute this line if my parent request is cancelled")
        }
        job.join() // request completes when its sub-job completes
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-07.kt)

The expected outcome of this code is:

```
job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?
```

### Naming coroutines for debugging

Automatically assigned ids are good when coroutines log often and you just need to correlate log records
coming from the same coroutine. However, when a coroutine is tied to the processing of a specific request
or is doing some specific background task, it is better to name it explicitly for debugging purposes.
[CoroutineName] serves the same function as a thread name. It is displayed in the name of the thread that
is executing this coroutine when debugging mode is turned on.

The following example demonstrates this concept:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
    log("Started main coroutine")
    // run two background value computations
    val v1 = async(CommonPool + CoroutineName("v1coroutine")) {
        log("Computing v1")
        delay(500)
        252
    }
    val v2 = async(CommonPool + CoroutineName("v2coroutine")) {
        log("Computing v2")
        delay(1000)
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt)

The output it produces with the `-Dkotlinx.coroutines.debug` JVM option is similar to:

```
[main @main#1] Started main coroutine
[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42
```

### Cancellation via explicit job

Let us put our knowledge about contexts, children and jobs together. Assume that our application has
an object with a lifecycle, but that object is not a coroutine. For example, we are writing an Android application
and launch various coroutines in the context of an Android activity to perform asynchronous operations to fetch
and update data, do animations, etc. All of these coroutines must be cancelled when the activity is destroyed
to avoid memory leaks.

We can manage the lifecycle of our coroutines by creating an instance of [Job] that is tied to
the lifecycle of our activity. A job instance is created using the [Job()][Job.invoke] factory function,
as the following example shows. We need to make sure that all the coroutines are started
with this job in their context, and then a single invocation of [Job.cancel] terminates them all.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = Job() // create a job object to manage our lifecycle
    // now launch ten coroutines for a demo, each working for a different time
    val coroutines = List(10) { i ->
        // they are all children of our job object
        launch(context + job) { // we use the context of main runBlocking thread, but with our own job object
            delay(i * 200L) // variable delay 0ms, 200ms, 400ms, ... etc
            println("Coroutine $i is done")
        }
    }
    println("Launched ${coroutines.size} coroutines")
    delay(500L) // delay for half a second
    println("Cancelling job!")
    job.cancel() // cancel our job.. !!!
    delay(1000L) // delay for more to see if our coroutines are still working
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-09.kt)

The output of this example is:

```
Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Cancelling job!
```

As you can see, only the first three coroutines printed a message and the others were cancelled
by a single invocation of `job.cancel()`. So all we need to do in our hypothetical Android
application is to create a parent job object when the activity is created, use it for child coroutines,
and cancel it when the activity is destroyed.
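
The lifecycle pattern described above could be sketched roughly as follows. `DemoActivity`, its `onCreate`/`onDestroy`
methods and the `finished` counter are hypothetical stand-ins that do not depend on the Android SDK. The sketch
assumes that `CommonPool + job` makes every launched coroutine a child of `job`, in the same way as
`context + job` does in the example above:

```kotlin
import kotlinx.coroutines.experimental.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for an Android activity: the class and its method names
// are illustrative only and are not part of any library.
class DemoActivity {
    private val job = Job()        // parent job tied to the lifecycle of this object
    val finished = AtomicInteger() // counts coroutines that ran to completion

    fun onCreate() {
        repeat(10) { i ->
            // CommonPool supplies the dispatcher, `job` makes the coroutine a child
            launch(CommonPool + job) {
                delay(i * 200L)
                finished.incrementAndGet()
            }
        }
    }

    fun onDestroy() {
        job.cancel() // a single call cancels every coroutine started in onCreate
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val activity = DemoActivity()
    activity.onCreate()
    delay(500L)  // let some of the coroutines finish
    activity.onDestroy()
    delay(1000L) // give the cancelled coroutines a chance to (not) finish
    println("Finished before destruction: ${activity.finished.get()} of 10")
}
```

Keeping the job private to the object means that callers cannot forget to attach it: every coroutine started through
the object is cancelled together with it.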

## Channels

Deferred values provide a convenient way to transfer a single value between coroutines.
Channels provide a way to transfer a stream of values.

<!--- INCLUDE .*/example-channel-([0-9]+).kt
import kotlinx.coroutines.experimental.channels.*
-->

### Channel basics

A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-01.kt)

### Closing and iteration over channels

Unlike a queue, a channel can be closed to indicate that no more elements are coming.
On the receiver side it is convenient to use a regular `for` loop to receive elements
from the channel.

Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
The iteration stops as soon as this close token is received, so there is a guarantee
that all elements sent before the close are received:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-02.kt)

### Building channel producers

The pattern where a coroutine is producing a sequence of elements is quite common.
This is a part of the _producer-consumer_ pattern that is often found in concurrent code.
You could abstract such a producer into a function that takes a channel as its parameter, but this goes contrary
to the common-sense expectation that results must be returned from functions.

There is a convenience coroutine builder named [produce] that makes it easy to do it right:

1059```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001060fun produceSquares() = produce<Int>(CommonPool) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001061 for (x in 1..5) send(x * x)
1062}
1063
1064fun main(args: Array<String>) = runBlocking<Unit> {
1065 val squares = produceSquares()
1066 for (y in squares) println(y)
1067 println("Done!")
1068}
1069```
1070
1071> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt)
1072
### Pipelines

A pipeline is a pattern where one coroutine is producing a possibly infinite stream of values:

```kotlin
fun produceNumbers() = produce<Int>(CommonPool) {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
```

Another coroutine (or several coroutines) consumes that stream, does some processing, and produces some other results.
In the example below the numbers are just squared:

```kotlin
fun square(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
    for (x in numbers) send(x * x)
}
```

The main code starts and connects the whole pipeline:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    squares.cancel() // need to cancel these coroutines in a larger app
    numbers.cancel()
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-04.kt)

We don't have to cancel these coroutines in this example app, because
[coroutines are like daemon threads](#coroutines-are-like-daemon-threads),
but in a larger app we'll need to stop our pipeline if we don't need it anymore.
Alternatively, we could have run pipeline coroutines as
[children of a coroutine](#children-of-a-coroutine).
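
A rough sketch of that alternative follows. It assumes that a `produce` coroutine started with a [Job] in its context
becomes a child of that job, just like `launch` does in the "Cancellation via explicit job" section. The context
parameters and the `firstSquares` helper are additions made for this illustration only:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlin.coroutines.experimental.CoroutineContext

// Same stages as above, but with an explicit context parameter (an assumption of this sketch)
fun produceNumbers(context: CoroutineContext) = produce<Int>(context) {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun square(context: CoroutineContext, numbers: ReceiveChannel<Int>) = produce<Int>(context) {
    for (x in numbers) send(x * x)
}

// Hypothetical helper: runs the pipeline under one parent job and stops it afterwards
fun firstSquares(n: Int): List<Int> = runBlocking<List<Int>> {
    val pipelineJob = Job()            // parent of every pipeline stage
    val ctx = CommonPool + pipelineJob // dispatcher plus the parent job
    val squares = square(ctx, produceNumbers(ctx))
    val result = ArrayList<Int>()
    for (i in 1..n) result.add(squares.receive())
    pipelineJob.cancel()               // one call is meant to stop all stages
    result
}

fun main(args: Array<String>) {
    println(firstSquares(5))
}
```

With this arrangement the caller does not need to keep a reference to every stage in order to cancel it.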

### Prime numbers with pipeline

Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
of coroutines. We start with an infinite sequence of numbers. This time we introduce an
explicit context parameter, so that the caller can control where our coroutines run:

<!--- INCLUDE kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
import kotlin.coroutines.experimental.CoroutineContext
-->

```kotlin
fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
```

The following pipeline stage filters an incoming stream of numbers, removing all the numbers
that are divisible by the given prime number:

```kotlin
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
    for (x in numbers) if (x % prime != 0) send(x)
}
```

Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
and launching a new pipeline stage for each prime number found:

```
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
```

The following example prints the first ten prime numbers,
running the whole pipeline in the context of the main thread:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    var cur = numbersFrom(context, 2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(context, cur, prime)
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt)

The output of this code is:

```
2
3
5
7
11
13
17
19
23
29
```

Note that you can build the same pipeline using the `buildIterator` coroutine builder from the standard library.
Replace `produce` with `buildIterator`, `send` with `yield`, `receive` with `next`,
`ReceiveChannel` with `Iterator`, and get rid of the context. You will not need `runBlocking` either.
However, the benefit of a pipeline that uses channels as shown above is that it can actually use
multiple CPU cores if you run it in the [CommonPool] context.

Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
built using `buildSequence`/`buildIterator`, because they do not allow arbitrary suspension, unlike
`produce`, which is fully asynchronous.
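
Applying the substitutions listed above gives, as a sketch, the following synchronous version. It assumes that
`buildIterator` is imported from the `kotlin.coroutines.experimental` package, which is where the standard library
version contemporary with this guide provides it. The `firstPrimes` helper is added here only to collect the results:

```kotlin
import kotlin.coroutines.experimental.buildIterator

fun numbersFrom(start: Int) = buildIterator<Int> {
    var x = start
    while (true) yield(x++) // infinite stream of integers from start
}

fun filter(numbers: Iterator<Int>, prime: Int) = buildIterator<Int> {
    for (x in numbers) if (x % prime != 0) yield(x)
}

// Hypothetical helper: collects the first `count` primes from the iterator pipeline
fun firstPrimes(count: Int): List<Int> {
    val primes = ArrayList<Int>()
    var cur = numbersFrom(2)
    for (i in 1..count) {
        val prime = cur.next()   // `next` takes the place of `receive`
        primes.add(prime)
        cur = filter(cur, prime) // add one more filtering stage
    }
    return primes
}

fun main(args: Array<String>) {
    firstPrimes(10).forEach { println(it) }
}
```

This version runs entirely in the calling thread: each `next` call pulls one value through the whole chain of iterators.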

### Fan-out

Multiple coroutines may receive from the same channel, distributing work between themselves.
Let us start with a producer coroutine that is periodically producing integers
(ten numbers per second):

```kotlin
fun produceNumbers() = produce<Int>(CommonPool) {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
```

Then we can have several processor coroutines. In this example, they just print their id and
received number:

```kotlin
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
    while (true) {
        val x = channel.receive()
        println("Processor #$id received $x")
    }
}
```

Now let us launch five processors and let them work for a second. See what happens:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(1000)
    producer.cancel() // cancel producer coroutine and thus kill them all
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt)

The output will be similar to the following one, although the processor ids that receive
each specific integer may be different:

```
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
```

Note that cancelling a producer coroutine closes its channel, thus eventually terminating the iteration
over the channel that the processor coroutines are doing.
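
A processor can also be written with the `for` loop shown in
[Closing and iteration over channels](#closing-and-iteration-over-channels), so that it relies on channel iteration
instead of an explicit `while (true)` loop around `receive`. The function name `launchIteratingProcessor` is made up
for this sketch:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

// Hypothetical variant of launchProcessor that iterates over the channel
fun launchIteratingProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
    for (x in channel) { // iteration stops once the channel is closed
        println("Processor #$id received $x")
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    val processors = List(3) { launchIteratingProcessor(it, channel) }
    for (x in 1..6) channel.send(x)
    channel.close()                // no more elements are coming
    processors.forEach { it.join() } // wait until every processor has left its loop
}
```

Here the channel is closed explicitly by the sender, which is the simplest way to observe the end of the iteration.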

### Fan-in

Multiple coroutines may send to the same channel.
For example, let us have a channel of strings, and a suspending function that
repeatedly sends a specified string to this channel with a specified delay:

```kotlin
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

Now, let us see what happens if we launch a couple of coroutines sending strings
(in this example we launch them in the context of the main thread):

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>()
    launch(context) { sendString(channel, "foo", 200L) }
    launch(context) { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt)

The output is:

```
foo
foo
BAR!
foo
foo
BAR!
```

### Buffered channels

The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked;
if receive is invoked first, it is suspended until send is invoked.

Both the [Channel()][Channel.invoke] factory function and the [produce] builder take an optional `capacity` parameter to
specify the _buffer size_. A buffer allows senders to send multiple elements before suspending,
similar to a `BlockingQueue` with a specified capacity, which blocks when the buffer is full.

Take a look at the behavior of the following code:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>(4) // create buffered channel
    launch(context) { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt)

It prints "sending" _five_ times using a buffered channel with capacity of _four_:

```
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
```

The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
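
The difference between a rendezvous channel and a buffered one can also be observed without any coroutine, as in the
sketch below. It assumes the non-suspending `offer` operation of `SendChannel`, which returns `false` instead of
suspending when an element cannot be accepted immediately. The helper `acceptedWithoutReceiver` is made up for this
illustration:

```kotlin
import kotlinx.coroutines.experimental.channels.*

// Hypothetical helper: counts how many elements a channel accepts when nobody receives
fun acceptedWithoutReceiver(channel: SendChannel<Int>, attempts: Int): Int {
    var accepted = 0
    for (i in 0 until attempts) {
        if (channel.offer(i)) accepted++ // offer never suspends, it reports failure instead
    }
    return accepted
}

fun main(args: Array<String>) {
    val rendezvous = acceptedWithoutReceiver(Channel<Int>(), 10)
    val buffered = acceptedWithoutReceiver(Channel<Int>(4), 10)
    println("Rendezvous channel accepted $rendezvous elements")
    println("Buffered channel accepted $buffered elements")
}
```

With no receiver present, the unbuffered channel should accept nothing, while the channel with a capacity of four
should accept exactly four elements before its buffer is full.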

## Select expression

A select expression makes it possible to await multiple suspending functions simultaneously and _select_
the first one that becomes available.

<!--- INCLUDE .*/example-select-([0-9]+).kt
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*
-->

### Selecting from channels

Let us have two channels of strings, `fizz` and `buzz`. The `fizz` channel produces the "Fizz" string every 300 ms:

1345```kotlin
1346val fizz = produce<String>(CommonPool) { // produce using common thread pool
1347 while (true) {
1348 delay(300)
1349 send("Fizz")
1350 }
1351}
1352```
1353
1354And the `buzz` channel produces "Buzz!" string every 500 ms:
1355
1356```kotlin
1357val buzz = produce<String>(CommonPool) {
1358 while (true) {
1359 delay(500)
1360 send("Buzz!")
1361 }
1362}
1363```
1364
1365Using [receive][ReceiveChannel.receive] suspending function we can receive _either_ from one channel or the
1366other. But [select] expression allows us to receive from _both_ simultaneously using its
1367[onReceive][SelectBuilder.onReceive] clauses:
1368
1369```kotlin
1370suspend fun selectFizzBuzz() {
1371 select<Unit> { // <Unit> means that this select expression does not produce any result
1372 fizz.onReceive { value -> // this is the first select clause
1373 println("fizz -> '$value'")
1374 }
1375 buzz.onReceive { value -> // this is the second select clause
1376 println("buzz -> '$value'")
1377 }
1378 }
1379}
1380```
1381
Let us run it seven times:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    repeat(7) {
        selectFizzBuzz()
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-01.kt)

The result of this code is:

```
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'
```

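The interleaving in this output can be checked against an idealized timeline. The sketch below is not part of the guide's example: it assumes that every `delay` is exact and that sending and receiving take no time, and `idealTimeline` is a hypothetical helper name. It merges the two periodic schedules (every 300 ms and every 500 ms) and lists the events in time order:

```kotlin
// Idealized model (an assumption): delays are exact and scheduling overhead is ignored.
fun idealTimeline(limitMs: Int): List<Pair<Int, String>> {
    val fizz = (300..limitMs step 300).map { it to "fizz" } // 300, 600, 900, ...
    val buzz = (500..limitMs step 500).map { it to "buzz" } // 500, 1000, ...
    return (fizz + buzz).sortedBy { it.first } // order events by their time
}

fun main(args: Array<String>) {
    for ((time, name) in idealTimeline(1200)) {
        println("$time ms: $name")
    }
}
```

Up to 1200 ms this model yields fizz, buzz, fizz, fizz, buzz, fizz, which matches the first six lines of the output. The seventh event falls at 1500 ms, where both producers are due in this model, so the arithmetic alone does not determine which of them is printed.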
### Selecting on close

The [onReceive][SelectBuilder.onReceive] clause in `select` fails when the channel is closed and the corresponding
`select` throws an exception. We can use the [onReceiveOrNull][SelectBuilder.onReceiveOrNull] clause to perform a
specific action when the channel is closed. This example also shows that `select` is an expression that returns
the result of its selected clause:

```kotlin
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'a' is closed"
            else
                "a -> '$value'"
        }
        b.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'b' is closed"
            else
                "b -> '$value'"
        }
    }
```

Let us use a channel `a` that produces the "Hello" string four times and a channel `b` that produces "World"
four times for this example:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // we are using the context of the main thread in this example for predictability ...
    val a = produce<String>(context) {
        repeat(4) {
            send("Hello $it")
        }
    }
    val b = produce<String>(context) {
        repeat(4) {
            send("World $it")
        }
    }
    repeat(8) { // print first eight results
        println(selectAorB(a, b))
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-02.kt)

The result of this code is quite interesting, so we'll analyze it in more detail:

```
a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed
```

There are a couple of observations to make.

First of all, `select` is _biased_ to the first clause. When several clauses are selectable at the same time,
the first one among them gets selected. Here, both channels are constantly producing strings, so the `a` channel,
being the first clause in select, wins. However, because we are using unbuffered channels, the `a` producer gets
suspended from time to time on its [send][SendChannel.send] invocation and gives `b` a chance to send, too.

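The bias can be seen in isolation when both channels already hold elements, so that both clauses are selectable on every call. The following is a minimal sketch, not part of the guide's generated examples: `drainBiased` is a hypothetical helper, and it assumes the same experimental API used above together with buffered channels created as `Channel<String>(2)`:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*

// Hypothetical helper: drains two pre-filled channels through the same select.
suspend fun drainBiased(): List<String> {
    val a = Channel<String>(2) // buffered, so the sends below do not suspend
    val b = Channel<String>(2)
    a.send("a1"); a.send("a2")
    b.send("b1"); b.send("b2")
    val order = ArrayList<String>()
    repeat(4) {
        val received = select<String> {
            a.onReceive { it } // first clause: preferred while `a` has elements
            b.onReceive { it } // second clause: selected once `a` is empty
        }
        order.add(received)
    }
    return order
}

fun main(args: Array<String>) = runBlocking<Unit> {
    println(drainBiased())
}
```

With the first-clause bias described above, both elements of `a` are expected to be received before any element of `b`.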
The second observation is that [onReceiveOrNull][SelectBuilder.onReceiveOrNull] gets immediately selected when the
channel is already closed.

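A small sketch of this second observation, under the same API assumptions as the examples above (`describeClosed` is a hypothetical name): the channel is closed before `select` is called, so the `onReceiveOrNull` clause is selected at once with a `null` value.

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*

// Hypothetical helper: selects on a channel that was closed beforehand.
suspend fun describeClosed(): String {
    val channel = Channel<String>()
    channel.close() // nothing was sent, so the channel is closed and empty
    return select<String> {
        channel.onReceiveOrNull { value ->
            if (value == null) "closed" else "received '$value'"
        }
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    println(describeClosed())
}
```

This is why the last two lines of the output above both report that channel `a` is closed: once it is closed, its clause, being the first one, is selected on every subsequent call.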
### Selecting to send

Select expression has an [onSend][SelectBuilder.onSend] clause that can be put to good use in combination
with the biased nature of selection.

Let us write an example of a producer of integer numbers that sends its values to a `side` channel when
the consumers on its primary channel cannot keep up with it:

```kotlin
fun produceNumbers(side: SendChannel<Int>) = produce<Int>(CommonPool) {
    for (num in 1..10) { // produce 10 numbers from 1 to 10
        delay(100) // every 100 ms
        select<Unit> {
            onSend(num) { } // Send to the primary channel
            side.onSend(num) { } // or to the side channel
        }
    }
}
```

The consumer is going to be quite slow, taking 250 ms to process each number:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val side = Channel<Int>() // allocate side channel
    launch(context) { // this is a very fast consumer for the side channel
        for (num in side) println("Side channel has $num")
    }
    for (num in produceNumbers(side)) {
        println("Consuming $num")
        delay(250) // let us digest the consumed number properly, do not hurry
    }
    println("Done consuming")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-03.kt)

So let us see what happens:

```
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
```

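The pattern of this output follows from the two rates. As a rough check, the sketch below simulates the example with plain arithmetic instead of coroutines. It is an idealized model, not the library's behavior: it assumes exact delays and no scheduling overhead, and `simulateSideChannel` and its parameters are hypothetical names. A number goes to the primary channel whenever the slow consumer is already waiting for it (the first clause wins), and to the side channel otherwise:

```kotlin
// Idealized model (an assumption): exact delays, no scheduling overhead.
fun simulateSideChannel(count: Int = 10, periodMs: Int = 100, consumeMs: Int = 250): List<String> {
    val log = ArrayList<String>()
    var consumerFreeAt = 0 // time at which the slow consumer can receive again
    for (num in 1..count) {
        val now = num * periodMs // the producer tries to send `num` at this time
        if (now >= consumerFreeAt) {
            // primary consumer is waiting, and the primary onSend is the first clause
            log.add("Consuming $num")
            consumerFreeAt = now + consumeMs
        } else {
            // primary consumer is still busy, so only the side channel can accept
            log.add("Side channel has $num")
        }
    }
    return log
}

fun main(args: Array<String>) {
    simulateSideChannel().forEach { println(it) }
}
```

With the default values the simulation consumes 1, 4, 7 and 10 on the primary channel and diverts the rest to the side channel, in the same order as the output above.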
### Selecting deferred values

Deferred values can be selected using the [onAwait][SelectBuilder.onAwait] clause, which enables "wait first"
type of logic. Let us start with an async-style function that returns a deferred string value after
a random delay:

<!--- INCLUDE .*/example-select-04.kt
import java.util.*
-->

```kotlin
fun asyncString(time: Int) = async(CommonPool) {
    delay(time.toLong())
    "Waited for $time ms"
}
```

Let us start a dozen of them, each with a random delay, using the following function that returns a
collection of deferred values:

```kotlin
fun asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return (1..12).map { asyncString(random.nextInt(1000)) }
}
```

Now the main function awaits the first of them to complete and counts the number of deferred values
that are still active. Note that we have used here the fact that the `select` expression is a Kotlin DSL,
so we can provide clauses for it using arbitrary code. In this case we iterate over a list
of deferred values to produce an `onAwait` clause for each one of them.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val list = asyncStringsList()
    val result = select<String> {
        list.withIndex().forEach { (index, deferred) ->
            deferred.onAwait { answer ->
                "Deferred $index produced answer '$answer'"
            }
        }
    }
    println(result)
    val countActive = list.sumBy { deferred -> if (deferred.isActive) 1 else 0 }
    println("$countActive coroutines are still active")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-04.kt)

The output is:

```
Deferred 4 produced answer 'Waited for 254 ms'
11 coroutines are still active
```

### Switch over a channel of deferred values

Let us write a channel producer function that consumes a channel of deferred string values and waits for each
received deferred value, but only until the next deferred value arrives or the channel is closed. This example puts
together [onReceiveOrNull][SelectBuilder.onReceiveOrNull] and [onAwait][SelectBuilder.onAwait] clauses in the same `select`:

```kotlin
fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String>(CommonPool) {
    var current = input.receive() // will start with first received deferred value
    while (isActive) { // loop while not cancelled/closed
        val next = select<Deferred<String>?> { // return next deferred value from this select or null
            input.onReceiveOrNull { update ->
                update // replaces next value to wait
            }
            current.onAwait { value ->
                send(value) // send value that current deferred has produced
                input.receiveOrNull() // and use the next deferred from the input channel
            }
        }
        if (next == null) {
            println("Channel was closed")
            break // out of loop
        } else {
            current = next
        }
    }
}
```

To test it, we'll use a simple async function that resolves to a specified string after a specified time:

```kotlin
fun asyncString(str: String, time: Long) = async(CommonPool) {
    delay(time)
    str
}
```

The main function just launches a coroutine to print the results of `switchMapDeferreds` and sends some test
data to it:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val chan = Channel<Deferred<String>>() // the channel for test
    launch(context) { // launch printing coroutine
        for (s in switchMapDeferreds(chan))
            println(s) // print each received string
    }
    chan.send(asyncString("BEGIN", 100))
    delay(200) // enough time for "BEGIN" to be produced
    chan.send(asyncString("Slow", 500))
    delay(100) // not enough time for slow
    chan.send(asyncString("Replace", 100))
    delay(500) // will give it time before the last one
    chan.send(asyncString("END", 500))
    delay(1000) // give it time to process
    chan.close() // and close the channel immediately
    delay(500) // and wait some time to let it finish
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-05.kt)

The result of this code:

```
BEGIN
Replace
END
Channel was closed
```

"Slow" is never printed: its deferred value is replaced by "Replace" 100 ms after it is sent, well before its
500 ms delay elapses.


<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core -->
<!--- DOCS_ROOT kotlinx-coroutines-core/target/dokka/kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines.experimental -->
[launch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/launch.html
[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/delay.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run-blocking.html
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/index.html
[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-cancellation-exception.html
[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/yield.html
[CoroutineScope.isActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/is-active.html
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
[run]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run.html
[NonCancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-non-cancellable/index.html
[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
[async]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/async.html
[Deferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/index.html
[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/await.html
[Job.start]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/start.html
[CommonPool]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-common-pool/index.html
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-dispatcher/index.html
[CoroutineScope.context]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/context.html
[Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-unconfined/index.html
[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/new-coroutine-context.html
[CoroutineName]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-name/index.html
[Job.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/invoke.html
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/cancel.html
<!--- INDEX kotlinx.coroutines.experimental.channels -->
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/send.html
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/receive.html
[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/close.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
[Channel.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/invoke.html
<!--- INDEX kotlinx.coroutines.experimental.selects -->
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
[SelectBuilder.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/on-receive.html
[SelectBuilder.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/on-receive-or-null.html
[SelectBuilder.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/on-send.html
[SelectBuilder.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/on-await.html
<!--- END -->