blob: 954ae6b20a4a92ddf6fbdda1c231c9ae59ca84f4 [file] [log] [blame] [view]
Roman Elizarov43e90112017-05-10 11:25:20 +03001<!--- INCLUDE .*/example-([a-z]+)-([0-9a-z]+)\.kt
Roman Elizarova5e653f2017-02-13 13:49:55 +03002/*
3 * Copyright 2016-2017 JetBrains s.r.o.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
Roman Elizarovf16fd272017-02-07 11:26:00 +030017
Roman Elizarova5e653f2017-02-13 13:49:55 +030018// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
19package guide.$$1.example$$2
Roman Elizarovf16fd272017-02-07 11:26:00 +030020
Roman Elizarova5e653f2017-02-13 13:49:55 +030021import kotlinx.coroutines.experimental.*
Roman Elizarovf16fd272017-02-07 11:26:00 +030022-->
Roman Elizarov731f0ad2017-02-22 20:48:45 +030023<!--- KNIT kotlinx-coroutines-core/src/test/kotlin/guide/.*\.kt -->
24<!--- TEST_OUT kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
25// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
26package guide.test
27
28import org.junit.Test
29
30class GuideTest {
31-->
Roman Elizarovf16fd272017-02-07 11:26:00 +030032
Roman Elizarov7deefb82017-01-31 10:33:17 +030033# Guide to kotlinx.coroutines by example
34
35This is a short guide on core features of `kotlinx.coroutines` with a series of examples.
36
Roman Elizarov2a638922017-03-04 10:22:43 +030037## Introduction and setup
38
39Kotlin, as a language, provides only minimal low-level APIs in its standard library to enable various other
40libraries to utilize coroutines. Unlike many other languages with similar capabilities, `async` and `await`
41are not keywords in Kotlin and are not even part of its standard library.
42
Robert Hencke497d3432017-04-11 00:14:29 -040043`kotlinx.coroutines` is one such rich library. It contains a number of high-level
Roman Elizarov2a638922017-03-04 10:22:43 +030044coroutine-enabled primitives that this guide covers, including `async` and `await`.
45You need to add a dependency on `kotlinx-coroutines-core` module as explained
46[here](README.md#using-in-your-projects) to use primitives from this guide in your projects.
47
Roman Elizarov1293ccd2017-02-01 18:49:54 +030048## Table of contents
49
Roman Elizarovfa7723e2017-02-06 11:17:51 +030050<!--- TOC -->
51
Roman Elizarov1293ccd2017-02-01 18:49:54 +030052* [Coroutine basics](#coroutine-basics)
53 * [Your first coroutine](#your-first-coroutine)
54 * [Bridging blocking and non-blocking worlds](#bridging-blocking-and-non-blocking-worlds)
55 * [Waiting for a job](#waiting-for-a-job)
56 * [Extract function refactoring](#extract-function-refactoring)
57 * [Coroutines ARE light-weight](#coroutines-are-light-weight)
58 * [Coroutines are like daemon threads](#coroutines-are-like-daemon-threads)
59* [Cancellation and timeouts](#cancellation-and-timeouts)
60 * [Cancelling coroutine execution](#cancelling-coroutine-execution)
61 * [Cancellation is cooperative](#cancellation-is-cooperative)
62 * [Making computation code cancellable](#making-computation-code-cancellable)
63 * [Closing resources with finally](#closing-resources-with-finally)
64 * [Run non-cancellable block](#run-non-cancellable-block)
65 * [Timeout](#timeout)
66* [Composing suspending functions](#composing-suspending-functions)
67 * [Sequential by default](#sequential-by-default)
Roman Elizarov32d95322017-02-09 15:57:31 +030068 * [Concurrent using async](#concurrent-using-async)
69 * [Lazily started async](#lazily-started-async)
70 * [Async-style functions](#async-style-functions)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +030071* [Coroutine context and dispatchers](#coroutine-context-and-dispatchers)
Roman Elizarovfa7723e2017-02-06 11:17:51 +030072 * [Dispatchers and threads](#dispatchers-and-threads)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +030073 * [Unconfined vs confined dispatcher](#unconfined-vs-confined-dispatcher)
74 * [Debugging coroutines and threads](#debugging-coroutines-and-threads)
75 * [Jumping between threads](#jumping-between-threads)
76 * [Job in the context](#job-in-the-context)
77 * [Children of a coroutine](#children-of-a-coroutine)
78 * [Combining contexts](#combining-contexts)
79 * [Naming coroutines for debugging](#naming-coroutines-for-debugging)
Roman Elizarov2fd7cb32017-02-11 23:18:59 +030080 * [Cancellation via explicit job](#cancellation-via-explicit-job)
Roman Elizarovb7721cf2017-02-03 19:23:08 +030081* [Channels](#channels)
82 * [Channel basics](#channel-basics)
83 * [Closing and iteration over channels](#closing-and-iteration-over-channels)
84 * [Building channel producers](#building-channel-producers)
85 * [Pipelines](#pipelines)
86 * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
87 * [Fan-out](#fan-out)
88 * [Fan-in](#fan-in)
89 * [Buffered channels](#buffered-channels)
Roman Elizarovb0517ba2017-02-27 14:03:14 +030090 * [Channels are fair](#channels-are-fair)
Roman Elizarovf5bc0472017-02-22 11:38:13 +030091* [Shared mutable state and concurrency](#shared-mutable-state-and-concurrency)
92 * [The problem](#the-problem)
Roman Elizarov1e459602017-02-27 11:05:17 +030093 * [Volatiles are of no help](#volatiles-are-of-no-help)
Roman Elizarovf5bc0472017-02-22 11:38:13 +030094 * [Thread-safe data structures](#thread-safe-data-structures)
Roman Elizarov1e459602017-02-27 11:05:17 +030095 * [Thread confinement fine-grained](#thread-confinement-fine-grained)
96 * [Thread confinement coarse-grained](#thread-confinement-coarse-grained)
Roman Elizarovf5bc0472017-02-22 11:38:13 +030097 * [Mutual exclusion](#mutual-exclusion)
98 * [Actors](#actors)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +030099* [Select expression](#select-expression)
100 * [Selecting from channels](#selecting-from-channels)
101 * [Selecting on close](#selecting-on-close)
102 * [Selecting to send](#selecting-to-send)
103 * [Selecting deferred values](#selecting-deferred-values)
104 * [Switch over a channel of deferred values](#switch-over-a-channel-of-deferred-values)
Roman Elizarov8db17332017-03-09 12:40:45 +0300105* [Further reading](#further-reading)
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300106
Roman Elizarova5e653f2017-02-13 13:49:55 +0300107<!--- END_TOC -->
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300108
109## Coroutine basics
110
111This section covers basic coroutine concepts.
112
113### Your first coroutine
Roman Elizarov7deefb82017-01-31 10:33:17 +0300114
115Run the following code:
116
117```kotlin
118fun main(args: Array<String>) {
119 launch(CommonPool) { // create new coroutine in common thread pool
120 delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
121 println("World!") // print after delay
122 }
123 println("Hello,") // main function continues while coroutine is delayed
124 Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
125}
126```
127
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300128> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-01.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300129
130Run this code:
131
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300132```text
Roman Elizarov7deefb82017-01-31 10:33:17 +0300133Hello,
134World!
135```
136
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300137<!--- TEST -->
138
Roman Elizarov419a6c82017-02-09 18:36:22 +0300139Essentially, coroutines are light-weight threads.
140They are launched with [launch] _coroutine builder_.
141You can achieve the same result replacing
Roman Elizarov7deefb82017-01-31 10:33:17 +0300142`launch(CommonPool) { ... }` with `thread { ... }` and `delay(...)` with `Thread.sleep(...)`. Try it.
143
144If you start by replacing `launch(CommonPool)` by `thread`, the compiler produces the following error:
145
146```
147Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
148```
149
Roman Elizarov419a6c82017-02-09 18:36:22 +0300150That is because [delay] is a special _suspending function_ that does not block a thread, but _suspends_
Roman Elizarov7deefb82017-01-31 10:33:17 +0300151coroutine and it can be only used from a coroutine.
152
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300153### Bridging blocking and non-blocking worlds
Roman Elizarov7deefb82017-01-31 10:33:17 +0300154
155The first example mixes _non-blocking_ `delay(...)` and _blocking_ `Thread.sleep(...)` in the same
156code of `main` function. It is easy to get lost. Let's cleanly separate blocking and non-blocking
Roman Elizarov419a6c82017-02-09 18:36:22 +0300157worlds by using [runBlocking]:
Roman Elizarov7deefb82017-01-31 10:33:17 +0300158
159```kotlin
160fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
161 launch(CommonPool) { // create new coroutine in common thread pool
162 delay(1000L)
163 println("World!")
164 }
165 println("Hello,") // main coroutine continues while child is delayed
166 delay(2000L) // non-blocking delay for 2 seconds to keep JVM alive
167}
168```
169
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300170> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-02.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300171
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300172<!--- TEST
173Hello,
174World!
175-->
176
Roman Elizarov419a6c82017-02-09 18:36:22 +0300177The result is the same, but this code uses only non-blocking [delay].
Roman Elizarov7deefb82017-01-31 10:33:17 +0300178
179`runBlocking { ... }` works as an adaptor that is used here to start the top-level main coroutine.
180The regular code outside of `runBlocking` _blocks_, until the coroutine inside `runBlocking` is active.
181
182This is also a way to write unit-tests for suspending functions:
183
184```kotlin
185class MyTest {
186 @Test
187 fun testMySuspendingFunction() = runBlocking<Unit> {
188 // here we can use suspending functions using any assertion style that we like
189 }
190}
191```
Roman Elizarovb3d55a52017-02-03 12:47:21 +0300192
193<!--- CLEAR -->
Roman Elizarov7deefb82017-01-31 10:33:17 +0300194
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300195### Waiting for a job
Roman Elizarov7deefb82017-01-31 10:33:17 +0300196
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300197Delaying for a time while another coroutine is working is not a good approach. Let's explicitly
Roman Elizarov419a6c82017-02-09 18:36:22 +0300198wait (in a non-blocking way) until the background [Job] that we have launched is complete:
Roman Elizarov7deefb82017-01-31 10:33:17 +0300199
200```kotlin
201fun main(args: Array<String>) = runBlocking<Unit> {
202 val job = launch(CommonPool) { // create new coroutine and keep a reference to its Job
203 delay(1000L)
204 println("World!")
205 }
206 println("Hello,")
207 job.join() // wait until child coroutine completes
208}
209```
210
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300211> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-03.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300212
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300213<!--- TEST
214Hello,
215World!
216-->
217
Roman Elizarov7deefb82017-01-31 10:33:17 +0300218Now the result is still the same, but the code of the main coroutine is not tied to the duration of
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300219the background job in any way. Much better.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300220
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300221### Extract function refactoring
Roman Elizarov7deefb82017-01-31 10:33:17 +0300222
223Let's extract the block of code inside `launch(CommonPool} { ... }` into a separate function. When you
224perform "Extract function" refactoring on this code you get a new function with `suspend` modifier.
225That is your first _suspending function_. Suspending functions can be used inside coroutines
226just like regular functions, but their additional feature is that they can, in turn,
227use other suspending functions, like `delay` in this example, to _suspend_ execution of a coroutine.
228
229```kotlin
230fun main(args: Array<String>) = runBlocking<Unit> {
231 val job = launch(CommonPool) { doWorld() }
232 println("Hello,")
233 job.join()
234}
235
236// this is your first suspending function
237suspend fun doWorld() {
238 delay(1000L)
239 println("World!")
240}
241```
242
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300243> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-04.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300244
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300245<!--- TEST
246Hello,
247World!
248-->
249
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300250### Coroutines ARE light-weight
Roman Elizarov7deefb82017-01-31 10:33:17 +0300251
252Run the following code:
253
254```kotlin
255fun main(args: Array<String>) = runBlocking<Unit> {
256 val jobs = List(100_000) { // create a lot of coroutines and list their jobs
257 launch(CommonPool) {
258 delay(1000L)
259 print(".")
260 }
261 }
262 jobs.forEach { it.join() } // wait for all jobs to complete
263}
264```
265
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300266> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-05.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300267
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300268<!--- TEST lines.size == 1 && lines[0] == ".".repeat(100_000) -->
269
Roman Elizarov7deefb82017-01-31 10:33:17 +0300270It starts 100K coroutines and, after a second, each coroutine prints a dot.
271Now, try that with threads. What would happen? (Most likely your code will produce some sort of out-of-memory error)
272
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300273### Coroutines are like daemon threads
Roman Elizarov7deefb82017-01-31 10:33:17 +0300274
275The following code launches a long-running coroutine that prints "I'm sleeping" twice a second and then
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300276returns from the main function after some delay:
Roman Elizarov7deefb82017-01-31 10:33:17 +0300277
278```kotlin
279fun main(args: Array<String>) = runBlocking<Unit> {
280 launch(CommonPool) {
281 repeat(1000) { i ->
282 println("I'm sleeping $i ...")
283 delay(500L)
284 }
285 }
286 delay(1300L) // just quit after delay
287}
288```
289
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300290> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-06.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300291
292You can run and see that it prints three lines and terminates:
293
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300294```text
Roman Elizarov7deefb82017-01-31 10:33:17 +0300295I'm sleeping 0 ...
296I'm sleeping 1 ...
297I'm sleeping 2 ...
298```
299
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300300<!--- TEST -->
301
Roman Elizarov7deefb82017-01-31 10:33:17 +0300302Active coroutines do not keep the process alive. They are like daemon threads.
303
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300304## Cancellation and timeouts
305
306This section covers coroutine cancellation and timeouts.
307
308### Cancelling coroutine execution
Roman Elizarov7deefb82017-01-31 10:33:17 +0300309
310In small application the return from "main" method might sound like a good idea to get all coroutines
311implicitly terminated. In a larger, long-running application, you need finer-grained control.
Roman Elizarov419a6c82017-02-09 18:36:22 +0300312The [launch] function returns a [Job] that can be used to cancel running coroutine:
Roman Elizarov7deefb82017-01-31 10:33:17 +0300313
314```kotlin
315fun main(args: Array<String>) = runBlocking<Unit> {
316 val job = launch(CommonPool) {
317 repeat(1000) { i ->
318 println("I'm sleeping $i ...")
319 delay(500L)
320 }
321 }
322 delay(1300L) // delay a bit
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300323 println("main: I'm tired of waiting!")
Roman Elizarov7deefb82017-01-31 10:33:17 +0300324 job.cancel() // cancels the job
325 delay(1300L) // delay a bit to ensure it was cancelled indeed
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300326 println("main: Now I can quit.")
Roman Elizarov7deefb82017-01-31 10:33:17 +0300327}
328```
329
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300330> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-01.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300331
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300332It produces the following output:
333
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300334```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300335I'm sleeping 0 ...
336I'm sleeping 1 ...
337I'm sleeping 2 ...
338main: I'm tired of waiting!
339main: Now I can quit.
340```
341
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300342<!--- TEST -->
343
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300344As soon as main invokes `job.cancel`, we don't see any output from the other coroutine because it was cancelled.
345
346### Cancellation is cooperative
Roman Elizarov7deefb82017-01-31 10:33:17 +0300347
Tair Rzayevaf734622017-02-01 22:30:16 +0200348Coroutine cancellation is _cooperative_. A coroutine code has to cooperate to be cancellable.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300349All the suspending functions in `kotlinx.coroutines` are _cancellable_. They check for cancellation of
Roman Elizarov419a6c82017-02-09 18:36:22 +0300350coroutine and throw [CancellationException] when cancelled. However, if a coroutine is working in
Roman Elizarov7deefb82017-01-31 10:33:17 +0300351a computation and does not check for cancellation, then it cannot be cancelled, like the following
352example shows:
353
354```kotlin
355fun main(args: Array<String>) = runBlocking<Unit> {
356 val job = launch(CommonPool) {
357 var nextPrintTime = 0L
358 var i = 0
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300359 while (i < 10) { // computation loop
Roman Elizarov7deefb82017-01-31 10:33:17 +0300360 val currentTime = System.currentTimeMillis()
361 if (currentTime >= nextPrintTime) {
362 println("I'm sleeping ${i++} ...")
363 nextPrintTime = currentTime + 500L
364 }
365 }
366 }
367 delay(1300L) // delay a bit
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300368 println("main: I'm tired of waiting!")
Roman Elizarov7deefb82017-01-31 10:33:17 +0300369 job.cancel() // cancels the job
370 delay(1300L) // delay a bit to see if it was cancelled....
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300371 println("main: Now I can quit.")
Roman Elizarov7deefb82017-01-31 10:33:17 +0300372}
373```
374
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300375> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-02.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300376
377Run it to see that it continues to print "I'm sleeping" even after cancellation.
378
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300379<!--- TEST
380I'm sleeping 0 ...
381I'm sleeping 1 ...
382I'm sleeping 2 ...
383main: I'm tired of waiting!
384I'm sleeping 3 ...
385I'm sleeping 4 ...
386I'm sleeping 5 ...
387main: Now I can quit.
388-->
389
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300390### Making computation code cancellable
Roman Elizarov7deefb82017-01-31 10:33:17 +0300391
392There are two approaches to making computation code cancellable. The first one is to periodically
Roman Elizarov419a6c82017-02-09 18:36:22 +0300393invoke a suspending function. There is a [yield] function that is a good choice for that purpose.
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300394The other one is to explicitly check the cancellation status. Let us try the later approach.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300395
396Replace `while (true)` in the previous example with `while (isActive)` and rerun it.
397
Roman Elizarovb3d55a52017-02-03 12:47:21 +0300398```kotlin
399fun main(args: Array<String>) = runBlocking<Unit> {
400 val job = launch(CommonPool) {
401 var nextPrintTime = 0L
402 var i = 0
403 while (isActive) { // cancellable computation loop
404 val currentTime = System.currentTimeMillis()
405 if (currentTime >= nextPrintTime) {
406 println("I'm sleeping ${i++} ...")
407 nextPrintTime = currentTime + 500L
408 }
409 }
410 }
411 delay(1300L) // delay a bit
412 println("main: I'm tired of waiting!")
413 job.cancel() // cancels the job
414 delay(1300L) // delay a bit to see if it was cancelled....
415 println("main: Now I can quit.")
416}
417```
418
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300419> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-03.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300420
Roman Elizarov419a6c82017-02-09 18:36:22 +0300421As you can see, now this loop can be cancelled. [isActive][CoroutineScope.isActive] is a property that is available inside
422the code of coroutines via [CoroutineScope] object.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300423
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300424<!--- TEST
425I'm sleeping 0 ...
426I'm sleeping 1 ...
427I'm sleeping 2 ...
428main: I'm tired of waiting!
429main: Now I can quit.
430-->
431
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300432### Closing resources with finally
433
Roman Elizarov419a6c82017-02-09 18:36:22 +0300434Cancellable suspending functions throw [CancellationException] on cancellation which can be handled in
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300435all the usual way. For example, the `try {...} finally {...}` and Kotlin `use` function execute their
436finalization actions normally when coroutine is cancelled:
437
438```kotlin
439fun main(args: Array<String>) = runBlocking<Unit> {
440 val job = launch(CommonPool) {
441 try {
442 repeat(1000) { i ->
443 println("I'm sleeping $i ...")
444 delay(500L)
445 }
446 } finally {
447 println("I'm running finally")
448 }
449 }
450 delay(1300L) // delay a bit
451 println("main: I'm tired of waiting!")
452 job.cancel() // cancels the job
453 delay(1300L) // delay a bit to ensure it was cancelled indeed
454 println("main: Now I can quit.")
455}
456```
457
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300458> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-04.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300459
460The example above produces the following output:
461
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300462```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300463I'm sleeping 0 ...
464I'm sleeping 1 ...
465I'm sleeping 2 ...
466main: I'm tired of waiting!
467I'm running finally
468main: Now I can quit.
469```
470
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300471<!--- TEST -->
472
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300473### Run non-cancellable block
474
475Any attempt to use a suspending function in the `finally` block of the previous example will cause
Roman Elizarov419a6c82017-02-09 18:36:22 +0300476[CancellationException], because the coroutine running this code is cancelled. Usually, this is not a
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300477problem, since all well-behaving closing operations (closing a file, cancelling a job, or closing any kind of a
478communication channel) are usually non-blocking and do not involve any suspending functions. However, in the
479rare case when you need to suspend in the cancelled coroutine you can wrap the corresponding code in
Roman Elizarov419a6c82017-02-09 18:36:22 +0300480`run(NonCancellable) {...}` using [run] function and [NonCancellable] context as the following example shows:
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300481
482```kotlin
483fun main(args: Array<String>) = runBlocking<Unit> {
484 val job = launch(CommonPool) {
485 try {
486 repeat(1000) { i ->
487 println("I'm sleeping $i ...")
488 delay(500L)
489 }
490 } finally {
491 run(NonCancellable) {
492 println("I'm running finally")
493 delay(1000L)
494 println("And I've just delayed for 1 sec because I'm non-cancellable")
495 }
496 }
497 }
498 delay(1300L) // delay a bit
499 println("main: I'm tired of waiting!")
500 job.cancel() // cancels the job
501 delay(1300L) // delay a bit to ensure it was cancelled indeed
502 println("main: Now I can quit.")
503}
504```
505
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300506> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-05.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300507
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300508<!--- TEST
509I'm sleeping 0 ...
510I'm sleeping 1 ...
511I'm sleeping 2 ...
512main: I'm tired of waiting!
513I'm running finally
514And I've just delayed for 1 sec because I'm non-cancellable
515main: Now I can quit.
516-->
517
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300518### Timeout
519
520The most obvious reason to cancel coroutine execution in practice,
521is because its execution time has exceeded some timeout.
Roman Elizarov419a6c82017-02-09 18:36:22 +0300522While you can manually track the reference to the corresponding [Job] and launch a separate coroutine to cancel
523the tracked one after delay, there is a ready to use [withTimeout] function that does it.
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300524Look at the following example:
525
526```kotlin
527fun main(args: Array<String>) = runBlocking<Unit> {
528 withTimeout(1300L) {
529 repeat(1000) { i ->
530 println("I'm sleeping $i ...")
531 delay(500L)
532 }
533 }
534}
535```
536
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300537> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-06.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300538
539It produces the following output:
540
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300541```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300542I'm sleeping 0 ...
543I'm sleeping 1 ...
544I'm sleeping 2 ...
Roman Elizarovca9d5be2017-04-20 19:23:18 +0300545Exception in thread "main" kotlinx.coroutines.experimental.TimeoutException: Timed out waiting for 1300 MILLISECONDS
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300546```
547
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300548<!--- TEST STARTS_WITH -->
549
Roman Elizarovca9d5be2017-04-20 19:23:18 +0300550The `TimeoutException` that is thrown by [withTimeout] is a private subclass of [CancellationException].
551We have not seen its stack trace printed on the console before. That is because
Roman Elizarov7c864d82017-02-27 10:17:50 +0300552inside a cancelled coroutine `CancellationException` is considered to be a normal reason for coroutine completion.
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300553However, in this example we have used `withTimeout` right inside the `main` function.
554
555Because cancellation is just an exception, all the resources will be closed in a usual way.
556You can wrap the code with timeout in `try {...} catch (e: CancellationException) {...}` block if
557you need to do some additional action specifically on timeout.
558
559## Composing suspending functions
560
561This section covers various approaches to composition of suspending functions.
562
563### Sequential by default
564
565Assume that we have two suspending functions defined elsewhere that do something useful like some kind of
Roman Elizarovb7721cf2017-02-03 19:23:08 +0300566remote service call or computation. We just pretend they are useful, but actually each one just
567delays for a second for the purpose of this example:
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300568
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300569<!--- INCLUDE .*/example-compose-([0-9]+).kt
570import kotlin.system.measureTimeMillis
571-->
572
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300573```kotlin
574suspend fun doSomethingUsefulOne(): Int {
575 delay(1000L) // pretend we are doing something useful here
576 return 13
577}
578
579suspend fun doSomethingUsefulTwo(): Int {
580 delay(1000L) // pretend we are doing something useful here, too
581 return 29
582}
583```
584
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300585<!--- INCLUDE .*/example-compose-([0-9]+).kt -->
586
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300587What do we do if need to invoke them _sequentially_ -- first `doSomethingUsefulOne` _and then_
588`doSomethingUsefulTwo` and compute the sum of their results?
589In practise we do this if we use the results of the first function to make a decision on whether we need
590to invoke the second one or to decide on how to invoke it.
591
592We just use a normal sequential invocation, because the code in the coroutine, just like in the regular
Roman Elizarov32d95322017-02-09 15:57:31 +0300593code, is _sequential_ by default. The following example demonstrates it by measuring the total
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300594time it takes to execute both suspending functions:
595
596```kotlin
597fun main(args: Array<String>) = runBlocking<Unit> {
598 val time = measureTimeMillis {
599 val one = doSomethingUsefulOne()
600 val two = doSomethingUsefulTwo()
601 println("The answer is ${one + two}")
602 }
603 println("Completed in $time ms")
604}
605```
606
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300607> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-01.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300608
609It produces something like this:
610
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300611```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300612The answer is 42
613Completed in 2017 ms
614```
615
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300616<!--- TEST FLEXIBLE_TIME -->
617
Roman Elizarov32d95322017-02-09 15:57:31 +0300618### Concurrent using async
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300619
620What if there are no dependencies between invocation of `doSomethingUsefulOne` and `doSomethingUsefulTwo` and
Roman Elizarov419a6c82017-02-09 18:36:22 +0300621we want to get the answer faster, by doing both _concurrently_? This is where [async] comes to help.
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300622
Roman Elizarov419a6c82017-02-09 18:36:22 +0300623Conceptually, [async] is just like [launch]. It starts a separate coroutine which is a light-weight thread
624that works concurrently with all the other coroutines. The difference is that `launch` returns a [Job] and
625does not carry any resulting value, while `async` returns a [Deferred] -- a light-weight non-blocking future
Roman Elizarov32d95322017-02-09 15:57:31 +0300626that represents a promise to provide a result later. You can use `.await()` on a deferred value to get its eventual result,
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300627but `Deferred` is also a `Job`, so you can cancel it if needed.
628
629```kotlin
630fun main(args: Array<String>) = runBlocking<Unit> {
631 val time = measureTimeMillis {
Roman Elizarov32d95322017-02-09 15:57:31 +0300632 val one = async(CommonPool) { doSomethingUsefulOne() }
633 val two = async(CommonPool) { doSomethingUsefulTwo() }
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300634 println("The answer is ${one.await() + two.await()}")
635 }
636 println("Completed in $time ms")
637}
638```
639
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300640> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-02.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300641
642It produces something like this:
643
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300644```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300645The answer is 42
646Completed in 1017 ms
647```
648
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300649<!--- TEST FLEXIBLE_TIME -->
650
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300651This is twice as fast, because we have concurrent execution of two coroutines.
652Note, that concurrency with coroutines is always explicit.
653
Roman Elizarov32d95322017-02-09 15:57:31 +0300654### Lazily started async
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300655
Roman Elizarovecda27f2017-04-06 23:06:26 +0300656There is a laziness option to [async] with [CoroutineStart.LAZY] parameter.
Roman Elizarov419a6c82017-02-09 18:36:22 +0300657It starts coroutine only when its result is needed by some
658[await][Deferred.await] or if a [start][Job.start] function
Roman Elizarov32d95322017-02-09 15:57:31 +0300659is invoked. Run the following example that differs from the previous one only by this option:
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300660
661```kotlin
662fun main(args: Array<String>) = runBlocking<Unit> {
663 val time = measureTimeMillis {
Roman Elizarovecda27f2017-04-06 23:06:26 +0300664 val one = async(CommonPool, CoroutineStart.LAZY) { doSomethingUsefulOne() }
665 val two = async(CommonPool, CoroutineStart.LAZY) { doSomethingUsefulTwo() }
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300666 println("The answer is ${one.await() + two.await()}")
667 }
668 println("Completed in $time ms")
669}
670```
671
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300672> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-03.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300673
674It produces something like this:
675
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300676```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300677The answer is 42
678Completed in 2017 ms
679```
680
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300681<!--- TEST FLEXIBLE_TIME -->
682
Roman Elizarov32d95322017-02-09 15:57:31 +0300683So, we are back to sequential execution, because we _first_ start and await for `one`, _and then_ start and await
684for `two`. It is not the intended use-case for laziness. It is designed as a replacement for
685the standard `lazy` function in cases when computation of the value involves suspending functions.
686
687### Async-style functions
688
689We can define async-style functions that invoke `doSomethingUsefulOne` and `doSomethingUsefulTwo`
Roman Elizarov419a6c82017-02-09 18:36:22 +0300690_asynchronously_ using [async] coroutine builder. It is a good style to name such functions with
Roman Elizarov32d95322017-02-09 15:57:31 +0300691either "async" prefix of "Async" suffix to highlight the fact that they only start asynchronous
692computation and one needs to use the resulting deferred value to get the result.
693
694```kotlin
695// The result type of asyncSomethingUsefulOne is Deferred<Int>
696fun asyncSomethingUsefulOne() = async(CommonPool) {
697 doSomethingUsefulOne()
698}
699
700// The result type of asyncSomethingUsefulTwo is Deferred<Int>
701fun asyncSomethingUsefulTwo() = async(CommonPool) {
702 doSomethingUsefulTwo()
703}
704```
705
706Note, that these `asyncXXX` function are **not** _suspending_ functions. They can be used from anywhere.
707However, their use always implies asynchronous (here meaning _concurrent_) execution of their action
708with the invoking code.
709
710The following example shows their use outside of coroutine:
711
712```kotlin
713// note, that we don't have `runBlocking` to the right of `main` in this example
714fun main(args: Array<String>) {
715 val time = measureTimeMillis {
716 // we can initiate async actions outside of a coroutine
717 val one = asyncSomethingUsefulOne()
718 val two = asyncSomethingUsefulTwo()
719 // but waiting for a result must involve either suspending or blocking.
720 // here we use `runBlocking { ... }` to block the main thread while waiting for the result
721 runBlocking {
722 println("The answer is ${one.await() + two.await()}")
723 }
724 }
725 println("Completed in $time ms")
726}
727```
728
729> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-04.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300730
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300731<!--- TEST FLEXIBLE_TIME
732The answer is 42
733Completed in 1085 ms
734-->
735
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300736## Coroutine context and dispatchers
737
Roman Elizarov32d95322017-02-09 15:57:31 +0300738We've already seen `launch(CommonPool) {...}`, `async(CommonPool) {...}`, `run(NonCancellable) {...}`, etc.
Roman Elizarov419a6c82017-02-09 18:36:22 +0300739In these code snippets [CommonPool] and [NonCancellable] are _coroutine contexts_.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300740This section covers other available choices.
741
742### Dispatchers and threads
743
Roman Elizarov419a6c82017-02-09 18:36:22 +0300744Coroutine context includes a [_coroutine dispatcher_][CoroutineDispatcher] which determines what thread or threads
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300745the corresponding coroutine uses for its execution. Coroutine dispatcher can confine coroutine execution
746to a specific thread, dispatch it to a thread pool, or let it run unconfined. Try the following example:
747
748```kotlin
749fun main(args: Array<String>) = runBlocking<Unit> {
750 val jobs = arrayListOf<Job>()
751 jobs += launch(Unconfined) { // not confined -- will work with main thread
752 println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
753 }
754 jobs += launch(context) { // context of the parent, runBlocking coroutine
755 println(" 'context': I'm working in thread ${Thread.currentThread().name}")
756 }
757 jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
758 println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
759 }
760 jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
761 println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
762 }
763 jobs.forEach { it.join() }
764}
765```
766
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300767> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-01.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300768
769It produces the following output (maybe in different order):
770
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300771```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300772 'Unconfined': I'm working in thread main
773 'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
774 'newSTC': I'm working in thread MyOwnThread
775 'context': I'm working in thread main
776```
777
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300778<!--- TEST LINES_START_UNORDERED -->
779
Roman Elizarov419a6c82017-02-09 18:36:22 +0300780The difference between parent [context][CoroutineScope.context] and [Unconfined] context will be shown later.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300781
782### Unconfined vs confined dispatcher
783
Roman Elizarov419a6c82017-02-09 18:36:22 +0300784The [Unconfined] coroutine dispatcher starts coroutine in the caller thread, but only until the
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300785first suspension point. After suspension it resumes in the thread that is fully determined by the
786suspending function that was invoked. Unconfined dispatcher is appropriate when coroutine does not
787consume CPU time nor updates any shared data (like UI) that is confined to a specific thread.
788
Roman Elizarov419a6c82017-02-09 18:36:22 +0300789On the other side, [context][CoroutineScope.context] property that is available inside the block of any coroutine
790via [CoroutineScope] interface, is a reference to a context of this particular coroutine.
791This way, a parent context can be inherited. The default context of [runBlocking], in particular,
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300792is confined to be invoker thread, so inheriting it has the effect of confining execution to
793this thread with a predictable FIFO scheduling.
794
795```kotlin
796fun main(args: Array<String>) = runBlocking<Unit> {
797 val jobs = arrayListOf<Job>()
798 jobs += launch(Unconfined) { // not confined -- will work with main thread
799 println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
Roman Elizarovd0021622017-03-10 15:43:38 +0300800 delay(500)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300801 println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
802 }
803 jobs += launch(context) { // context of the parent, runBlocking coroutine
804 println(" 'context': I'm working in thread ${Thread.currentThread().name}")
805 delay(1000)
806 println(" 'context': After delay in thread ${Thread.currentThread().name}")
807 }
808 jobs.forEach { it.join() }
809}
810```
811
Roman Elizarovd0021622017-03-10 15:43:38 +0300812> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-02.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300813
814Produces the output:
815
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300816```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300817 'Unconfined': I'm working in thread main
818 'context': I'm working in thread main
819 'Unconfined': After delay in thread kotlinx.coroutines.ScheduledExecutor
820 'context': After delay in thread main
821```
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300822
823<!--- TEST LINES_START -->
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300824
Roman Elizarov7c864d82017-02-27 10:17:50 +0300825So, the coroutine that had inherited `context` of `runBlocking {...}` continues to execute in the `main` thread,
Roman Elizarov419a6c82017-02-09 18:36:22 +0300826while the unconfined one had resumed in the scheduler thread that [delay] function is using.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300827
828### Debugging coroutines and threads
829
Roman Elizarov419a6c82017-02-09 18:36:22 +0300830Coroutines can suspend on one thread and resume on another thread with [Unconfined] dispatcher or
831with a multi-threaded dispatcher like [CommonPool]. Even with a single-threaded dispatcher it might be hard to
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300832figure out what coroutine was doing what, where, and when. The common approach to debugging applications with
833threads is to print the thread name in the log file on each log statement. This feature is universally supported
834by logging frameworks. When using coroutines, the thread name alone does not give much of a context, so
835`kotlinx.coroutines` includes debugging facilities to make it easier.
836
837Run the following code with `-Dkotlinx.coroutines.debug` JVM option:
838
839```kotlin
840fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
841
842fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov32d95322017-02-09 15:57:31 +0300843 val a = async(context) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300844 log("I'm computing a piece of the answer")
845 6
846 }
Roman Elizarov32d95322017-02-09 15:57:31 +0300847 val b = async(context) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300848 log("I'm computing another piece of the answer")
849 7
850 }
851 log("The answer is ${a.await() * b.await()}")
852}
853```
854
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300855> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-03.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300856
Roman Elizarovb7721cf2017-02-03 19:23:08 +0300857There are three coroutines. The main coroutine (#1) -- `runBlocking` one,
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300858and two coroutines computing deferred values `a` (#2) and `b` (#3).
859They are all executing in the context of `runBlocking` and are confined to the main thread.
860The output of this code is:
861
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300862```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300863[main @coroutine#2] I'm computing a piece of the answer
864[main @coroutine#3] I'm computing another piece of the answer
865[main @coroutine#1] The answer is 42
866```
867
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300868<!--- TEST -->
869
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300870The `log` function prints the name of the thread in square brackets and you can see, that it is the `main`
871thread, but the identifier of the currently executing coroutine is appended to it. This identifier
872is consecutively assigned to all created coroutines when debugging mode is turned on.
873
Roman Elizarov419a6c82017-02-09 18:36:22 +0300874You can read more about debugging facilities in the documentation for [newCoroutineContext] function.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300875
876### Jumping between threads
877
878Run the following code with `-Dkotlinx.coroutines.debug` JVM option:
879
880```kotlin
881fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
882
883fun main(args: Array<String>) {
884 val ctx1 = newSingleThreadContext("Ctx1")
885 val ctx2 = newSingleThreadContext("Ctx2")
886 runBlocking(ctx1) {
887 log("Started in ctx1")
888 run(ctx2) {
889 log("Working in ctx2")
890 }
891 log("Back to ctx1")
892 }
893}
894```
895
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300896> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300897
Roman Elizarov419a6c82017-02-09 18:36:22 +0300898It demonstrates two new techniques. One is using [runBlocking] with an explicitly specified context, and
899the second one is using [run] function to change a context of a coroutine while still staying in the
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300900same coroutine as you can see in the output below:
901
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300902```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300903[Ctx1 @coroutine#1] Started in ctx1
904[Ctx2 @coroutine#1] Working in ctx2
905[Ctx1 @coroutine#1] Back to ctx1
906```
907
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300908<!--- TEST -->
909
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300910### Job in the context
911
Roman Elizarov419a6c82017-02-09 18:36:22 +0300912The coroutine [Job] is part of its context. The coroutine can retrieve it from its own context
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300913using `context[Job]` expression:
914
915```kotlin
916fun main(args: Array<String>) = runBlocking<Unit> {
917 println("My job is ${context[Job]}")
918}
919```
920
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300921> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-05.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300922
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300923It produces somethine like
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300924
925```
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300926My job is BlockingCoroutine{Active}@65ae6ba4
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300927```
928
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300929<!--- TEST lines.size == 1 && lines[0].startsWith("My job is BlockingCoroutine{Active}@") -->
930
Roman Elizarov419a6c82017-02-09 18:36:22 +0300931So, [isActive][CoroutineScope.isActive] in [CoroutineScope] is just a convenient shortcut for `context[Job]!!.isActive`.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300932
933### Children of a coroutine
934
Roman Elizarov419a6c82017-02-09 18:36:22 +0300935When [context][CoroutineScope.context] of a coroutine is used to launch another coroutine,
936the [Job] of the new coroutine becomes
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300937a _child_ of the parent coroutine's job. When the parent coroutine is cancelled, all its children
938are recursively cancelled, too.
939
940```kotlin
941fun main(args: Array<String>) = runBlocking<Unit> {
942 // start a coroutine to process some kind of incoming request
943 val request = launch(CommonPool) {
944 // it spawns two other jobs, one with its separate context
945 val job1 = launch(CommonPool) {
946 println("job1: I have my own context and execute independently!")
947 delay(1000)
948 println("job1: I am not affected by cancellation of the request")
949 }
950 // and the other inherits the parent context
951 val job2 = launch(context) {
952 println("job2: I am a child of the request coroutine")
953 delay(1000)
954 println("job2: I will not execute this line if my parent request is cancelled")
955 }
956 // request completes when both its sub-jobs complete:
957 job1.join()
958 job2.join()
959 }
960 delay(500)
961 request.cancel() // cancel processing of the request
962 delay(1000) // delay a second to see what happens
963 println("main: Who has survived request cancellation?")
964}
965```
966
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300967> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-06.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300968
969The output of this code is:
970
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300971```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300972job1: I have my own context and execute independently!
973job2: I am a child of the request coroutine
974job1: I am not affected by cancellation of the request
975main: Who has survived request cancellation?
976```
977
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300978<!--- TEST -->
979
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300980### Combining contexts
981
982Coroutine context can be combined using `+` operator. The context on the right-hand side replaces relevant entries
Roman Elizarov419a6c82017-02-09 18:36:22 +0300983of the context on the left-hand side. For example, a [Job] of the parent coroutine can be inherited, while
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300984its dispatcher replaced:
985
986```kotlin
987fun main(args: Array<String>) = runBlocking<Unit> {
988 // start a coroutine to process some kind of incoming request
989 val request = launch(context) { // use the context of `runBlocking`
990 // spawns CPU-intensive child job in CommonPool !!!
991 val job = launch(context + CommonPool) {
992 println("job: I am a child of the request coroutine, but with a different dispatcher")
993 delay(1000)
994 println("job: I will not execute this line if my parent request is cancelled")
995 }
996 job.join() // request completes when its sub-job completes
997 }
998 delay(500)
999 request.cancel() // cancel processing of the request
1000 delay(1000) // delay a second to see what happens
1001 println("main: Who has survived request cancellation?")
1002}
1003```
1004
Roman Elizarovfa7723e2017-02-06 11:17:51 +03001005> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-07.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001006
1007The expected outcome of this code is:
1008
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001009```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001010job: I am a child of the request coroutine, but with a different dispatcher
1011main: Who has survived request cancellation?
1012```
1013
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001014<!--- TEST -->
1015
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001016### Naming coroutines for debugging
1017
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001018Automatically assigned ids are good when coroutines log often and you just need to correlate log records
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001019coming from the same coroutine. However, when coroutine is tied to the processing of a specific request
1020or doing some specific background task, it is better to name it explicitly for debugging purposes.
Roman Elizarov419a6c82017-02-09 18:36:22 +03001021[CoroutineName] serves the same function as a thread name. It'll get displayed in the thread name that
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001022is executing this coroutine when debugging more is turned on.
1023
1024The following example demonstrates this concept:
1025
1026```kotlin
1027fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
1028
1029fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
1030 log("Started main coroutine")
1031 // run two background value computations
Roman Elizarov32d95322017-02-09 15:57:31 +03001032 val v1 = async(CommonPool + CoroutineName("v1coroutine")) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001033 log("Computing v1")
1034 delay(500)
1035 252
1036 }
Roman Elizarov32d95322017-02-09 15:57:31 +03001037 val v2 = async(CommonPool + CoroutineName("v2coroutine")) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001038 log("Computing v2")
1039 delay(1000)
1040 6
1041 }
1042 log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
1043}
1044```
1045
Roman Elizarovfa7723e2017-02-06 11:17:51 +03001046> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001047
1048The output it produces with `-Dkotlinx.coroutines.debug` JVM option is similar to:
1049
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001050```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001051[main @main#1] Started main coroutine
1052[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
1053[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
1054[main @main#1] The answer for v1 / v2 = 42
1055```
Roman Elizarov1293ccd2017-02-01 18:49:54 +03001056
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001057<!--- TEST FLEXIBLE_THREAD -->
1058
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001059### Cancellation via explicit job
1060
1061Let us put our knowledge about contexts, children and jobs together. Assume that our application has
1062an object with a lifecycle, but that object is not a coroutine. For example, we are writing an Android application
1063and launch various coroutines in the context of an Android activity to perform asynchronous operations to fetch
1064and update data, do animations, etc. All of these coroutines must be cancelled when activity is destroyed
1065to avoid memory leaks.
1066
1067We can manage a lifecycle of our coroutines by creating an instance of [Job] that is tied to
1068the lifecycle of our activity. A job instance is created using [Job()][Job.invoke] factory function
1069as the following example shows. We need to make sure that all the coroutines are started
1070with this job in their context and then a single invocation of [Job.cancel] terminates them all.
1071
1072```kotlin
1073fun main(args: Array<String>) = runBlocking<Unit> {
1074 val job = Job() // create a job object to manage our lifecycle
1075 // now launch ten coroutines for a demo, each working for a different time
1076 val coroutines = List(10) { i ->
1077 // they are all children of our job object
1078 launch(context + job) { // we use the context of main runBlocking thread, but with our own job object
1079 delay(i * 200L) // variable delay 0ms, 200ms, 400ms, ... etc
1080 println("Coroutine $i is done")
1081 }
1082 }
1083 println("Launched ${coroutines.size} coroutines")
1084 delay(500L) // delay for half a second
1085 println("Cancelling job!")
1086 job.cancel() // cancel our job.. !!!
1087 delay(1000L) // delay for more to see if our coroutines are still working
1088}
1089```
1090
1091> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-09.kt)
1092
1093The output of this example is:
1094
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001095```text
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001096Launched 10 coroutines
1097Coroutine 0 is done
1098Coroutine 1 is done
1099Coroutine 2 is done
1100Cancelling job!
1101```
1102
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001103<!--- TEST -->
1104
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001105As you can see, only the first three coroutines had printed a message and the others were cancelled
1106by a single invocation of `job.cancel()`. So all we need to do in our hypothetical Android
1107application is to create a parent job object when activity is created, use it for child coroutines,
1108and cancel it when activity is destroyed.
1109
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001110## Channels
Roman Elizarov7deefb82017-01-31 10:33:17 +03001111
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001112Deferred values provide a convenient way to transfer a single value between coroutines.
1113Channels provide a way to transfer a stream of values.
1114
1115<!--- INCLUDE .*/example-channel-([0-9]+).kt
1116import kotlinx.coroutines.experimental.channels.*
1117-->
1118
1119### Channel basics
1120
Roman Elizarov419a6c82017-02-09 18:36:22 +03001121A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
1122instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
1123a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001124
1125```kotlin
1126fun main(args: Array<String>) = runBlocking<Unit> {
1127 val channel = Channel<Int>()
1128 launch(CommonPool) {
1129 // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
1130 for (x in 1..5) channel.send(x * x)
1131 }
1132 // here we print five received integers:
1133 repeat(5) { println(channel.receive()) }
1134 println("Done!")
1135}
1136```
1137
1138> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-01.kt)
1139
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001140The output of this code is:
1141
1142```text
11431
11444
11459
114616
114725
1148Done!
1149```
1150
1151<!--- TEST -->
1152
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001153### Closing and iteration over channels
1154
1155Unlike a queue, a channel can be closed to indicate that no more elements are coming.
1156On the receiver side it is convenient to use a regular `for` loop to receive elements
1157from the channel.
1158
Roman Elizarov419a6c82017-02-09 18:36:22 +03001159Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001160The iteration stops as soon as this close token is received, so there is a guarantee
1161that all previously sent elements before the close are received:
1162
1163```kotlin
1164fun main(args: Array<String>) = runBlocking<Unit> {
1165 val channel = Channel<Int>()
1166 launch(CommonPool) {
1167 for (x in 1..5) channel.send(x * x)
1168 channel.close() // we're done sending
1169 }
1170 // here we print received values using `for` loop (until the channel is closed)
1171 for (y in channel) println(y)
1172 println("Done!")
1173}
1174```
1175
1176> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-02.kt)
1177
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001178<!--- TEST
11791
11804
11819
118216
118325
1184Done!
1185-->
1186
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001187### Building channel producers
1188
Roman Elizarova5e653f2017-02-13 13:49:55 +03001189The pattern where a coroutine is producing a sequence of elements is quite common.
1190This is a part of _producer-consumer_ pattern that is often found in concurrent code.
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001191You could abstract such a producer into a function that takes channel as its parameter, but this goes contrary
Roman Elizarova5e653f2017-02-13 13:49:55 +03001192to common sense that results must be returned from functions.
1193
Roman Elizarov86349be2017-03-17 16:47:37 +03001194There is a convenience coroutine builder named [produce] that makes it easy to do it right on producer side,
1195and an extension function [consumeEach], that can replace a `for` loop on the consumer side:
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001196
1197```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001198fun produceSquares() = produce<Int>(CommonPool) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001199 for (x in 1..5) send(x * x)
1200}
1201
1202fun main(args: Array<String>) = runBlocking<Unit> {
1203 val squares = produceSquares()
Roman Elizarov86349be2017-03-17 16:47:37 +03001204 squares.consumeEach { println(it) }
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001205 println("Done!")
1206}
1207```
1208
1209> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt)
1210
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001211<!--- TEST
12121
12134
12149
121516
121625
1217Done!
1218-->
1219
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001220### Pipelines
1221
1222Pipeline is a pattern where one coroutine is producing, possibly infinite, stream of values:
1223
1224```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001225fun produceNumbers() = produce<Int>(CommonPool) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001226 var x = 1
1227 while (true) send(x++) // infinite stream of integers starting from 1
1228}
1229```
1230
Roman Elizarova5e653f2017-02-13 13:49:55 +03001231And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results.
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001232In the below example the numbers are just squared:
1233
1234```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001235fun square(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001236 for (x in numbers) send(x * x)
1237}
1238```
1239
Roman Elizarova5e653f2017-02-13 13:49:55 +03001240The main code starts and connects the whole pipeline:
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001241
1242```kotlin
1243fun main(args: Array<String>) = runBlocking<Unit> {
1244 val numbers = produceNumbers() // produces integers from 1 and on
1245 val squares = square(numbers) // squares integers
1246 for (i in 1..5) println(squares.receive()) // print first five
1247 println("Done!") // we are done
1248 squares.cancel() // need to cancel these coroutines in a larger app
1249 numbers.cancel()
1250}
1251```
1252
1253> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-04.kt)
1254
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001255<!--- TEST
12561
12574
12589
125916
126025
1261Done!
1262-->
1263
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001264We don't have to cancel these coroutines in this example app, because
1265[coroutines are like daemon threads](#coroutines-are-like-daemon-threads),
1266but in a larger app we'll need to stop our pipeline if we don't need it anymore.
1267Alternatively, we could have run pipeline coroutines as
1268[children of a coroutine](#children-of-a-coroutine).
1269
1270### Prime numbers with pipeline
1271
Cedric Beustfa0b28f2017-02-07 07:07:25 -08001272Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001273of coroutines. We start with an infinite sequence of numbers. This time we introduce an
1274explicit context parameter, so that caller can control where our coroutines run:
1275
1276<!--- INCLUDE kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
1277import kotlin.coroutines.experimental.CoroutineContext
1278-->
1279
1280```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001281fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001282 var x = start
1283 while (true) send(x++) // infinite stream of integers from start
1284}
1285```
1286
1287The following pipeline stage filters an incoming stream of numbers, removing all the numbers
1288that are divisible by the given prime number:
1289
1290```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001291fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001292 for (x in numbers) if (x % prime != 0) send(x)
1293}
1294```
1295
1296Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
Roman Elizarov62500ba2017-02-09 18:55:40 +03001297and launching new pipeline stage for each prime number found:
1298
1299```
Roman Elizarova5e653f2017-02-13 13:49:55 +03001300numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
Roman Elizarov62500ba2017-02-09 18:55:40 +03001301```
1302
1303The following example prints the first ten prime numbers,
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001304running the whole pipeline in the context of the main thread:
1305
1306```kotlin
1307fun main(args: Array<String>) = runBlocking<Unit> {
1308 var cur = numbersFrom(context, 2)
1309 for (i in 1..10) {
1310 val prime = cur.receive()
1311 println(prime)
1312 cur = filter(context, cur, prime)
1313 }
1314}
1315```
1316
1317> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt)
1318
1319The output of this code is:
1320
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001321```text
Roman Elizarovb7721cf2017-02-03 19:23:08 +030013222
13233
13245
13257
132611
132713
132817
132919
133023
133129
1332```
1333
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001334<!--- TEST -->
1335
Roman Elizarova5e653f2017-02-13 13:49:55 +03001336Note, that you can build the same pipeline using `buildIterator` coroutine builder from the standard library.
1337Replace `produce` with `buildIterator`, `send` with `yield`, `receive` with `next`,
Roman Elizarov62500ba2017-02-09 18:55:40 +03001338`ReceiveChannel` with `Iterator`, and get rid of the context. You will not need `runBlocking` either.
1339However, the benefit of a pipeline that uses channels as shown above is that it can actually use
1340multiple CPU cores if you run it in [CommonPool] context.
1341
Roman Elizarova5e653f2017-02-13 13:49:55 +03001342Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
Roman Elizarov62500ba2017-02-09 18:55:40 +03001343other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
1344built using `buildSeqeunce`/`buildIterator`, because they do not allow arbitrary suspension, unlike
Roman Elizarova5e653f2017-02-13 13:49:55 +03001345`produce` which is fully asynchronous.
Roman Elizarov62500ba2017-02-09 18:55:40 +03001346
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001347### Fan-out
1348
1349Multiple coroutines may receive from the same channel, distributing work between themselves.
1350Let us start with a producer coroutine that is periodically producing integers
1351(ten numbers per second):
1352
1353```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001354fun produceNumbers() = produce<Int>(CommonPool) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001355 var x = 1 // start from 1
1356 while (true) {
1357 send(x++) // produce next
1358 delay(100) // wait 0.1s
1359 }
1360}
1361```
1362
1363Then we can have several processor coroutines. In this example, they just print their id and
1364received number:
1365
1366```kotlin
1367fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
Roman Elizarov86349be2017-03-17 16:47:37 +03001368 channel.consumeEach {
1369 println("Processor #$id received $it")
Roman Elizarovec9384c2017-03-02 22:09:08 +03001370 }
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001371}
1372```
1373
1374Now let us launch five processors and let them work for a second. See what happens:
1375
1376```kotlin
1377fun main(args: Array<String>) = runBlocking<Unit> {
1378 val producer = produceNumbers()
1379 repeat(5) { launchProcessor(it, producer) }
1380 delay(1000)
1381 producer.cancel() // cancel producer coroutine and thus kill them all
1382}
1383```
1384
1385> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt)
1386
1387The output will be similar to the the following one, albeit the processor ids that receive
1388each specific integer may be different:
1389
1390```
1391Processor #2 received 1
1392Processor #4 received 2
1393Processor #0 received 3
1394Processor #1 received 4
1395Processor #3 received 5
1396Processor #2 received 6
1397Processor #4 received 7
1398Processor #0 received 8
1399Processor #1 received 9
1400Processor #3 received 10
1401```
1402
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001403<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->
1404
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001405Note, that cancelling a producer coroutine closes its channel, thus eventually terminating iteration
1406over the channel that processor coroutines are doing.
1407
1408### Fan-in
1409
1410Multiple coroutines may send to the same channel.
1411For example, let us have a channel of strings, and a suspending function that
1412repeatedly sends a specified string to this channel with a specified delay:
1413
1414```kotlin
1415suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
1416 while (true) {
1417 delay(time)
1418 channel.send(s)
1419 }
1420}
1421```
1422
Cedric Beustfa0b28f2017-02-07 07:07:25 -08001423Now, let us see what happens if we launch a couple of coroutines sending strings
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001424(in this example we launch them in the context of the main thread):
1425
1426```kotlin
1427fun main(args: Array<String>) = runBlocking<Unit> {
1428 val channel = Channel<String>()
1429 launch(context) { sendString(channel, "foo", 200L) }
1430 launch(context) { sendString(channel, "BAR!", 500L) }
1431 repeat(6) { // receive first six
1432 println(channel.receive())
1433 }
1434}
1435```
1436
1437> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt)
1438
1439The output is:
1440
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001441```text
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001442foo
1443foo
1444BAR!
1445foo
1446foo
1447BAR!
1448```
1449
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001450<!--- TEST -->
1451
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001452### Buffered channels
1453
1454The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
1455meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked,
1456if receive is invoked first, it is suspended until send is invoked.
Roman Elizarov419a6c82017-02-09 18:36:22 +03001457
Roman Elizarova5e653f2017-02-13 13:49:55 +03001458Both [Channel()][Channel.invoke] factory function and [produce] builder take an optional `capacity` parameter to
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001459specify _buffer size_. Buffer allows senders to send multiple elements before suspending,
1460similar to the `BlockingQueue` with a specified capacity, which blocks when buffer is full.
1461
1462Take a look at the behavior of the following code:
1463
1464```kotlin
1465fun main(args: Array<String>) = runBlocking<Unit> {
1466 val channel = Channel<Int>(4) // create buffered channel
1467 launch(context) { // launch sender coroutine
1468 repeat(10) {
1469 println("Sending $it") // print before sending each element
1470 channel.send(it) // will suspend when buffer is full
1471 }
1472 }
1473 // don't receive anything... just wait....
1474 delay(1000)
1475}
1476```
1477
1478> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt)
1479
1480It prints "sending" _five_ times using a buffered channel with capacity of _four_:
1481
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001482```text
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001483Sending 0
1484Sending 1
1485Sending 2
1486Sending 3
1487Sending 4
1488```
1489
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001490<!--- TEST -->
1491
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001492The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
Roman Elizarov419a6c82017-02-09 18:36:22 +03001493
Roman Elizarovb0517ba2017-02-27 14:03:14 +03001494
1495### Channels are fair
1496
1497Send and receive operations to channels are _fair_ with respect to the order of their invocation from
1498multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
1499gets the element. In the following example two coroutines "ping" and "pong" are
1500receiving the "ball" object from the shared "table" channel.
1501
1502```kotlin
1503data class Ball(var hits: Int)
1504
1505fun main(args: Array<String>) = runBlocking<Unit> {
1506 val table = Channel<Ball>() // a shared table
1507 launch(context) { player("ping", table) }
1508 launch(context) { player("pong", table) }
1509 table.send(Ball(0)) // serve the ball
1510 delay(1000) // delay 1 second
1511 table.receive() // game over, grab the ball
1512}
1513
1514suspend fun player(name: String, table: Channel<Ball>) {
1515 for (ball in table) { // receive the ball in a loop
1516 ball.hits++
1517 println("$name $ball")
Roman Elizarovf526b132017-03-10 16:07:14 +03001518 delay(300) // wait a bit
Roman Elizarovb0517ba2017-02-27 14:03:14 +03001519 table.send(ball) // send the ball back
1520 }
1521}
1522```
1523
1524> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-09.kt)
1525
1526The "ping" coroutine is started first, so it is the first one to receive the ball. Even though "ping"
1527coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
1528received by the "pong" coroutine, because it was already waiting for it:
1529
1530```text
1531ping Ball(hits=1)
1532pong Ball(hits=2)
1533ping Ball(hits=3)
1534pong Ball(hits=4)
1535ping Ball(hits=5)
Roman Elizarovb0517ba2017-02-27 14:03:14 +03001536```
1537
1538<!--- TEST -->
1539
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001540## Shared mutable state and concurrency
1541
1542Coroutines can be executed concurrently using a multi-threaded dispatcher like [CommonPool]. It presents
1543all the usual concurrency problems. The main problem being synchronization of access to **shared mutable state**.
1544Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world,
1545but others are unique.
1546
1547### The problem
1548
Roman Elizarov1e459602017-02-27 11:05:17 +03001549Let us launch a thousand coroutines all doing the same action thousand times (for a total of a million executions).
1550We'll also measure their completion time for further comparisons:
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001551
Roman Elizarov43e90112017-05-10 11:25:20 +03001552<!--- INCLUDE .*/example-sync-([0-9a-z]+).kt
Roman Elizarov1e459602017-02-27 11:05:17 +03001553import kotlin.coroutines.experimental.CoroutineContext
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001554import kotlin.system.measureTimeMillis
1555-->
1556
Roman Elizarov1e459602017-02-27 11:05:17 +03001557<!--- INCLUDE .*/example-sync-03.kt
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001558import java.util.concurrent.atomic.AtomicInteger
1559-->
1560
Roman Elizarov1e459602017-02-27 11:05:17 +03001561<!--- INCLUDE .*/example-sync-06.kt
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001562import kotlinx.coroutines.experimental.sync.Mutex
1563-->
1564
Roman Elizarov1e459602017-02-27 11:05:17 +03001565<!--- INCLUDE .*/example-sync-07.kt
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001566import kotlinx.coroutines.experimental.channels.*
1567-->
1568
1569```kotlin
Roman Elizarov1e459602017-02-27 11:05:17 +03001570suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
1571 val n = 1000 // number of coroutines to launch
1572 val k = 1000 // times an action is repeated by each coroutine
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001573 val time = measureTimeMillis {
1574 val jobs = List(n) {
Roman Elizarov1e459602017-02-27 11:05:17 +03001575 launch(context) {
1576 repeat(k) { action() }
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001577 }
1578 }
1579 jobs.forEach { it.join() }
1580 }
Roman Elizarov1e459602017-02-27 11:05:17 +03001581 println("Completed ${n * k} actions in $time ms")
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001582}
1583```
1584
Roman Elizarov43e90112017-05-10 11:25:20 +03001585<!--- INCLUDE .*/example-sync-([0-9a-z]+).kt -->
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001586
Roman Elizarov1e459602017-02-27 11:05:17 +03001587We start with a very simple action that increments a shared mutable variable using
1588multi-threaded [CommonPool] context.
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001589
1590```kotlin
1591var counter = 0
1592
1593fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov1e459602017-02-27 11:05:17 +03001594 massiveRun(CommonPool) {
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001595 counter++
1596 }
1597 println("Counter = $counter")
1598}
1599```
1600
1601> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-01.kt)
1602
Roman Elizarov1e459602017-02-27 11:05:17 +03001603<!--- TEST LINES_START
1604Completed 1000000 actions in
1605Counter =
1606-->
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001607
Roman Elizarov1e459602017-02-27 11:05:17 +03001608What does it print at the end? It is highly unlikely to ever print "Counter = 1000000", because a thousand coroutines
1609increment the `counter` concurrently from multiple threads without any synchronization.
1610
Roman Elizarov43e90112017-05-10 11:25:20 +03001611> Note: if you have an old system with 2 or fewer CPUs, then you _will_ consistently see 1000000, because
1612`CommonPool` is running in only one thread in this case. To reproduce the problem you'll need to make the
1613following change:
1614
1615```kotlin
1616val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads
1617var counter = 0
1618
1619fun main(args: Array<String>) = runBlocking<Unit> {
1620 massiveRun(mtContext) { // use it instead of CommonPool in this sample and below
1621 counter++
1622 }
1623 println("Counter = $counter")
1624}
1625```
1626
1627> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-01b.kt)
1628
1629<!--- TEST LINES_START
1630Completed 1000000 actions in
1631Counter =
1632-->
1633
Roman Elizarov1e459602017-02-27 11:05:17 +03001634### Volatiles are of no help
1635
1636There is common misconception that making a variable `volatile` solves concurrency problem. Let us try it:
1637
1638```kotlin
1639@Volatile // in Kotlin `volatile` is an annotation
1640var counter = 0
1641
1642fun main(args: Array<String>) = runBlocking<Unit> {
1643 massiveRun(CommonPool) {
1644 counter++
1645 }
1646 println("Counter = $counter")
1647}
1648```
1649
1650> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-02.kt)
1651
1652<!--- TEST LINES_START
1653Completed 1000000 actions in
1654Counter =
1655-->
1656
1657This code works slower, but we still don't get "Counter = 1000000" at the end, because volatile variables guarantee
1658linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but
1659do not provide atomicity of larger actions (increment in our case).
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001660
1661### Thread-safe data structures
1662
1663The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized,
1664linearizable, or atomic) data structure that provides all the necessarily synchronization for the corresponding
1665operations that needs to be performed on a shared state.
Roman Elizarov1e459602017-02-27 11:05:17 +03001666In the case of a simple counter we can use `AtomicInteger` class which has atomic `incrementAndGet` operations:
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001667
1668```kotlin
1669var counter = AtomicInteger()
1670
1671fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov1e459602017-02-27 11:05:17 +03001672 massiveRun(CommonPool) {
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001673 counter.incrementAndGet()
1674 }
1675 println("Counter = ${counter.get()}")
1676}
1677```
1678
Roman Elizarov1e459602017-02-27 11:05:17 +03001679> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-03.kt)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001680
Roman Elizarov1e459602017-02-27 11:05:17 +03001681<!--- TEST ARBITRARY_TIME
1682Completed 1000000 actions in xxx ms
1683Counter = 1000000
1684-->
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001685
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001686This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other
1687standard data structures and basic operations on them. However, it does not easily scale to complex
1688state or to complex operations that do not have ready-to-use thread-safe implementations.
1689
Roman Elizarov1e459602017-02-27 11:05:17 +03001690### Thread confinement fine-grained
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001691
Roman Elizarov1e459602017-02-27 11:05:17 +03001692_Thread confinement_ is an approach to the problem of shared mutable state where all access to the particular shared
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001693state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to
1694the single event-dispatch/application thread. It is easy to apply with coroutines by using a
1695single-threaded context:
1696
1697```kotlin
1698val counterContext = newSingleThreadContext("CounterContext")
1699var counter = 0
1700
1701fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov1e459602017-02-27 11:05:17 +03001702 massiveRun(CommonPool) { // run each coroutine in CommonPool
1703 run(counterContext) { // but confine each increment to the single-threaded context
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001704 counter++
1705 }
1706 }
1707 println("Counter = $counter")
1708}
1709```
1710
Roman Elizarov1e459602017-02-27 11:05:17 +03001711> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001712
Roman Elizarov1e459602017-02-27 11:05:17 +03001713<!--- TEST ARBITRARY_TIME
1714Completed 1000000 actions in xxx ms
1715Counter = 1000000
1716-->
1717
1718This code works very slowly, because it does _fine-grained_ thread-confinement. Each individual increment switches
1719from multi-threaded `CommonPool` context to the single-threaded context using [run] block.
1720
1721### Thread confinement coarse-grained
1722
1723In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic
1724are confined to the single thread. The following example does it like that, running each coroutine in
1725the single-threaded context to start with.
1726
1727```kotlin
1728val counterContext = newSingleThreadContext("CounterContext")
1729var counter = 0
1730
1731fun main(args: Array<String>) = runBlocking<Unit> {
1732 massiveRun(counterContext) { // run each coroutine in the single-threaded context
1733 counter++
1734 }
1735 println("Counter = $counter")
1736}
1737```
1738
1739> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt)
1740
1741<!--- TEST ARBITRARY_TIME
1742Completed 1000000 actions in xxx ms
1743Counter = 1000000
1744-->
1745
1746This now works much faster and produces correct result.
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001747
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001748### Mutual exclusion
1749
1750Mutual exclusion solution to the problem is to protect all modifications of the shared state with a _critical section_
1751that is never executed concurrently. In a blocking world you'd typically use `synchronized` or `ReentrantLock` for that.
1752Coroutine's alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to
1753delimit a critical section. The key difference is that `Mutex.lock` is a suspending function. It does not block a thread.
1754
1755```kotlin
1756val mutex = Mutex()
1757var counter = 0
1758
1759fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov1e459602017-02-27 11:05:17 +03001760 massiveRun(CommonPool) {
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001761 mutex.lock()
1762 try { counter++ }
1763 finally { mutex.unlock() }
1764 }
1765 println("Counter = $counter")
1766}
1767```
1768
Roman Elizarov1e459602017-02-27 11:05:17 +03001769> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-06.kt)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001770
Roman Elizarov1e459602017-02-27 11:05:17 +03001771<!--- TEST ARBITRARY_TIME
1772Completed 1000000 actions in xxx ms
1773Counter = 1000000
1774-->
1775
1776The locking in this example is fine-grained, so it pays the price. However, it is a good choice for some situations
1777where you absolutely must modify some shared state periodically, but there is no natural thread that this state
1778is confined to.
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001779
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001780### Actors
1781
1782An actor is a combination of a coroutine, the state that is confined and is encapsulated into this coroutine,
1783and a channel to communicate with other coroutines. A simple actor can be written as a function,
1784but an actor with a complex state is better suited for a class.
1785
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001786There is an [actor] coroutine builder that conveniently combines actor's mailbox channel into its
1787scope to receive messages from and combines the send channel into the resulting job object, so that a
1788single reference to the actor can be carried around as its handle.
1789
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001790```kotlin
1791// Message types for counterActor
1792sealed class CounterMsg
1793object IncCounter : CounterMsg() // one-way message to increment counter
1794class GetCounter(val response: SendChannel<Int>) : CounterMsg() // a request with reply
1795
1796// This function launches a new counter actor
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001797fun counterActor() = actor<CounterMsg>(CommonPool) {
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001798 var counter = 0 // actor state
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001799 for (msg in channel) { // iterate over incoming messages
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001800 when (msg) {
1801 is IncCounter -> counter++
1802 is GetCounter -> msg.response.send(counter)
1803 }
1804 }
1805}
1806
1807fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001808 val counter = counterActor() // create the actor
Roman Elizarov1e459602017-02-27 11:05:17 +03001809 massiveRun(CommonPool) {
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001810 counter.send(IncCounter)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001811 }
1812 val response = Channel<Int>()
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001813 counter.send(GetCounter(response))
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001814 println("Counter = ${response.receive()}")
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001815 counter.close() // shutdown the actor
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001816}
1817```
1818
Roman Elizarov1e459602017-02-27 11:05:17 +03001819> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-07.kt)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001820
Roman Elizarov1e459602017-02-27 11:05:17 +03001821<!--- TEST ARBITRARY_TIME
1822Completed 1000000 actions in xxx ms
1823Counter = 1000000
1824-->
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001825
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001826It does not matter (for correctness) what context the actor itself is executed in. An actor is
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001827a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine
1828works as a solution to the problem of shared mutable state.
1829
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001830Actor is more efficient than locking under load, because in this case it always has work to do and it does not
1831have to switch to a different context at all.
1832
1833> Note, that an [actor] coroutine builder is a dual of [produce] coroutine builder. An actor is associated
1834 with the channel that it receives messages from, while a producer is associated with the channel that it
1835 sends elements to.
Roman Elizarov1e459602017-02-27 11:05:17 +03001836
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001837## Select expression
1838
Roman Elizarova84730b2017-02-22 11:58:50 +03001839Select expression makes it possible to await multiple suspending functions simultaneously and _select_
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001840the first one that becomes available.
1841
1842<!--- INCLUDE .*/example-select-([0-9]+).kt
1843import kotlinx.coroutines.experimental.channels.*
1844import kotlinx.coroutines.experimental.selects.*
1845-->
1846
1847### Selecting from channels
1848
Roman Elizarov57857202017-03-02 23:17:25 +03001849Let us have two producers of strings: `fizz` and `buzz`. The `fizz` produces "Fizz" string every 300 ms:
1850
1851<!--- INCLUDE .*/example-select-01.kt
1852import kotlin.coroutines.experimental.CoroutineContext
1853-->
1854
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001855```kotlin
Roman Elizarov57857202017-03-02 23:17:25 +03001856fun fizz(context: CoroutineContext) = produce<String>(context) {
1857 while (true) { // sends "Fizz" every 300 ms
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001858 delay(300)
1859 send("Fizz")
1860 }
1861}
1862```
1863
Roman Elizarov57857202017-03-02 23:17:25 +03001864And the `buzz` produces "Buzz!" string every 500 ms:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001865
1866```kotlin
Roman Elizarov57857202017-03-02 23:17:25 +03001867fun buzz(context: CoroutineContext) = produce<String>(context) {
1868 while (true) { // sends "Buzz!" every 500 ms
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001869 delay(500)
1870 send("Buzz!")
1871 }
1872}
1873```
1874
1875Using [receive][ReceiveChannel.receive] suspending function we can receive _either_ from one channel or the
1876other. But [select] expression allows us to receive from _both_ simultaneously using its
1877[onReceive][SelectBuilder.onReceive] clauses:
1878
1879```kotlin
Roman Elizarov57857202017-03-02 23:17:25 +03001880suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001881 select<Unit> { // <Unit> means that this select expression does not produce any result
1882 fizz.onReceive { value -> // this is the first select clause
1883 println("fizz -> '$value'")
1884 }
1885 buzz.onReceive { value -> // this is the second select clause
1886 println("buzz -> '$value'")
1887 }
1888 }
1889}
1890```
1891
Roman Elizarov57857202017-03-02 23:17:25 +03001892Let us run it all seven times:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001893
1894```kotlin
1895fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov57857202017-03-02 23:17:25 +03001896 val fizz = fizz(context)
1897 val buzz = buzz(context)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001898 repeat(7) {
Roman Elizarov57857202017-03-02 23:17:25 +03001899 selectFizzBuzz(fizz, buzz)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001900 }
1901}
1902```
1903
1904> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-01.kt)
1905
1906The result of this code is:
1907
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001908```text
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001909fizz -> 'Fizz'
1910buzz -> 'Buzz!'
1911fizz -> 'Fizz'
1912fizz -> 'Fizz'
1913buzz -> 'Buzz!'
1914fizz -> 'Fizz'
1915buzz -> 'Buzz!'
1916```
1917
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001918<!--- TEST -->
1919
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001920### Selecting on close
1921
1922The [onReceive][SelectBuilder.onReceive] clause in `select` fails when the channel is closed and the corresponding
1923`select` throws an exception. We can use [onReceiveOrNull][SelectBuilder.onReceiveOrNull] clause to perform a
Roman Elizarova84730b2017-02-22 11:58:50 +03001924specific action when the channel is closed. The following example also shows that `select` is an expression that returns
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001925the result of its selected clause:
1926
1927```kotlin
1928suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
1929 select<String> {
1930 a.onReceiveOrNull { value ->
1931 if (value == null)
1932 "Channel 'a' is closed"
1933 else
1934 "a -> '$value'"
1935 }
1936 b.onReceiveOrNull { value ->
1937 if (value == null)
1938 "Channel 'b' is closed"
1939 else
1940 "b -> '$value'"
1941 }
1942 }
1943```
1944
Roman Elizarova84730b2017-02-22 11:58:50 +03001945Let's use it with channel `a` that produces "Hello" string four times and
1946channel `b` that produces "World" four times:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001947
1948```kotlin
1949fun main(args: Array<String>) = runBlocking<Unit> {
1950 // we are using the context of the main thread in this example for predictability ...
1951 val a = produce<String>(context) {
Roman Elizarova84730b2017-02-22 11:58:50 +03001952 repeat(4) { send("Hello $it") }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001953 }
1954 val b = produce<String>(context) {
Roman Elizarova84730b2017-02-22 11:58:50 +03001955 repeat(4) { send("World $it") }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001956 }
1957 repeat(8) { // print first eight results
1958 println(selectAorB(a, b))
1959 }
1960}
1961```
1962
1963> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-02.kt)
1964
Roman Elizarova84730b2017-02-22 11:58:50 +03001965The result of this code is quite interesting, so we'll analyze it in mode detail:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001966
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001967```text
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001968a -> 'Hello 0'
1969a -> 'Hello 1'
1970b -> 'World 0'
1971a -> 'Hello 2'
1972a -> 'Hello 3'
1973b -> 'World 1'
1974Channel 'a' is closed
1975Channel 'a' is closed
1976```
1977
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001978<!--- TEST -->
1979
Roman Elizarova84730b2017-02-22 11:58:50 +03001980There are couple of observations to make out of it.
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001981
1982First of all, `select` is _biased_ to the first clause. When several clauses are selectable at the same time,
1983the first one among them gets selected. Here, both channels are constantly producing strings, so `a` channel,
Roman Elizarova84730b2017-02-22 11:58:50 +03001984being the first clause in select, wins. However, because we are using unbuffered channel, the `a` gets suspended from
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001985time to time on its [send][SendChannel.send] invocation and gives a chance for `b` to send, too.
1986
1987The second observation, is that [onReceiveOrNull][SelectBuilder.onReceiveOrNull] gets immediately selected when the
1988channel is already closed.
1989
1990### Selecting to send
1991
1992Select expression has [onSend][SelectBuilder.onSend] clause that can be used for a great good in combination
1993with a biased nature of selection.
1994
Roman Elizarova84730b2017-02-22 11:58:50 +03001995Let us write an example of producer of integers that sends its values to a `side` channel when
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001996the consumers on its primary channel cannot keep up with it:
1997
1998```kotlin
1999fun produceNumbers(side: SendChannel<Int>) = produce<Int>(CommonPool) {
2000 for (num in 1..10) { // produce 10 numbers from 1 to 10
2001 delay(100) // every 100 ms
2002 select<Unit> {
Roman Elizarova84730b2017-02-22 11:58:50 +03002003 onSend(num) {} // Send to the primary channel
2004 side.onSend(num) {} // or to the side channel
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002005 }
2006 }
2007}
2008```
2009
2010Consumer is going to be quite slow, taking 250 ms to process each number:
2011
2012```kotlin
2013fun main(args: Array<String>) = runBlocking<Unit> {
2014 val side = Channel<Int>() // allocate side channel
2015 launch(context) { // this is a very fast consumer for the side channel
Roman Elizarov86349be2017-03-17 16:47:37 +03002016 side.consumeEach { println("Side channel has $it") }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002017 }
Roman Elizarov86349be2017-03-17 16:47:37 +03002018 produceNumbers(side).consumeEach {
2019 println("Consuming $it")
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002020 delay(250) // let us digest the consumed number properly, do not hurry
2021 }
2022 println("Done consuming")
2023}
2024```
2025
2026> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-03.kt)
2027
2028So let us see what happens:
2029
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002030```text
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002031Consuming 1
2032Side channel has 2
2033Side channel has 3
2034Consuming 4
2035Side channel has 5
2036Side channel has 6
2037Consuming 7
2038Side channel has 8
2039Side channel has 9
2040Consuming 10
2041Done consuming
2042```
2043
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002044<!--- TEST -->
2045
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002046### Selecting deferred values
2047
Roman Elizarova84730b2017-02-22 11:58:50 +03002048Deferred values can be selected using [onAwait][SelectBuilder.onAwait] clause.
2049Let us start with an async function that returns a deferred string value after
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002050a random delay:
2051
2052<!--- INCLUDE .*/example-select-04.kt
2053import java.util.*
2054-->
2055
2056```kotlin
2057fun asyncString(time: Int) = async(CommonPool) {
2058 delay(time.toLong())
2059 "Waited for $time ms"
2060}
2061```
2062
Roman Elizarova84730b2017-02-22 11:58:50 +03002063Let us start a dozen of them with a random delay.
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002064
2065```kotlin
2066fun asyncStringsList(): List<Deferred<String>> {
2067 val random = Random(3)
Roman Elizarova84730b2017-02-22 11:58:50 +03002068 return List(12) { asyncString(random.nextInt(1000)) }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002069}
2070```
2071
Roman Elizarova84730b2017-02-22 11:58:50 +03002072Now the main function awaits for the first of them to complete and counts the number of deferred values
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002073that are still active. Note, that we've used here the fact that `select` expression is a Kotlin DSL,
Roman Elizarova84730b2017-02-22 11:58:50 +03002074so we can provide clauses for it using an arbitrary code. In this case we iterate over a list
2075of deferred values to provide `onAwait` clause for each deferred value.
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002076
2077```kotlin
2078fun main(args: Array<String>) = runBlocking<Unit> {
2079 val list = asyncStringsList()
2080 val result = select<String> {
2081 list.withIndex().forEach { (index, deferred) ->
2082 deferred.onAwait { answer ->
2083 "Deferred $index produced answer '$answer'"
2084 }
2085 }
2086 }
2087 println(result)
Roman Elizarov7c864d82017-02-27 10:17:50 +03002088 val countActive = list.count { it.isActive }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002089 println("$countActive coroutines are still active")
2090}
2091```
2092
2093> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-04.kt)
2094
2095The output is:
2096
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002097```text
Roman Elizarova84730b2017-02-22 11:58:50 +03002098Deferred 4 produced answer 'Waited for 128 ms'
Roman Elizarovd4dcbe22017-02-22 09:57:46 +0300209911 coroutines are still active
2100```
2101
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002102<!--- TEST -->
2103
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002104### Switch over a channel of deferred values
2105
Roman Elizarova84730b2017-02-22 11:58:50 +03002106Let us write a channel producer function that consumes a channel of deferred string values, waits for each received
2107deferred value, but only until the next deferred value comes over or the channel is closed. This example puts together
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002108[onReceiveOrNull][SelectBuilder.onReceiveOrNull] and [onAwait][SelectBuilder.onAwait] clauses in the same `select`:
2109
2110```kotlin
2111fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String>(CommonPool) {
Roman Elizarova84730b2017-02-22 11:58:50 +03002112 var current = input.receive() // start with first received deferred value
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002113 while (isActive) { // loop while not cancelled/closed
2114 val next = select<Deferred<String>?> { // return next deferred value from this select or null
2115 input.onReceiveOrNull { update ->
2116 update // replaces next value to wait
2117 }
2118 current.onAwait { value ->
2119 send(value) // send value that current deferred has produced
2120 input.receiveOrNull() // and use the next deferred from the input channel
2121 }
2122 }
2123 if (next == null) {
2124 println("Channel was closed")
2125 break // out of loop
2126 } else {
2127 current = next
2128 }
2129 }
2130}
2131```
2132
2133To test it, we'll use a simple async function that resolves to a specified string after a specified time:
2134
2135```kotlin
2136fun asyncString(str: String, time: Long) = async(CommonPool) {
2137 delay(time)
2138 str
2139}
2140```
2141
2142The main function just launches a coroutine to print results of `switchMapDeferreds` and sends some test
2143data to it:
2144
2145```kotlin
2146fun main(args: Array<String>) = runBlocking<Unit> {
2147 val chan = Channel<Deferred<String>>() // the channel for test
Roman Elizarova84730b2017-02-22 11:58:50 +03002148 launch(context) { // launch printing coroutine
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002149 for (s in switchMapDeferreds(chan))
2150 println(s) // print each received string
2151 }
2152 chan.send(asyncString("BEGIN", 100))
2153 delay(200) // enough time for "BEGIN" to be produced
2154 chan.send(asyncString("Slow", 500))
Roman Elizarova84730b2017-02-22 11:58:50 +03002155 delay(100) // not enough time to produce slow
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002156 chan.send(asyncString("Replace", 100))
Roman Elizarova84730b2017-02-22 11:58:50 +03002157 delay(500) // give it time before the last one
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002158 chan.send(asyncString("END", 500))
2159 delay(1000) // give it time to process
Roman Elizarova84730b2017-02-22 11:58:50 +03002160 chan.close() // close the channel ...
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002161 delay(500) // and wait some time to let it finish
2162}
2163```
2164
2165> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-05.kt)
2166
2167The result of this code:
2168
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002169```text
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002170BEGIN
2171Replace
2172END
2173Channel was closed
2174```
2175
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002176<!--- TEST -->
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002177
Roman Elizarov8db17332017-03-09 12:40:45 +03002178## Further reading
2179
2180* [Guide to UI programming with coroutines](ui/coroutines-guide-ui.md)
Roman Elizarov8a4a8e12017-03-09 19:52:58 +03002181* [Guide to reactive streams with coroutines](reactive/coroutines-guide-reactive.md)
Roman Elizarov8db17332017-03-09 12:40:45 +03002182* [Coroutines design document (KEEP)](https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md)
2183* [Full kotlinx.coroutines API reference](http://kotlin.github.io/kotlinx.coroutines)
2184
Roman Elizarove0c817d2017-02-10 10:22:01 +03002185<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core -->
2186<!--- DOCS_ROOT kotlinx-coroutines-core/target/dokka/kotlinx-coroutines-core -->
2187<!--- INDEX kotlinx.coroutines.experimental -->
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002188[launch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/launch.html
2189[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/delay.html
2190[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run-blocking.html
2191[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/index.html
2192[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-cancellation-exception.html
2193[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/yield.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002194[CoroutineScope.isActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/is-active.html
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002195[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
2196[run]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run.html
2197[NonCancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-non-cancellable/index.html
2198[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
2199[async]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/async.html
2200[Deferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/index.html
Roman Elizarovecda27f2017-04-06 23:06:26 +03002201[CoroutineStart.LAZY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-start/-l-a-z-y.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002202[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/await.html
2203[Job.start]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/start.html
Roman Elizarov419a6c82017-02-09 18:36:22 +03002204[CommonPool]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-common-pool/index.html
2205[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-dispatcher/index.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002206[CoroutineScope.context]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/context.html
Roman Elizarov419a6c82017-02-09 18:36:22 +03002207[Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-unconfined/index.html
Roman Elizarov419a6c82017-02-09 18:36:22 +03002208[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/new-coroutine-context.html
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002209[CoroutineName]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-name/index.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002210[Job.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/invoke.html
2211[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/cancel.html
Roman Elizarovf5bc0472017-02-22 11:38:13 +03002212<!--- INDEX kotlinx.coroutines.experimental.sync -->
2213[Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/index.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002214[Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/lock.html
2215[Mutex.unlock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/unlock.html
Roman Elizarove0c817d2017-02-10 10:22:01 +03002216<!--- INDEX kotlinx.coroutines.experimental.channels -->
Roman Elizarov419a6c82017-02-09 18:36:22 +03002217[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002218[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
2219[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html
2220[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/close.html
Roman Elizarova5e653f2017-02-13 13:49:55 +03002221[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
Roman Elizarov86349be2017-03-17 16:47:37 +03002222[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/consume-each.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002223[Channel.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/invoke.html
Roman Elizarovc0e19f82017-02-27 11:59:14 +03002224[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002225<!--- INDEX kotlinx.coroutines.experimental.selects -->
2226[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002227[SelectBuilder.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-receive.html
2228[SelectBuilder.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-receive-or-null.html
2229[SelectBuilder.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-send.html
2230[SelectBuilder.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-await.html
Roman Elizarov419a6c82017-02-09 18:36:22 +03002231<!--- END -->