<!--- INCLUDE .*/example-([a-z]+)-([0-9]+)\.kt
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package guide.$$1.example$$2

import kotlinx.coroutines.experimental.*
-->
<!--- KNIT kotlinx-coroutines-core/src/test/kotlin/guide/.*\.kt -->
<!--- TEST_OUT kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package guide.test

import org.junit.Test

class GuideTest {
-->

# Guide to kotlinx.coroutines by example

This is a short guide on core features of `kotlinx.coroutines` with a series of examples.

## Table of contents

<!--- TOC -->

* [Coroutine basics](#coroutine-basics)
  * [Your first coroutine](#your-first-coroutine)
  * [Bridging blocking and non-blocking worlds](#bridging-blocking-and-non-blocking-worlds)
  * [Waiting for a job](#waiting-for-a-job)
  * [Extract function refactoring](#extract-function-refactoring)
  * [Coroutines ARE light-weight](#coroutines-are-light-weight)
  * [Coroutines are like daemon threads](#coroutines-are-like-daemon-threads)
* [Cancellation and timeouts](#cancellation-and-timeouts)
  * [Cancelling coroutine execution](#cancelling-coroutine-execution)
  * [Cancellation is cooperative](#cancellation-is-cooperative)
  * [Making computation code cancellable](#making-computation-code-cancellable)
  * [Closing resources with finally](#closing-resources-with-finally)
  * [Run non-cancellable block](#run-non-cancellable-block)
  * [Timeout](#timeout)
* [Composing suspending functions](#composing-suspending-functions)
  * [Sequential by default](#sequential-by-default)
  * [Concurrent using async](#concurrent-using-async)
  * [Lazily started async](#lazily-started-async)
  * [Async-style functions](#async-style-functions)
* [Coroutine context and dispatchers](#coroutine-context-and-dispatchers)
  * [Dispatchers and threads](#dispatchers-and-threads)
  * [Unconfined vs confined dispatcher](#unconfined-vs-confined-dispatcher)
  * [Debugging coroutines and threads](#debugging-coroutines-and-threads)
  * [Jumping between threads](#jumping-between-threads)
  * [Job in the context](#job-in-the-context)
  * [Children of a coroutine](#children-of-a-coroutine)
  * [Combining contexts](#combining-contexts)
  * [Naming coroutines for debugging](#naming-coroutines-for-debugging)
  * [Cancellation via explicit job](#cancellation-via-explicit-job)
* [Channels](#channels)
  * [Channel basics](#channel-basics)
  * [Closing and iteration over channels](#closing-and-iteration-over-channels)
  * [Building channel producers](#building-channel-producers)
  * [Pipelines](#pipelines)
  * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
  * [Fan-out](#fan-out)
  * [Fan-in](#fan-in)
  * [Buffered channels](#buffered-channels)
  * [Channels are fair](#channels-are-fair)
* [Shared mutable state and concurrency](#shared-mutable-state-and-concurrency)
  * [The problem](#the-problem)
  * [Volatiles are of no help](#volatiles-are-of-no-help)
  * [Thread-safe data structures](#thread-safe-data-structures)
  * [Thread confinement fine-grained](#thread-confinement-fine-grained)
  * [Thread confinement coarse-grained](#thread-confinement-coarse-grained)
  * [Mutual exclusion](#mutual-exclusion)
  * [Actors](#actors)
* [Select expression](#select-expression)
  * [Selecting from channels](#selecting-from-channels)
  * [Selecting on close](#selecting-on-close)
  * [Selecting to send](#selecting-to-send)
  * [Selecting deferred values](#selecting-deferred-values)
  * [Switch over a channel of deferred values](#switch-over-a-channel-of-deferred-values)

<!--- END_TOC -->

## Coroutine basics

This section covers basic coroutine concepts.

### Your first coroutine

Run the following code:

```kotlin
fun main(args: Array<String>) {
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main function continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-01.kt)

It produces the following output:

```text
Hello,
World!
```

<!--- TEST -->

Essentially, coroutines are light-weight threads.
They are launched with the [launch] _coroutine builder_.
You can achieve the same result by replacing
`launch(CommonPool) { ... }` with `thread { ... }` and `delay(...)` with `Thread.sleep(...)`. Try it.

If you start by replacing `launch(CommonPool)` with `thread`, the compiler produces the following error:

```
Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
```

That is because [delay] is a special _suspending function_ that does not block a thread, but _suspends_
the coroutine, and it can only be used from a coroutine.

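For comparison, here is a sketch of the fully converted thread-based version, with both replacements applied. It relies only on `thread` from `kotlin.concurrent` in the Kotlin standard library; everything else is carried over from the first example.

```kotlin
import kotlin.concurrent.thread

fun main(args: Array<String>) {
    thread { // start a regular JVM thread instead of a coroutine
        Thread.sleep(1000L) // blocking sleep for 1 second; the thread is occupied the whole time
        println("World!") // print after the sleep
    }
    println("Hello,") // main thread continues while the other thread sleeps
    Thread.sleep(2000L) // kept for parity with the coroutine version
}
```

The output is the same as before, but here each unit of concurrent work occupies a full thread while it waits.
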
### Bridging blocking and non-blocking worlds

The first example mixes _non-blocking_ `delay(...)` and _blocking_ `Thread.sleep(...)` in the same
`main` function. It is easy to get lost. Let's cleanly separate blocking and non-blocking
worlds by using [runBlocking]:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L)
        println("World!")
    }
    println("Hello,") // main coroutine continues while child is delayed
    delay(2000L) // non-blocking delay for 2 seconds to keep JVM alive
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-02.kt)

<!--- TEST
Hello,
World!
-->

The result is the same, but this code uses only non-blocking [delay].

`runBlocking { ... }` works as an adaptor that is used here to start the top-level main coroutine.
The regular code outside of `runBlocking` _blocks_ until the coroutine inside `runBlocking` completes.

This is also a way to write unit tests for suspending functions:

```kotlin
class MyTest {
    @Test
    fun testMySuspendingFunction() = runBlocking<Unit> {
        // here we can use suspending functions using any assertion style that we like
    }
}
```

<!--- CLEAR -->

### Waiting for a job

Delaying for a time while another coroutine is working is not a good approach. Let's explicitly
wait (in a non-blocking way) until the background [Job] that we have launched is complete:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { // create new coroutine and keep a reference to its Job
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join() // wait until child coroutine completes
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-03.kt)

<!--- TEST
Hello,
World!
-->

Now the result is still the same, but the code of the main coroutine is not tied to the duration of
the background job in any way. Much better.

### Extract function refactoring

Let's extract the block of code inside `launch(CommonPool) { ... }` into a separate function. When you
perform the "Extract function" refactoring on this code, you get a new function with the `suspend` modifier.
That is your first _suspending function_. Suspending functions can be used inside coroutines
just like regular functions, but their additional feature is that they can, in turn,
use other suspending functions, like `delay` in this example, to _suspend_ execution of a coroutine.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { doWorld() }
    println("Hello,")
    job.join()
}

// this is your first suspending function
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-04.kt)

<!--- TEST
Hello,
World!
-->

### Coroutines ARE light-weight

Run the following code:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = List(100_000) { // create a lot of coroutines and list their jobs
        launch(CommonPool) {
            delay(1000L)
            print(".")
        }
    }
    jobs.forEach { it.join() } // wait for all jobs to complete
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-05.kt)

<!--- TEST lines.size == 1 && lines[0] == ".".repeat(100_000) -->

It starts 100K coroutines and, after a second, each coroutine prints a dot.
Now, try that with threads. What would happen? (Most likely your code will produce some sort of out-of-memory error.)

### Coroutines are like daemon threads

The following code launches a long-running coroutine that prints "I'm sleeping" twice a second and then
returns from the main function after some delay:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // just quit after delay
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-06.kt)

You can run and see that it prints three lines and terminates:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
```

<!--- TEST -->

Active coroutines do not keep the process alive. They are like daemon threads.

## Cancellation and timeouts

This section covers coroutine cancellation and timeouts.

### Cancelling coroutine execution

In a small application, returning from the `main` method might sound like a good way to get all coroutines
implicitly terminated. In a larger, long-running application, you need finer-grained control.
The [launch] function returns a [Job] that can be used to cancel a running coroutine:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-01.kt)

It produces the following output:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
```

<!--- TEST -->

As soon as main invokes `job.cancel`, we don't see any output from the other coroutine because it was cancelled.

### Cancellation is cooperative

Coroutine cancellation is _cooperative_. Coroutine code has to cooperate to be cancellable.
All the suspending functions in `kotlinx.coroutines` are _cancellable_. They check for cancellation of
the coroutine and throw [CancellationException] when cancelled. However, if a coroutine is working in
a computation and does not check for cancellation, then it cannot be cancelled, as the following
example shows:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (i < 10) { // computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-02.kt)

Run it to see that it continues to print "I'm sleeping" even after cancellation.

<!--- TEST
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm sleeping 3 ...
I'm sleeping 4 ...
I'm sleeping 5 ...
main: Now I can quit.
-->

### Making computation code cancellable

There are two approaches to making computation code cancellable. The first one is to periodically
invoke a suspending function. There is a [yield] function that is a good choice for that purpose.
The other one is to explicitly check the cancellation status. Let us try the latter approach.

Replace `while (i < 10)` in the previous example with `while (isActive)` and rerun it.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (isActive) { // cancellable computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-03.kt)

As you can see, now this loop can be cancelled. [isActive][CoroutineScope.isActive] is a property that is available inside
the code of coroutines via the [CoroutineScope] object.

<!--- TEST
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
-->

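The first approach, periodically invoking a suspending function, can be sketched by keeping the original `while (i < 10)` loop and calling [yield] inside it. This is only a sketch and assumes the same `kotlinx.coroutines.experimental` version as the rest of this guide; calling `yield()` on every iteration is chosen for simplicity, not for efficiency.

```kotlin
import kotlinx.coroutines.experimental.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (i < 10) { // same computation loop as in the non-cancellable example
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
            yield() // suspension point: throws CancellationException once the job is cancelled
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled
    println("main: Now I can quit.")
}
```

With this change the loop is expected to stop printing shortly after `job.cancel()`, just like the `isActive` version.
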
### Closing resources with finally

Cancellable suspending functions throw [CancellationException] on cancellation, which can be handled in
all the usual ways. For example, the `try {...} finally {...}` expression and the Kotlin `use` function execute their
finalization actions normally when a coroutine is cancelled:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("I'm running finally")
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-04.kt)

The example above produces the following output:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.
```

<!--- TEST -->

### Run non-cancellable block

Any attempt to use a suspending function in the `finally` block of the previous example will cause a
[CancellationException], because the coroutine running this code is cancelled. Usually, this is not a
problem, since all well-behaved closing operations (closing a file, cancelling a job, or closing any kind of
communication channel) are usually non-blocking and do not involve any suspending functions. However, in the
rare case when you need to suspend in a cancelled coroutine, you can wrap the corresponding code in
`run(NonCancellable) {...}` using the [run] function and the [NonCancellable] context, as the following example shows:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            run(NonCancellable) {
                println("I'm running finally")
                delay(1000L)
                println("And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-05.kt)

<!--- TEST
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
And I've just delayed for 1 sec because I'm non-cancellable
main: Now I can quit.
-->

### Timeout

The most obvious reason to cancel coroutine execution in practice
is that its execution time has exceeded some timeout.
While you can manually track the reference to the corresponding [Job] and launch a separate coroutine to cancel
the tracked one after a delay, there is a ready-to-use [withTimeout] function that does it.
Look at the following example:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-06.kt)

It produces the following output:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" java.util.concurrent.CancellationException: Timed out waiting for 1300 MILLISECONDS
```

<!--- TEST STARTS_WITH -->

We have not seen the [CancellationException] stack trace printed on the console before. That is because
inside a cancelled coroutine `CancellationException` is considered to be a normal reason for coroutine completion.
However, in this example we have used `withTimeout` right inside the `main` function.

Because cancellation is just an exception, all the resources will be closed in the usual way.
You can wrap the code with timeout in a `try {...} catch (e: CancellationException) {...}` block if
you need to do some additional action specifically on timeout.

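As a sketch of that last suggestion, the timeout example can be wrapped as follows. It assumes the same `kotlinx.coroutines.experimental` version as the rest of this guide, in which the timeout surfaces as `java.util.concurrent.CancellationException` (as the stack trace above indicates). The message printed in the `catch` block is purely illustrative.

```kotlin
import kotlinx.coroutines.experimental.*
import java.util.concurrent.CancellationException

fun main(args: Array<String>) = runBlocking<Unit> {
    try {
        withTimeout(1300L) {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
    } catch (e: CancellationException) {
        // illustrative recovery action: report the timeout instead of crashing main
        println("main: timed out, giving up")
    }
}
```

Keep in mind that such a `catch` block also sees cancellation that originates elsewhere, not only the timeout.
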
## Composing suspending functions

This section covers various approaches to composition of suspending functions.

### Sequential by default

Assume that we have two suspending functions defined elsewhere that do something useful like some kind of
remote service call or computation. We just pretend they are useful, but actually each one just
delays for a second for the purpose of this example:

<!--- INCLUDE .*/example-compose-([0-9]+).kt
import kotlin.system.measureTimeMillis
-->

```kotlin
suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}
```

<!--- INCLUDE .*/example-compose-([0-9]+).kt -->

What do we do if we need to invoke them _sequentially_ -- first `doSomethingUsefulOne` _and then_
`doSomethingUsefulTwo` and compute the sum of their results?
In practice we do this if we use the results of the first function to make a decision on whether we need
to invoke the second one or to decide on how to invoke it.

We just use a normal sequential invocation, because the code in the coroutine, just like in regular
code, is _sequential_ by default. The following example demonstrates it by measuring the total
time it takes to execute both suspending functions:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-01.kt)

It produces something like this:

```text
The answer is 42
Completed in 2017 ms
```

<!--- TEST FLEXIBLE_TIME -->

### Concurrent using async

What if there are no dependencies between the invocations of `doSomethingUsefulOne` and `doSomethingUsefulTwo` and
we want to get the answer faster, by doing both _concurrently_? This is where [async] comes to help.

Conceptually, [async] is just like [launch]. It starts a separate coroutine which is a light-weight thread
that works concurrently with all the other coroutines. The difference is that `launch` returns a [Job] and
does not carry any resulting value, while `async` returns a [Deferred] -- a light-weight non-blocking future
that represents a promise to provide a result later. You can use `.await()` on a deferred value to get its eventual result,
but `Deferred` is also a `Job`, so you can cancel it if needed.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool) { doSomethingUsefulOne() }
        val two = async(CommonPool) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-02.kt)

It produces something like this:

```text
The answer is 42
Completed in 1017 ms
```

<!--- TEST FLEXIBLE_TIME -->

This is twice as fast, because we have concurrent execution of two coroutines.
Note that concurrency with coroutines is always explicit.

### Lazily started async

[async] has a laziness option that is enabled with the `start = false` parameter.
In this mode the coroutine starts only when its result is needed by some
[await][Deferred.await] or when its [start][Job.start] function
is invoked. Run the following example, which differs from the previous one only by this option:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool, start = false) { doSomethingUsefulOne() }
        val two = async(CommonPool, start = false) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-03.kt)

It produces something like this:

```text
The answer is 42
Completed in 2017 ms
```

<!--- TEST FLEXIBLE_TIME -->

So, we are back to sequential execution, because we _first_ start and await `one`, _and then_ start and await
`two`. This is not the intended use-case for laziness. It is designed as a replacement for
the standard `lazy` function in cases when computation of the value involves suspending functions.
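
To illustrate the intended use-case, here is a minimal sketch of a lazily started [async] used as a suspending
counterpart of `lazy`. The `loadGreeting` function and the `computations` counter are hypothetical names introduced
only for this illustration; everything else is the same `async(CommonPool, start = false)` call as above.
The value should be computed on the first `await`, and later `await` calls should return the already computed
result, so the counter is expected to stay at one:

```kotlin
import kotlinx.coroutines.experimental.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical counter that records how many times the value was computed
val computations = AtomicInteger()

// Hypothetical suspending computation, standing in for something expensive
suspend fun loadGreeting(): String {
    computations.incrementAndGet()
    delay(500L)
    return "Hello"
}

fun main(args: Array<String>) = runBlocking<Unit> {
    // the coroutine is created here, but it is not started yet
    val greeting = async(CommonPool, start = false) { loadGreeting() }
    println("Deferred value created, computations so far: ${computations.get()}")
    // the first await starts the coroutine and suspends until the value is ready
    println("First use: ${greeting.await()}")
    // the second await returns the value that was already computed
    println("Second use: ${greeting.await()}")
    println("Computations in total: ${computations.get()}")
}
```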

### Async-style functions

We can define async-style functions that invoke `doSomethingUsefulOne` and `doSomethingUsefulTwo`
_asynchronously_ using the [async] coroutine builder. It is good style to name such functions with
either an "async" prefix or an "Async" suffix to highlight the fact that they only start an asynchronous
computation and that one needs to use the resulting deferred value to get the result.

```kotlin
// The result type of asyncSomethingUsefulOne is Deferred<Int>
fun asyncSomethingUsefulOne() = async(CommonPool) {
    doSomethingUsefulOne()
}

// The result type of asyncSomethingUsefulTwo is Deferred<Int>
fun asyncSomethingUsefulTwo() = async(CommonPool) {
    doSomethingUsefulTwo()
}
```

Note that these `asyncXXX` functions are **not** _suspending_ functions. They can be used from anywhere.
However, their use always implies asynchronous (here meaning _concurrent_) execution of their action
with the invoking code.

The following example shows their use outside of a coroutine:

```kotlin
// note that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
    val time = measureTimeMillis {
        // we can initiate async actions outside of a coroutine
        val one = asyncSomethingUsefulOne()
        val two = asyncSomethingUsefulTwo()
        // but waiting for a result must involve either suspending or blocking.
        // here we use `runBlocking { ... }` to block the main thread while waiting for the result
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-04.kt)

<!--- TEST FLEXIBLE_TIME
The answer is 42
Completed in 1085 ms
-->

## Coroutine context and dispatchers

We've already seen `launch(CommonPool) {...}`, `async(CommonPool) {...}`, `run(NonCancellable) {...}`, etc.
In these code snippets [CommonPool] and [NonCancellable] are _coroutine contexts_.
This section covers other available choices.

### Dispatchers and threads

A coroutine context includes a [_coroutine dispatcher_][CoroutineDispatcher], which determines what thread or threads
the corresponding coroutine uses for its execution. A coroutine dispatcher can confine coroutine execution
to a specific thread, dispatch it to a thread pool, or let it run unconfined. Try the following example:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println(" 'context': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
        println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-01.kt)

It produces the following output (possibly in a different order):

```text
 'Unconfined': I'm working in thread main
 'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
 'newSTC': I'm working in thread MyOwnThread
 'context': I'm working in thread main
```

<!--- TEST LINES_START_UNORDERED -->

The difference between the parent [context][CoroutineScope.context] and the [Unconfined] context will be shown later.

### Unconfined vs confined dispatcher

The [Unconfined] coroutine dispatcher starts a coroutine in the caller thread, but only until the
first suspension point. After suspension it resumes in the thread that is fully determined by the
suspending function that was invoked. The unconfined dispatcher is appropriate when a coroutine neither
consumes CPU time nor updates any shared data (like UI) that is confined to a specific thread.

On the other hand, the [context][CoroutineScope.context] property, which is available inside the block of any coroutine
via the [CoroutineScope] interface, is a reference to the context of this particular coroutine.
This way, a parent context can be inherited. The default context of [runBlocking], in particular,
is confined to the invoker thread, so inheriting it has the effect of confining execution to
this thread with predictable FIFO scheduling.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println(" 'context': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println(" 'context': After delay in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-contest-02.kt)

It produces the following output:

```text
 'Unconfined': I'm working in thread main
 'context': I'm working in thread main
 'Unconfined': After delay in thread kotlinx.coroutines.ScheduledExecutor
 'context': After delay in thread main
```

<!--- TEST LINES_START -->

So, the coroutine that inherited the `context` of `runBlocking {...}` continues to execute in the `main` thread,
while the unconfined one resumed in the scheduler thread that the [delay] function is using.

### Debugging coroutines and threads

Coroutines can suspend on one thread and resume on another thread with the [Unconfined] dispatcher or
with a multi-threaded dispatcher like [CommonPool]. Even with a single-threaded dispatcher it might be hard to
figure out which coroutine was doing what, where, and when. The common approach to debugging applications with
threads is to print the thread name in the log file on each log statement. This feature is universally supported
by logging frameworks. When using coroutines, the thread name alone does not give much context, so
`kotlinx.coroutines` includes debugging facilities to make it easier.

Run the following code with the `-Dkotlinx.coroutines.debug` JVM option:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking<Unit> {
    val a = async(context) {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async(context) {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-03.kt)

There are three coroutines: the main coroutine (#1), which is the `runBlocking` one,
and two coroutines computing the deferred values `a` (#2) and `b` (#3).
They all execute in the context of `runBlocking` and are confined to the main thread.
The output of this code is:

```text
[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42
```

<!--- TEST -->

The `log` function prints the name of the thread in square brackets, and you can see that it is the `main`
thread with the identifier of the currently executing coroutine appended to it. This identifier
is consecutively assigned to all created coroutines when debugging mode is turned on.

You can read more about debugging facilities in the documentation for the [newCoroutineContext] function.

### Jumping between threads

Run the following code with the `-Dkotlinx.coroutines.debug` JVM option:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) {
    val ctx1 = newSingleThreadContext("Ctx1")
    val ctx2 = newSingleThreadContext("Ctx2")
    runBlocking(ctx1) {
        log("Started in ctx1")
        run(ctx2) {
            log("Working in ctx2")
        }
        log("Back to ctx1")
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt)

It demonstrates two new techniques. One is using [runBlocking] with an explicitly specified context, and
the other is using the [run] function to change the context of a coroutine while still staying in the
same coroutine, as you can see in the output below:

```text
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
```

<!--- TEST -->

### Job in the context

The coroutine [Job] is part of its context. The coroutine can retrieve it from its own context
using the `context[Job]` expression:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    println("My job is ${context[Job]}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-05.kt)

It produces something like:

```
My job is BlockingCoroutine{Active}@65ae6ba4
```

<!--- TEST lines.size == 1 && lines[0].startsWith("My job is BlockingCoroutine{Active}@") -->

So, [isActive][CoroutineScope.isActive] in [CoroutineScope] is just a convenient shortcut for `context[Job]!!.isActive`.
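
As a quick check of this equivalence, the following sketch reads both expressions inside the same `runBlocking`
coroutine. The `activeFlags` helper is a hypothetical name used only for this illustration; it relies on nothing
but `isActive` and `context[Job]` as described above, and both readings are expected to agree:

```kotlin
import kotlinx.coroutines.experimental.*

// Hypothetical helper: returns both readings taken inside the same coroutine
fun activeFlags(): Pair<Boolean, Boolean> = runBlocking<Pair<Boolean, Boolean>> {
    val viaScope = isActive                  // shortcut provided by CoroutineScope
    val viaContext = context[Job]!!.isActive // the same check spelled out through the context
    Pair(viaScope, viaContext)
}

fun main(args: Array<String>) {
    val (viaScope, viaContext) = activeFlags()
    println("isActive = $viaScope, context[Job]!!.isActive = $viaContext")
}
```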

### Children of a coroutine

When the [context][CoroutineScope.context] of a coroutine is used to launch another coroutine,
the [Job] of the new coroutine becomes
a _child_ of the parent coroutine's job. When the parent coroutine is cancelled, all its children
are recursively cancelled, too.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(CommonPool) {
        // it spawns two other jobs, one with its separate context
        val job1 = launch(CommonPool) {
            println("job1: I have my own context and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // and the other inherits the parent context
        val job2 = launch(context) {
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
        // request completes when both its sub-jobs complete:
        job1.join()
        job2.join()
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-06.kt)

The output of this code is:

```text
job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
```

<!--- TEST -->

### Combining contexts

Coroutine contexts can be combined using the `+` operator. The context on the right-hand side replaces relevant entries
of the context on the left-hand side. For example, a [Job] of the parent coroutine can be inherited, while
its dispatcher is replaced:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(context) { // use the context of `runBlocking`
        // spawns CPU-intensive child job in CommonPool !!!
        val job = launch(context + CommonPool) {
            println("job: I am a child of the request coroutine, but with a different dispatcher")
            delay(1000)
            println("job: I will not execute this line if my parent request is cancelled")
        }
        job.join() // request completes when its sub-job completes
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-07.kt)

The expected outcome of this code is:

```text
job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?
```

<!--- TEST -->
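
The replacement rule can also be observed directly on context values, without launching anything.
The sketch below is an illustration under one assumption: that the `CoroutineName` class can be used as a key,
as in `combined[CoroutineName]`, in the same way `Job` is used in `context[Job]`.
The `survivingName` helper is a hypothetical name introduced for this example:

```kotlin
import kotlinx.coroutines.experimental.*

// Hypothetical helper: combines two entries with the same key and reports which one is kept
fun survivingName(): String? {
    val combined = CoroutineName("first") + CoroutineName("second")
    // assumption: CoroutineName can be used as a context key, like Job in context[Job]
    return combined[CoroutineName]?.name
}

fun main(args: Array<String>) = runBlocking<Unit> {
    // the right-hand entry is expected to replace the left-hand one with the same key
    println("Surviving name: ${survivingName()}")
    // entries that the right-hand side does not mention are kept from the left-hand side
    val renamed = context + CoroutineName("request")
    println("Job is inherited: ${renamed[Job] === context[Job]}")
}
```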

### Naming coroutines for debugging

Automatically assigned ids are good when coroutines log often and you just need to correlate log records
coming from the same coroutine. However, when a coroutine is tied to the processing of a specific request
or does some specific background task, it is better to name it explicitly for debugging purposes.
[CoroutineName] serves the same function as a thread name. It is displayed in the name of the thread that
is executing this coroutine when debugging mode is turned on.

The following example demonstrates this concept:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
    log("Started main coroutine")
    // run two background value computations
    val v1 = async(CommonPool + CoroutineName("v1coroutine")) {
        log("Computing v1")
        delay(500)
        252
    }
    val v2 = async(CommonPool + CoroutineName("v2coroutine")) {
        log("Computing v2")
        delay(1000)
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt)

The output it produces with the `-Dkotlinx.coroutines.debug` JVM option is similar to:

```text
[main @main#1] Started main coroutine
[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42
```

<!--- TEST FLEXIBLE_THREAD -->

### Cancellation via explicit job

Let us put our knowledge about contexts, children and jobs together. Assume that our application has
an object with a lifecycle, but that object is not a coroutine. For example, we are writing an Android application
and launch various coroutines in the context of an Android activity to perform asynchronous operations to fetch
and update data, do animations, etc. All of these coroutines must be cancelled when the activity is destroyed
to avoid memory leaks.

We can manage the lifecycle of our coroutines by creating an instance of [Job] that is tied to
the lifecycle of our activity. A job instance is created using the [Job()][Job.invoke] factory function,
as the following example shows. We need to make sure that all the coroutines are started
with this job in their context, and then a single invocation of [Job.cancel] terminates them all.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = Job() // create a job object to manage our lifecycle
    // now launch ten coroutines for a demo, each working for a different time
    val coroutines = List(10) { i ->
        // they are all children of our job object
        launch(context + job) { // we use the context of main runBlocking thread, but with our own job object
            delay(i * 200L) // variable delay 0ms, 200ms, 400ms, ... etc
            println("Coroutine $i is done")
        }
    }
    println("Launched ${coroutines.size} coroutines")
    delay(500L) // delay for half a second
    println("Cancelling job!")
    job.cancel() // cancel our job.. !!!
    delay(1000L) // delay for more to see if our coroutines are still working
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-09.kt)

The output of this example is:

```text
Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Cancelling job!
```

<!--- TEST -->

As you can see, only the first three coroutines printed a message, and the others were cancelled
by a single invocation of `job.cancel()`. So all we need to do in our hypothetical Android
application is to create a parent job object when the activity is created, use it for child coroutines,
and cancel it when the activity is destroyed.
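
The hypothetical application described above could be sketched like this. `Screen` is an illustrative stand-in
for an object with a lifecycle, not an Android class, and its `loadData` and `destroy` functions are names made up
for this example; only `Job()`, `launch`, and `cancel` come from the library:

```kotlin
import kotlinx.coroutines.experimental.*

// Hypothetical object with a lifecycle (not an Android class)
class Screen {
    // the parent job is created together with the screen
    private val job = Job()

    // every coroutine started by the screen gets `job` in its context
    fun loadData(id: Int) = launch(CommonPool + job) {
        delay(id * 200L) // pretend that loading takes a while
        println("Screen: data $id is loaded")
    }

    // called when the screen goes away: cancels everything it has started
    fun destroy() {
        job.cancel()
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val screen = Screen()
    repeat(10) { i -> screen.loadData(i) }
    delay(500L) // let some of the loads finish
    println("Destroying the screen")
    screen.destroy()
    delay(1000L) // the remaining loads are not expected to report anything
}
```

Keeping the job private to the object means that callers cannot forget to attach it: anything started through
`loadData` is tied to the screen by construction.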

## Channels

Deferred values provide a convenient way to transfer a single value between coroutines.
Channels provide a way to transfer a stream of values.

<!--- INCLUDE .*/example-channel-([0-9]+).kt
import kotlinx.coroutines.experimental.channels.*
-->

### Channel basics

A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-01.kt)

The output of this code is:

```text
1
4
9
16
25
Done!
```

<!--- TEST -->
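
For comparison, here is a sketch of the same exchange written with a `BlockingQueue` from `java.util.concurrent`
and a plain thread instead of a channel and a coroutine. The `squaresViaQueue` helper is a hypothetical name used
only for this illustration. Here `put` and `take` block their threads while waiting, whereas `send` and `receive`
in the example above only suspend the coroutine:

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// Hypothetical helper: transfers n squares from a producer thread to the caller
fun squaresViaQueue(n: Int): List<Int> {
    val queue = ArrayBlockingQueue<Int>(1)
    val producer = thread {
        // put blocks the producer thread while the queue is full
        for (x in 1..n) queue.put(x * x)
    }
    val received = ArrayList<Int>()
    // take blocks the calling thread while the queue is empty
    for (i in 1..n) received.add(queue.take())
    producer.join()
    return received
}

fun main(args: Array<String>) {
    squaresViaQueue(5).forEach { println(it) }
    println("Done!")
}
```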

### Closing and iteration over channels

Unlike a queue, a channel can be closed to indicate that no more elements are coming.
On the receiver side it is convenient to use a regular `for` loop to receive elements
from the channel.

Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
The iteration stops as soon as this close token is received, so there is a guarantee
that all elements sent before the close are received:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-02.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

### Building channel producers

The pattern where a coroutine is producing a sequence of elements is quite common.
This is a part of the _producer-consumer_ pattern that is often found in concurrent code.
You could abstract such a producer into a function that takes a channel as its parameter, but this goes contrary
to the common-sense expectation that results must be returned from functions.

There is a convenience coroutine builder named [produce] that makes it easy to do it right:

```kotlin
fun produceSquares() = produce<Int>(CommonPool) {
    for (x in 1..5) send(x * x)
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val squares = produceSquares()
    for (y in squares) println(y)
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

### Pipelines

A pipeline is a pattern where one coroutine produces a possibly infinite stream of values:

```kotlin
fun produceNumbers() = produce<Int>(CommonPool) {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
```

And another coroutine or coroutines consume that stream, do some processing, and produce some other results.
In the example below the numbers are just squared:

```kotlin
fun square(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
    for (x in numbers) send(x * x)
}
```

The main code starts and connects the whole pipeline:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    squares.cancel() // need to cancel these coroutines in a larger app
    numbers.cancel()
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-04.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

We don't have to cancel these coroutines in this example app, because
[coroutines are like daemon threads](#coroutines-are-like-daemon-threads),
but in a larger app we'll need to stop our pipeline if we don't need it anymore.
Alternatively, we could have run pipeline coroutines as
[children of a coroutine](#children-of-a-coroutine).
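
One way to do that is to give every stage an explicit context that contains a shared job. The sketch below assumes
that `produce` treats a [Job] passed in its context as a parent, in the same way as `launch` does in the
[Cancellation via explicit job](#cancellation-via-explicit-job) section. The context parameters and
the `firstSquares` helper are hypothetical additions for this illustration:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlin.coroutines.experimental.CoroutineContext

// the same stages as above, but with an explicit context parameter
fun produceNumbers(context: CoroutineContext) = produce<Int>(context) {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun square(context: CoroutineContext, numbers: ReceiveChannel<Int>) = produce<Int>(context) {
    for (x in numbers) send(x * x)
}

// Hypothetical helper: runs the pipeline, takes `count` squares, then stops all stages
fun firstSquares(count: Int): List<Int> = runBlocking<List<Int>> {
    val pipeline = Job() // parent job shared by all pipeline stages
    val numbers = produceNumbers(CommonPool + pipeline)
    val squares = square(CommonPool + pipeline, numbers)
    val result = ArrayList<Int>()
    for (i in 1..count) result.add(squares.receive())
    pipeline.cancel() // a single call is expected to cancel both stages
    result
}

fun main(args: Array<String>) {
    firstSquares(5).forEach { println(it) }
    println("Done!")
}
```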

### Prime numbers with pipeline

Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
of coroutines. We start with an infinite sequence of numbers. This time we introduce an
explicit context parameter, so that the caller can control where our coroutines run:

<!--- INCLUDE kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
import kotlin.coroutines.experimental.CoroutineContext
-->

```kotlin
fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
```

The following pipeline stage filters an incoming stream of numbers, removing all the numbers
that are divisible by the given prime number:

```kotlin
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
    for (x in numbers) if (x % prime != 0) send(x)
}
```

Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
and launching a new pipeline stage for each prime number found:

```
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
```

The following example prints the first ten prime numbers,
running the whole pipeline in the context of the main thread:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    var cur = numbersFrom(context, 2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(context, cur, prime)
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt)

The output of this code is:

```text
2
3
5
7
11
13
17
19
23
29
```

<!--- TEST -->

Note that you can build the same pipeline using the `buildIterator` coroutine builder from the standard library.
Replace `produce` with `buildIterator`, `send` with `yield`, `receive` with `next`,
`ReceiveChannel` with `Iterator`, and get rid of the context. You will not need `runBlocking` either.
However, the benefit of a pipeline that uses channels as shown above is that it can actually use
multiple CPU cores if you run it in the [CommonPool] context.

Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
built using `buildSequence`/`buildIterator`, because they do not allow arbitrary suspension, unlike
`produce`, which is fully asynchronous.
Roman Elizarov62500ba2017-02-09 18:55:40 +03001332
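The substitutions listed above can be sketched as follows. This is only an illustration, assuming the
experimental `buildIterator` from `kotlin.coroutines.experimental` (later Kotlin versions expose the same
builder as `iterator`); `firstPrimes` is a hypothetical helper that is not part of the guide's examples:

```kotlin
import kotlin.coroutines.experimental.buildIterator

// Infinite iterator of integers starting from `start`
fun numbersFrom(start: Int): Iterator<Int> = buildIterator<Int> {
    var x = start
    while (true) yield(x++)
}

// Passes through only the numbers that are not divisible by `prime`
fun filter(numbers: Iterator<Int>, prime: Int): Iterator<Int> = buildIterator<Int> {
    for (x in numbers) if (x % prime != 0) yield(x)
}

// Hypothetical helper: collects the first `count` primes from the lazy pipeline
fun firstPrimes(count: Int): List<Int> {
    val primes = ArrayList<Int>()
    var cur = numbersFrom(2)
    repeat(count) {
        val prime = cur.next()      // `next` replaces `receive`
        primes.add(prime)
        cur = filter(cur, prime)    // add one more filtering stage
    }
    return primes
}

fun main(args: Array<String>) {
    firstPrimes(10).forEach { println(it) }
}
```
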
### Fan-out

Multiple coroutines may receive from the same channel, distributing work between themselves.
Let us start with a producer coroutine that is periodically producing integers
(ten numbers per second):

```kotlin
fun produceNumbers() = produce<Int>(CommonPool) {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
```

Then we can have several processor coroutines. In this example, they just print their id and
received number:

```kotlin
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
    for (x in channel) {
        println("Processor #$id received $x")
    }
}
```

Now let us launch five processors and let them work for a second. See what happens:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(1000)
    producer.cancel() // cancel producer coroutine and thus kill them all
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt)

The output will be similar to the following, although the processor ids that receive
each specific integer may be different:

```
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
```

<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->

Note that cancelling a producer coroutine closes its channel, thus eventually terminating the iteration
over the channel that the processor coroutines are doing.

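The same termination happens when a producer simply finishes, because `produce` closes its channel once its
block completes. A minimal sketch, assuming the guide's experimental API; `countReceived` is a hypothetical
name and the `AtomicInteger` tally exists only to make the result observable:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: three processors share a finite producer and count what they receive
fun countReceived(): Int = runBlocking<Int> {
    val received = AtomicInteger()
    val producer = produce<Int>(CommonPool) {
        for (x in 1..5) send(x) // the channel is closed when this block completes
    }
    val processors = List(3) {
        launch(CommonPool) {
            for (x in producer) received.incrementAndGet() // loop ends once the channel is closed
        }
    }
    processors.forEach { it.join() } // all processors terminate on their own
    received.get()                   // every element was delivered to exactly one processor
}

fun main(args: Array<String>) {
    println("Received ${countReceived()} elements in total")
}
```
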
### Fan-in

Multiple coroutines may send to the same channel.
For example, let us have a channel of strings, and a suspending function that
repeatedly sends a specified string to this channel with a specified delay:

```kotlin
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

Now, let us see what happens if we launch a couple of coroutines sending strings
(in this example we launch them in the context of the main thread):

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>()
    launch(context) { sendString(channel, "foo", 200L) }
    launch(context) { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt)

The output is:

```text
foo
foo
BAR!
foo
foo
BAR!
```

<!--- TEST -->

### Buffered channels

The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked;
if receive is invoked first, it is suspended until send is invoked.

Both the [Channel()][Channel.invoke] factory function and the [produce] builder take an optional `capacity` parameter to
specify the _buffer size_. A buffer allows senders to send multiple elements before suspending,
similar to a `BlockingQueue` with a specified capacity, which blocks when the buffer is full.

Take a look at the behavior of the following code:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>(4) // create buffered channel
    launch(context) { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt)

It prints "Sending" _five_ times using a buffered channel with capacity of _four_:

```text
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
```

<!--- TEST -->

The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.


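The `BlockingQueue` analogy can be checked with plain JDK classes. The sketch below is only an illustration:
`acceptedBeforeFull` is a hypothetical helper, and it uses the non-blocking `offer` instead of the blocking `put`
so that a full queue shows up as a `false` result rather than a blocked thread:

```kotlin
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical helper: offers 0, 1, 2, ... and returns the elements accepted before the queue filled up
fun acceptedBeforeFull(capacity: Int): List<Int> {
    val queue = ArrayBlockingQueue<Int>(capacity)
    // offer() returns false instead of blocking when the queue is full
    return (0 until 10).takeWhile { queue.offer(it) }
}

fun main(args: Array<String>) {
    println(acceptedBeforeFull(4)) // prints [0, 1, 2, 3]
}
```
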
### Channels are fair

Send and receive operations to channels are _fair_ with respect to the order of their invocation from
multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
gets the element. In the following example two coroutines "ping" and "pong" are
receiving the "ball" object from the shared "table" channel.

```kotlin
data class Ball(var hits: Int)

fun main(args: Array<String>) = runBlocking<Unit> {
    val table = Channel<Ball>() // a shared table
    launch(context) { player("ping", table) }
    launch(context) { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    table.receive() // game over, grab the ball
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(200) // wait a bit
        table.send(ball) // send the ball back
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-09.kt)

The "ping" coroutine is started first, so it is the first one to receive the ball. Even though "ping"
coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
received by the "pong" coroutine, because it was already waiting for it:

```text
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
ping Ball(hits=5)
pong Ball(hits=6)
```

<!--- TEST -->

## Shared mutable state and concurrency

Coroutines can be executed concurrently using a multi-threaded dispatcher like [CommonPool]. This presents
all the usual concurrency problems, the main one being synchronization of access to **shared mutable state**.
Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world,
but others are unique.

### The problem

Let us launch a thousand coroutines all doing the same action a thousand times (for a total of a million executions).
We'll also measure their completion time for further comparisons:

<!--- INCLUDE .*/example-sync-([0-9]+).kt
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.system.measureTimeMillis
-->

<!--- INCLUDE .*/example-sync-03.kt
import java.util.concurrent.atomic.AtomicInteger
-->

<!--- INCLUDE .*/example-sync-06.kt
import kotlinx.coroutines.experimental.sync.Mutex
-->

<!--- INCLUDE .*/example-sync-07.kt
import kotlinx.coroutines.experimental.channels.*
-->

```kotlin
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
    val n = 1000 // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch(context) {
                repeat(k) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${n * k} actions in $time ms")
}
```

<!--- INCLUDE .*/example-sync-([0-9]+).kt -->

We start with a very simple action that increments a shared mutable variable using
the multi-threaded [CommonPool] context.

```kotlin
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-01.kt)

<!--- TEST LINES_START
Completed 1000000 actions in
Counter =
-->

What does it print at the end? It is highly unlikely to ever print "Counter = 1000000", because a thousand coroutines
increment the `counter` concurrently from multiple threads without any synchronization.

### Volatiles are of no help

There is a common misconception that making a variable `volatile` solves the concurrency problem. Let us try it:

```kotlin
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-02.kt)

<!--- TEST LINES_START
Completed 1000000 actions in
Counter =
-->

This code runs slower, but we still don't get "Counter = 1000000" at the end, because volatile variables guarantee
linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but
do not provide atomicity of larger actions (increment in our case).

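To see why atomic reads and writes are not enough, it helps to spell out `counter++` as its separate read and
write steps. The following single-threaded sketch replays one unlucky interleaving by hand; the names
`interleavedIncrements`, `readByA` and `readByB` are hypothetical and stand for two concurrent coroutines:

```kotlin
@Volatile
var counter = 0

// Hypothetical replay of two increments whose read and write steps interleave
fun interleavedIncrements(): Int {
    counter = 0
    val readByA = counter   // coroutine A reads 0
    val readByB = counter   // coroutine B also reads 0, before A has written
    counter = readByA + 1   // A writes 1
    counter = readByB + 1   // B writes 1 as well, overwriting A's update
    return counter
}

fun main(args: Array<String>) {
    println("Counter = ${interleavedIncrements()}") // prints "Counter = 1", one increment is lost
}
```
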
### Thread-safe data structures

The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized,
linearizable, or atomic) data structure that provides all the necessary synchronization for the corresponding
operations that need to be performed on a shared state.
In the case of a simple counter we can use the `AtomicInteger` class, which has an atomic `incrementAndGet` operation:

```kotlin
var counter = AtomicInteger()

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter.incrementAndGet()
    }
    println("Counter = ${counter.get()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-03.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other
standard data structures and basic operations on them. However, it does not easily scale to complex
state or to complex operations that do not have ready-to-use thread-safe implementations.

### Thread confinement fine-grained

_Thread confinement_ is an approach to the problem of shared mutable state where all access to the particular shared
state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to
the single event-dispatch/application thread. It is easy to apply with coroutines by using a
single-threaded context:

```kotlin
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) { // run each coroutine in CommonPool
        run(counterContext) { // but confine each increment to the single-threaded context
            counter++
        }
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

This code works very slowly, because it does _fine-grained_ thread-confinement. Each individual increment switches
from the multi-threaded `CommonPool` context to the single-threaded context using the [run] block.

### Thread confinement coarse-grained

In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic
are confined to a single thread. The following example does it like that, running each coroutine in
the single-threaded context to start with.

```kotlin
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(counterContext) { // run each coroutine in the single-threaded context
        counter++
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

This now works much faster and produces the correct result.

### Mutual exclusion

The mutual exclusion solution to the problem is to protect all modifications of the shared state with a _critical section_
that is never executed concurrently. In a blocking world you'd typically use `synchronized` or `ReentrantLock` for that.
The coroutine alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to
delimit a critical section. The key difference is that `Mutex.lock` is a suspending function. It does not block a thread.

```kotlin
val mutex = Mutex()
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        mutex.lock()
        try { counter++ }
        finally { mutex.unlock() }
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-06.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

The locking in this example is fine-grained, so it pays the price. However, it is a good choice for some situations
where you absolutely must modify some shared state periodically, but there is no natural thread that this state
is confined to.

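The `lock`/`try`/`finally`/`unlock` pattern can be wrapped once and reused. The sketch below assumes the guide's
experimental API; `locked` and `incrementConcurrently` are hypothetical names introduced here for illustration,
not functions that this guide documents:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.sync.Mutex

// Hypothetical helper: runs `action` inside the critical section and always releases the lock
suspend fun <T> Mutex.locked(action: suspend () -> T): T {
    lock() // suspends (does not block the thread) while another coroutine holds the mutex
    try {
        return action()
    } finally {
        unlock()
    }
}

val mutex = Mutex()
var counter = 0

// Hypothetical driver: `coroutines` coroutines each increment the counter `times` times
fun incrementConcurrently(coroutines: Int, times: Int): Int = runBlocking<Int> {
    counter = 0
    val jobs = List(coroutines) {
        launch(CommonPool) {
            repeat(times) { mutex.locked { counter++ } }
        }
    }
    jobs.forEach { it.join() }
    counter
}

fun main(args: Array<String>) {
    println("Counter = ${incrementConcurrently(100, 100)}")
}
```
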
### Actors

An actor is a combination of a coroutine, the state that is confined and is encapsulated into this coroutine,
and a channel to communicate with other coroutines. A simple actor can be written as a function,
but an actor with a complex state is better suited for a class.

There is an [actor] coroutine builder that conveniently combines the actor's mailbox channel into its
scope to receive messages from, and combines the send channel into the resulting job object, so that a
single reference to the actor can be carried around as its handle.

```kotlin
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: SendChannel<Int>) : CounterMsg() // a request with reply

// This function launches a new counter actor
fun counterActor() = actor<CounterMsg>(CommonPool) {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.send(counter)
        }
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    massiveRun(CommonPool) {
        counter.send(IncCounter)
    }
    val response = Channel<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.receive()}")
    counter.close() // shut down the actor
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-07.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

It does not matter (for correctness) what context the actor itself is executed in. An actor is
a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine
works as a solution to the problem of shared mutable state.

An actor is more efficient than locking under load, because in this case it always has work to do and it does not
have to switch to a different context at all.

> Note that an [actor] coroutine builder is a dual of the [produce] coroutine builder. An actor is associated
  with the channel that it receives messages from, while a producer is associated with the channel that it
  sends elements to.

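The request-with-reply exchange used for `GetCounter` can be hidden behind a small suspending function, so that
callers never touch the response channel. This is a sketch under the guide's experimental API; `currentValue`
and `countTo` are hypothetical names, and the message types are repeated only to keep the block self-contained:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: SendChannel<Int>) : CounterMsg()

fun counterActor() = actor<CounterMsg>(CommonPool) {
    var counter = 0 // actor state, touched only by this coroutine
    for (msg in channel) {
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.send(counter)
        }
    }
}

// Hypothetical helper: sends a request and suspends until the actor replies
suspend fun SendChannel<CounterMsg>.currentValue(): Int {
    val response = Channel<Int>()
    send(GetCounter(response))
    return response.receive()
}

// Hypothetical driver: increments `n` times, then asks the actor for its state
fun countTo(n: Int): Int = runBlocking<Int> {
    val counter = counterActor()
    repeat(n) { counter.send(IncCounter) }
    val value = counter.currentValue() // messages are processed in order, so all increments are counted
    counter.close()
    value
}

fun main(args: Array<String>) {
    println("Counter = ${countTo(5)}") // prints "Counter = 5"
}
```
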
## Select expression

The select expression makes it possible to await multiple suspending functions simultaneously and _select_
the first one that becomes available.

<!--- INCLUDE .*/example-select-([0-9]+).kt
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*
-->

### Selecting from channels

Let us have two channels of strings `fizz` and `buzz`. The `fizz` channel produces "Fizz" string every 300 ms:

```kotlin
val fizz = produce<String>(CommonPool) { // produce using common thread pool
    while (true) {
        delay(300)
        send("Fizz")
    }
}
```

And the `buzz` channel produces "Buzz!" string every 500 ms:

```kotlin
val buzz = produce<String>(CommonPool) {
    while (true) {
        delay(500)
        send("Buzz!")
    }
}
```

Using [receive][ReceiveChannel.receive] suspending function we can receive _either_ from one channel or the
other. But [select] expression allows us to receive from _both_ simultaneously using its
[onReceive][SelectBuilder.onReceive] clauses:

```kotlin
suspend fun selectFizzBuzz() {
    select<Unit> { // <Unit> means that this select expression does not produce any result
        fizz.onReceive { value ->  // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value ->  // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}
```

Let us run it seven times:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    repeat(7) {
        selectFizzBuzz()
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-01.kt)

The result of this code is:

```text
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'
```

<!--- TEST -->

### Selecting on close

The [onReceive][SelectBuilder.onReceive] clause in `select` fails when the channel is closed and the corresponding
`select` throws an exception. We can use the [onReceiveOrNull][SelectBuilder.onReceiveOrNull] clause to perform a
specific action when the channel is closed. The following example also shows that `select` is an expression that returns
the result of its selected clause:

```kotlin
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'a' is closed"
            else
                "a -> '$value'"
        }
        b.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'b' is closed"
            else
                "b -> '$value'"
        }
    }
```

Let's use it with channel `a` that produces the "Hello" string four times and
channel `b` that produces "World" four times:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // we are using the context of the main thread in this example for predictability ...
    val a = produce<String>(context) {
        repeat(4) { send("Hello $it") }
    }
    val b = produce<String>(context) {
        repeat(4) { send("World $it") }
    }
    repeat(8) { // print first eight results
        println(selectAorB(a, b))
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-02.kt)

The result of this code is quite interesting, so we'll analyze it in more detail:

```text
a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed
```

<!--- TEST -->

There are a couple of observations to make.

First of all, `select` is _biased_ to the first clause. When several clauses are selectable at the same time,
the first one among them gets selected. Here, both channels are constantly producing strings, so the `a` channel,
being the first clause in select, wins. However, because we are using unbuffered channels, `a` gets suspended from
time to time on its [send][SendChannel.send] invocation and gives `b` a chance to send, too.

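The bias is easiest to see when both clauses are ready before `select` starts. The following sketch, which
assumes the guide's experimental API and uses the hypothetical name `selectWhenBothReady`, pre-fills two buffered
channels so that neither sender can suspend:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*

// Hypothetical helper: both channels already hold an element when select is invoked
fun selectWhenBothReady(): String = runBlocking<String> {
    val a = Channel<String>(1) // capacity 1, so send below does not suspend
    val b = Channel<String>(1)
    b.send("from b")           // fill `b` first to show that send order does not matter
    a.send("from a")
    select<String> {
        a.onReceive { it }     // first clause wins when several clauses are selectable
        b.onReceive { it }
    }
}

fun main(args: Array<String>) {
    println(selectWhenBothReady()) // prints "from a"
}
```
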
The second observation is that [onReceiveOrNull][SelectBuilder.onReceiveOrNull] gets immediately selected when the
channel is already closed.

### Selecting to send

The select expression has an [onSend][SelectBuilder.onSend] clause that can be used to great effect in combination
with the biased nature of selection.

Let us write an example of a producer of integers that sends its values to a `side` channel when
the consumers on its primary channel cannot keep up with it:

```kotlin
fun produceNumbers(side: SendChannel<Int>) = produce<Int>(CommonPool) {
    for (num in 1..10) { // produce 10 numbers from 1 to 10
        delay(100) // every 100 ms
        select<Unit> {
            onSend(num) {} // Send to the primary channel
            side.onSend(num) {} // or to the side channel
        }
    }
}
```

The consumer is going to be quite slow, taking 250 ms to process each number:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val side = Channel<Int>() // allocate side channel
    launch(context) { // this is a very fast consumer for the side channel
        for (num in side) println("Side channel has $num")
    }
    for (num in produceNumbers(side)) {
        println("Consuming $num")
        delay(250) // let us digest the consumed number properly, do not hurry
    }
    println("Done consuming")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-03.kt)

So let us see what happens:

```text
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
```

<!--- TEST -->

### Selecting deferred values

Deferred values can be selected using the [onAwait][SelectBuilder.onAwait] clause.
Let us start with an async function that returns a deferred string value after
a random delay:

<!--- INCLUDE .*/example-select-04.kt
import java.util.*
-->

```kotlin
fun asyncString(time: Int) = async(CommonPool) {
    delay(time.toLong())
    "Waited for $time ms"
}
```

Let us start a dozen of them with a random delay.

```kotlin
fun asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return List(12) { asyncString(random.nextInt(1000)) }
}
```

Now the main function awaits the first of them to complete and counts the number of deferred values
that are still active. Note that we've used here the fact that the `select` expression is a Kotlin DSL,
so we can provide clauses for it using arbitrary code. In this case we iterate over a list
of deferred values to provide an `onAwait` clause for each deferred value.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val list = asyncStringsList()
    val result = select<String> {
        list.withIndex().forEach { (index, deferred) ->
            deferred.onAwait { answer ->
                "Deferred $index produced answer '$answer'"
            }
        }
    }
    println(result)
    val countActive = list.count { it.isActive }
    println("$countActive coroutines are still active")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-04.kt)

The output is:

```text
Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active
```

<!--- TEST -->

### Switch over a channel of deferred values

Let us write a channel producer function that consumes a channel of deferred string values and waits for each received
deferred value, but only until the next deferred value arrives or the channel is closed. This example combines
[onReceiveOrNull][SelectBuilder.onReceiveOrNull] and [onAwait][SelectBuilder.onAwait] clauses in the same `select`:

```kotlin
fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String>(CommonPool) {
    var current = input.receive() // start with first received deferred value
    while (isActive) { // loop while not cancelled/closed
        val next = select<Deferred<String>?> { // return next deferred value from this select or null
            input.onReceiveOrNull { update ->
                update // replaces next value to wait
            }
            current.onAwait { value ->
                send(value) // send value that current deferred has produced
                input.receiveOrNull() // and use the next deferred from the input channel
            }
        }
        if (next == null) {
            println("Channel was closed")
            break // out of loop
        } else {
            current = next
        }
    }
}
```

To test it, we'll use a simple async function that resolves to a specified string after a specified time:

```kotlin
fun asyncString(str: String, time: Long) = async(CommonPool) {
    delay(time)
    str
}
```

The main function just launches a coroutine to print results of `switchMapDeferreds` and sends some test
data to it:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val chan = Channel<Deferred<String>>() // the channel for test
    launch(context) { // launch printing coroutine
        for (s in switchMapDeferreds(chan))
            println(s) // print each received string
    }
    chan.send(asyncString("BEGIN", 100))
    delay(200) // enough time for "BEGIN" to be produced
    chan.send(asyncString("Slow", 500))
    delay(100) // not enough time to produce slow
    chan.send(asyncString("Replace", 100))
    delay(500) // give it time before the last one
    chan.send(asyncString("END", 500))
    delay(1000) // give it time to process
    chan.close() // close the channel ...
    delay(500) // and wait some time to let it finish
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-05.kt)

The result of this code:

```text
BEGIN
Replace
END
Channel was closed
```

<!--- TEST -->

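The race that `switchMapDeferreds` runs on every loop iteration can also be looked at in isolation. The sketch below is only an illustration under the same experimental API: the helper name `race` and the timings are assumptions chosen so that the channel update arrives well before the deferred value completes, which is why "Slow" never appears in the output above.

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*

// Hypothetical helper for illustration: reports which clause of the select wins.
suspend fun race(updates: ReceiveChannel<String>, current: Deferred<String>): String =
    select<String> {
        updates.onReceiveOrNull { update -> // an update (or null on close) arrived first
            "channel won: $update"
        }
        current.onAwait { value -> // the current deferred value completed first
            "deferred won: $value"
        }
    }

fun main(args: Array<String>) = runBlocking<Unit> {
    val updates = Channel<String>()
    val slow = async(CommonPool) { delay(500); "slow result" }
    launch(context) { delay(100); updates.send("update") } // sent long before `slow` completes
    println(race(updates, slow)) // should print "channel won: update"
}
```
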
<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core -->
<!--- DOCS_ROOT kotlinx-coroutines-core/target/dokka/kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines.experimental -->
[launch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/launch.html
[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/delay.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run-blocking.html
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/index.html
[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-cancellation-exception.html
[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/yield.html
[CoroutineScope.isActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/is-active.html
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
[run]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run.html
[NonCancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-non-cancellable/index.html
[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
[async]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/async.html
[Deferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/index.html
[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/await.html
[Job.start]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/start.html
[CommonPool]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-common-pool/index.html
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-dispatcher/index.html
[CoroutineScope.context]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/context.html
[Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-unconfined/index.html
[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/new-coroutine-context.html
[CoroutineName]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-name/index.html
[Job.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/invoke.html
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/cancel.html
<!--- INDEX kotlinx.coroutines.experimental.sync -->
[Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/index.html
[Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/lock.html
[Mutex.unlock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/unlock.html
<!--- INDEX kotlinx.coroutines.experimental.channels -->
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html
[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/close.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
[Channel.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/invoke.html
[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
<!--- INDEX kotlinx.coroutines.experimental.selects -->
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
[SelectBuilder.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-receive.html
[SelectBuilder.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-receive-or-null.html
[SelectBuilder.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-send.html
[SelectBuilder.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-await.html
<!--- END -->