blob: b5274c5218458684ba78db7cdf4a22f4aad98de8 [file] [log] [blame] [view]
Roman Elizarova5e653f2017-02-13 13:49:55 +03001<!--- INCLUDE .*/example-([a-z]+)-([0-9]+)\.kt
2/*
3 * Copyright 2016-2017 JetBrains s.r.o.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
Roman Elizarovf16fd272017-02-07 11:26:00 +030017
Roman Elizarova5e653f2017-02-13 13:49:55 +030018// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
19package guide.$$1.example$$2
Roman Elizarovf16fd272017-02-07 11:26:00 +030020
Roman Elizarova5e653f2017-02-13 13:49:55 +030021import kotlinx.coroutines.experimental.*
Roman Elizarovf16fd272017-02-07 11:26:00 +030022-->
Roman Elizarov731f0ad2017-02-22 20:48:45 +030023<!--- KNIT kotlinx-coroutines-core/src/test/kotlin/guide/.*\.kt -->
24<!--- TEST_OUT kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
25// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
26package guide.test
27
28import org.junit.Test
29
30class GuideTest {
31-->
Roman Elizarovf16fd272017-02-07 11:26:00 +030032
Roman Elizarov7deefb82017-01-31 10:33:17 +030033# Guide to kotlinx.coroutines by example
34
35This is a short guide on core features of `kotlinx.coroutines` with a series of examples.
36
Roman Elizarov1293ccd2017-02-01 18:49:54 +030037## Table of contents
38
Roman Elizarovfa7723e2017-02-06 11:17:51 +030039<!--- TOC -->
40
Roman Elizarov1293ccd2017-02-01 18:49:54 +030041* [Coroutine basics](#coroutine-basics)
42 * [Your first coroutine](#your-first-coroutine)
43 * [Bridging blocking and non-blocking worlds](#bridging-blocking-and-non-blocking-worlds)
44 * [Waiting for a job](#waiting-for-a-job)
45 * [Extract function refactoring](#extract-function-refactoring)
46 * [Coroutines ARE light-weight](#coroutines-are-light-weight)
47 * [Coroutines are like daemon threads](#coroutines-are-like-daemon-threads)
48* [Cancellation and timeouts](#cancellation-and-timeouts)
49 * [Cancelling coroutine execution](#cancelling-coroutine-execution)
50 * [Cancellation is cooperative](#cancellation-is-cooperative)
51 * [Making computation code cancellable](#making-computation-code-cancellable)
52 * [Closing resources with finally](#closing-resources-with-finally)
53 * [Run non-cancellable block](#run-non-cancellable-block)
54 * [Timeout](#timeout)
55* [Composing suspending functions](#composing-suspending-functions)
56 * [Sequential by default](#sequential-by-default)
Roman Elizarov32d95322017-02-09 15:57:31 +030057 * [Concurrent using async](#concurrent-using-async)
58 * [Lazily started async](#lazily-started-async)
59 * [Async-style functions](#async-style-functions)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +030060* [Coroutine context and dispatchers](#coroutine-context-and-dispatchers)
Roman Elizarovfa7723e2017-02-06 11:17:51 +030061 * [Dispatchers and threads](#dispatchers-and-threads)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +030062 * [Unconfined vs confined dispatcher](#unconfined-vs-confined-dispatcher)
63 * [Debugging coroutines and threads](#debugging-coroutines-and-threads)
64 * [Jumping between threads](#jumping-between-threads)
65 * [Job in the context](#job-in-the-context)
66 * [Children of a coroutine](#children-of-a-coroutine)
67 * [Combining contexts](#combining-contexts)
68 * [Naming coroutines for debugging](#naming-coroutines-for-debugging)
Roman Elizarov2fd7cb32017-02-11 23:18:59 +030069 * [Cancellation via explicit job](#cancellation-via-explicit-job)
Roman Elizarovb7721cf2017-02-03 19:23:08 +030070* [Channels](#channels)
71 * [Channel basics](#channel-basics)
72 * [Closing and iteration over channels](#closing-and-iteration-over-channels)
73 * [Building channel producers](#building-channel-producers)
74 * [Pipelines](#pipelines)
75 * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
76 * [Fan-out](#fan-out)
77 * [Fan-in](#fan-in)
78 * [Buffered channels](#buffered-channels)
Roman Elizarovb0517ba2017-02-27 14:03:14 +030079 * [Channels are fair](#channels-are-fair)
Roman Elizarovf5bc0472017-02-22 11:38:13 +030080* [Shared mutable state and concurrency](#shared-mutable-state-and-concurrency)
81 * [The problem](#the-problem)
Roman Elizarov1e459602017-02-27 11:05:17 +030082 * [Volatiles are of no help](#volatiles-are-of-no-help)
Roman Elizarovf5bc0472017-02-22 11:38:13 +030083 * [Thread-safe data structures](#thread-safe-data-structures)
Roman Elizarov1e459602017-02-27 11:05:17 +030084 * [Thread confinement fine-grained](#thread-confinement-fine-grained)
85 * [Thread confinement coarse-grained](#thread-confinement-coarse-grained)
Roman Elizarovf5bc0472017-02-22 11:38:13 +030086 * [Mutual exclusion](#mutual-exclusion)
87 * [Actors](#actors)
Roman Elizarovd4dcbe22017-02-22 09:57:46 +030088* [Select expression](#select-expression)
89 * [Selecting from channels](#selecting-from-channels)
90 * [Selecting on close](#selecting-on-close)
91 * [Selecting to send](#selecting-to-send)
92 * [Selecting deferred values](#selecting-deferred-values)
93 * [Switch over a channel of deferred values](#switch-over-a-channel-of-deferred-values)
Roman Elizarovfa7723e2017-02-06 11:17:51 +030094
Roman Elizarova5e653f2017-02-13 13:49:55 +030095<!--- END_TOC -->
Roman Elizarov1293ccd2017-02-01 18:49:54 +030096
97## Coroutine basics
98
99This section covers basic coroutine concepts.
100
101### Your first coroutine
Roman Elizarov7deefb82017-01-31 10:33:17 +0300102
103Run the following code:
104
105```kotlin
106fun main(args: Array<String>) {
107 launch(CommonPool) { // create new coroutine in common thread pool
108 delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
109 println("World!") // print after delay
110 }
111 println("Hello,") // main function continues while coroutine is delayed
112 Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
113}
114```
115
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300116> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-01.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300117
118Run this code:
119
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300120```text
Roman Elizarov7deefb82017-01-31 10:33:17 +0300121Hello,
122World!
123```
124
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300125<!--- TEST -->
126
Roman Elizarov419a6c82017-02-09 18:36:22 +0300127Essentially, coroutines are light-weight threads.
128They are launched with [launch] _coroutine builder_.
129You can achieve the same result replacing
Roman Elizarov7deefb82017-01-31 10:33:17 +0300130`launch(CommonPool) { ... }` with `thread { ... }` and `delay(...)` with `Thread.sleep(...)`. Try it.
131
132If you start by replacing `launch(CommonPool)` by `thread`, the compiler produces the following error:
133
134```
135Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
136```
137
Roman Elizarov419a6c82017-02-09 18:36:22 +0300138That is because [delay] is a special _suspending function_ that does not block a thread, but _suspends_
Roman Elizarov7deefb82017-01-31 10:33:17 +0300139coroutine and it can be only used from a coroutine.
140
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300141### Bridging blocking and non-blocking worlds
Roman Elizarov7deefb82017-01-31 10:33:17 +0300142
143The first example mixes _non-blocking_ `delay(...)` and _blocking_ `Thread.sleep(...)` in the same
144code of `main` function. It is easy to get lost. Let's cleanly separate blocking and non-blocking
Roman Elizarov419a6c82017-02-09 18:36:22 +0300145worlds by using [runBlocking]:
Roman Elizarov7deefb82017-01-31 10:33:17 +0300146
147```kotlin
148fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
149 launch(CommonPool) { // create new coroutine in common thread pool
150 delay(1000L)
151 println("World!")
152 }
153 println("Hello,") // main coroutine continues while child is delayed
154 delay(2000L) // non-blocking delay for 2 seconds to keep JVM alive
155}
156```
157
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300158> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-02.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300159
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300160<!--- TEST
161Hello,
162World!
163-->
164
Roman Elizarov419a6c82017-02-09 18:36:22 +0300165The result is the same, but this code uses only non-blocking [delay].
Roman Elizarov7deefb82017-01-31 10:33:17 +0300166
167`runBlocking { ... }` works as an adaptor that is used here to start the top-level main coroutine.
168The regular code outside of `runBlocking` _blocks_, until the coroutine inside `runBlocking` is active.
169
170This is also a way to write unit-tests for suspending functions:
171
172```kotlin
173class MyTest {
174 @Test
175 fun testMySuspendingFunction() = runBlocking<Unit> {
176 // here we can use suspending functions using any assertion style that we like
177 }
178}
179```
Roman Elizarovb3d55a52017-02-03 12:47:21 +0300180
181<!--- CLEAR -->
Roman Elizarov7deefb82017-01-31 10:33:17 +0300182
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300183### Waiting for a job
Roman Elizarov7deefb82017-01-31 10:33:17 +0300184
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300185Delaying for a time while another coroutine is working is not a good approach. Let's explicitly
Roman Elizarov419a6c82017-02-09 18:36:22 +0300186wait (in a non-blocking way) until the background [Job] that we have launched is complete:
Roman Elizarov7deefb82017-01-31 10:33:17 +0300187
188```kotlin
189fun main(args: Array<String>) = runBlocking<Unit> {
190 val job = launch(CommonPool) { // create new coroutine and keep a reference to its Job
191 delay(1000L)
192 println("World!")
193 }
194 println("Hello,")
195 job.join() // wait until child coroutine completes
196}
197```
198
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300199> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-03.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300200
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300201<!--- TEST
202Hello,
203World!
204-->
205
Roman Elizarov7deefb82017-01-31 10:33:17 +0300206Now the result is still the same, but the code of the main coroutine is not tied to the duration of
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300207the background job in any way. Much better.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300208
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300209### Extract function refactoring
Roman Elizarov7deefb82017-01-31 10:33:17 +0300210
211Let's extract the block of code inside `launch(CommonPool} { ... }` into a separate function. When you
212perform "Extract function" refactoring on this code you get a new function with `suspend` modifier.
213That is your first _suspending function_. Suspending functions can be used inside coroutines
214just like regular functions, but their additional feature is that they can, in turn,
215use other suspending functions, like `delay` in this example, to _suspend_ execution of a coroutine.
216
217```kotlin
218fun main(args: Array<String>) = runBlocking<Unit> {
219 val job = launch(CommonPool) { doWorld() }
220 println("Hello,")
221 job.join()
222}
223
224// this is your first suspending function
225suspend fun doWorld() {
226 delay(1000L)
227 println("World!")
228}
229```
230
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300231> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-04.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300232
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300233<!--- TEST
234Hello,
235World!
236-->
237
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300238### Coroutines ARE light-weight
Roman Elizarov7deefb82017-01-31 10:33:17 +0300239
240Run the following code:
241
242```kotlin
243fun main(args: Array<String>) = runBlocking<Unit> {
244 val jobs = List(100_000) { // create a lot of coroutines and list their jobs
245 launch(CommonPool) {
246 delay(1000L)
247 print(".")
248 }
249 }
250 jobs.forEach { it.join() } // wait for all jobs to complete
251}
252```
253
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300254> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-05.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300255
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300256<!--- TEST lines.size == 1 && lines[0] == ".".repeat(100_000) -->
257
Roman Elizarov7deefb82017-01-31 10:33:17 +0300258It starts 100K coroutines and, after a second, each coroutine prints a dot.
259Now, try that with threads. What would happen? (Most likely your code will produce some sort of out-of-memory error)
260
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300261### Coroutines are like daemon threads
Roman Elizarov7deefb82017-01-31 10:33:17 +0300262
263The following code launches a long-running coroutine that prints "I'm sleeping" twice a second and then
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300264returns from the main function after some delay:
Roman Elizarov7deefb82017-01-31 10:33:17 +0300265
266```kotlin
267fun main(args: Array<String>) = runBlocking<Unit> {
268 launch(CommonPool) {
269 repeat(1000) { i ->
270 println("I'm sleeping $i ...")
271 delay(500L)
272 }
273 }
274 delay(1300L) // just quit after delay
275}
276```
277
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300278> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-06.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300279
280You can run and see that it prints three lines and terminates:
281
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300282```text
Roman Elizarov7deefb82017-01-31 10:33:17 +0300283I'm sleeping 0 ...
284I'm sleeping 1 ...
285I'm sleeping 2 ...
286```
287
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300288<!--- TEST -->
289
Roman Elizarov7deefb82017-01-31 10:33:17 +0300290Active coroutines do not keep the process alive. They are like daemon threads.
291
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300292## Cancellation and timeouts
293
294This section covers coroutine cancellation and timeouts.
295
296### Cancelling coroutine execution
Roman Elizarov7deefb82017-01-31 10:33:17 +0300297
298In small application the return from "main" method might sound like a good idea to get all coroutines
299implicitly terminated. In a larger, long-running application, you need finer-grained control.
Roman Elizarov419a6c82017-02-09 18:36:22 +0300300The [launch] function returns a [Job] that can be used to cancel running coroutine:
Roman Elizarov7deefb82017-01-31 10:33:17 +0300301
302```kotlin
303fun main(args: Array<String>) = runBlocking<Unit> {
304 val job = launch(CommonPool) {
305 repeat(1000) { i ->
306 println("I'm sleeping $i ...")
307 delay(500L)
308 }
309 }
310 delay(1300L) // delay a bit
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300311 println("main: I'm tired of waiting!")
Roman Elizarov7deefb82017-01-31 10:33:17 +0300312 job.cancel() // cancels the job
313 delay(1300L) // delay a bit to ensure it was cancelled indeed
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300314 println("main: Now I can quit.")
Roman Elizarov7deefb82017-01-31 10:33:17 +0300315}
316```
317
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300318> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-01.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300319
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300320It produces the following output:
321
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300322```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300323I'm sleeping 0 ...
324I'm sleeping 1 ...
325I'm sleeping 2 ...
326main: I'm tired of waiting!
327main: Now I can quit.
328```
329
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300330<!--- TEST -->
331
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300332As soon as main invokes `job.cancel`, we don't see any output from the other coroutine because it was cancelled.
333
334### Cancellation is cooperative
Roman Elizarov7deefb82017-01-31 10:33:17 +0300335
Tair Rzayevaf734622017-02-01 22:30:16 +0200336Coroutine cancellation is _cooperative_. A coroutine code has to cooperate to be cancellable.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300337All the suspending functions in `kotlinx.coroutines` are _cancellable_. They check for cancellation of
Roman Elizarov419a6c82017-02-09 18:36:22 +0300338coroutine and throw [CancellationException] when cancelled. However, if a coroutine is working in
Roman Elizarov7deefb82017-01-31 10:33:17 +0300339a computation and does not check for cancellation, then it cannot be cancelled, like the following
340example shows:
341
342```kotlin
343fun main(args: Array<String>) = runBlocking<Unit> {
344 val job = launch(CommonPool) {
345 var nextPrintTime = 0L
346 var i = 0
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300347 while (i < 10) { // computation loop
Roman Elizarov7deefb82017-01-31 10:33:17 +0300348 val currentTime = System.currentTimeMillis()
349 if (currentTime >= nextPrintTime) {
350 println("I'm sleeping ${i++} ...")
351 nextPrintTime = currentTime + 500L
352 }
353 }
354 }
355 delay(1300L) // delay a bit
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300356 println("main: I'm tired of waiting!")
Roman Elizarov7deefb82017-01-31 10:33:17 +0300357 job.cancel() // cancels the job
358 delay(1300L) // delay a bit to see if it was cancelled....
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300359 println("main: Now I can quit.")
Roman Elizarov7deefb82017-01-31 10:33:17 +0300360}
361```
362
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300363> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-02.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300364
365Run it to see that it continues to print "I'm sleeping" even after cancellation.
366
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300367<!--- TEST
368I'm sleeping 0 ...
369I'm sleeping 1 ...
370I'm sleeping 2 ...
371main: I'm tired of waiting!
372I'm sleeping 3 ...
373I'm sleeping 4 ...
374I'm sleeping 5 ...
375main: Now I can quit.
376-->
377
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300378### Making computation code cancellable
Roman Elizarov7deefb82017-01-31 10:33:17 +0300379
380There are two approaches to making computation code cancellable. The first one is to periodically
Roman Elizarov419a6c82017-02-09 18:36:22 +0300381invoke a suspending function. There is a [yield] function that is a good choice for that purpose.
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300382The other one is to explicitly check the cancellation status. Let us try the later approach.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300383
384Replace `while (true)` in the previous example with `while (isActive)` and rerun it.
385
Roman Elizarovb3d55a52017-02-03 12:47:21 +0300386```kotlin
387fun main(args: Array<String>) = runBlocking<Unit> {
388 val job = launch(CommonPool) {
389 var nextPrintTime = 0L
390 var i = 0
391 while (isActive) { // cancellable computation loop
392 val currentTime = System.currentTimeMillis()
393 if (currentTime >= nextPrintTime) {
394 println("I'm sleeping ${i++} ...")
395 nextPrintTime = currentTime + 500L
396 }
397 }
398 }
399 delay(1300L) // delay a bit
400 println("main: I'm tired of waiting!")
401 job.cancel() // cancels the job
402 delay(1300L) // delay a bit to see if it was cancelled....
403 println("main: Now I can quit.")
404}
405```
406
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300407> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-03.kt)
Roman Elizarov7deefb82017-01-31 10:33:17 +0300408
Roman Elizarov419a6c82017-02-09 18:36:22 +0300409As you can see, now this loop can be cancelled. [isActive][CoroutineScope.isActive] is a property that is available inside
410the code of coroutines via [CoroutineScope] object.
Roman Elizarov7deefb82017-01-31 10:33:17 +0300411
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300412<!--- TEST
413I'm sleeping 0 ...
414I'm sleeping 1 ...
415I'm sleeping 2 ...
416main: I'm tired of waiting!
417main: Now I can quit.
418-->
419
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300420### Closing resources with finally
421
Roman Elizarov419a6c82017-02-09 18:36:22 +0300422Cancellable suspending functions throw [CancellationException] on cancellation which can be handled in
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300423all the usual way. For example, the `try {...} finally {...}` and Kotlin `use` function execute their
424finalization actions normally when coroutine is cancelled:
425
426```kotlin
427fun main(args: Array<String>) = runBlocking<Unit> {
428 val job = launch(CommonPool) {
429 try {
430 repeat(1000) { i ->
431 println("I'm sleeping $i ...")
432 delay(500L)
433 }
434 } finally {
435 println("I'm running finally")
436 }
437 }
438 delay(1300L) // delay a bit
439 println("main: I'm tired of waiting!")
440 job.cancel() // cancels the job
441 delay(1300L) // delay a bit to ensure it was cancelled indeed
442 println("main: Now I can quit.")
443}
444```
445
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300446> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-04.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300447
448The example above produces the following output:
449
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300450```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300451I'm sleeping 0 ...
452I'm sleeping 1 ...
453I'm sleeping 2 ...
454main: I'm tired of waiting!
455I'm running finally
456main: Now I can quit.
457```
458
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300459<!--- TEST -->
460
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300461### Run non-cancellable block
462
463Any attempt to use a suspending function in the `finally` block of the previous example will cause
Roman Elizarov419a6c82017-02-09 18:36:22 +0300464[CancellationException], because the coroutine running this code is cancelled. Usually, this is not a
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300465problem, since all well-behaving closing operations (closing a file, cancelling a job, or closing any kind of a
466communication channel) are usually non-blocking and do not involve any suspending functions. However, in the
467rare case when you need to suspend in the cancelled coroutine you can wrap the corresponding code in
Roman Elizarov419a6c82017-02-09 18:36:22 +0300468`run(NonCancellable) {...}` using [run] function and [NonCancellable] context as the following example shows:
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300469
470```kotlin
471fun main(args: Array<String>) = runBlocking<Unit> {
472 val job = launch(CommonPool) {
473 try {
474 repeat(1000) { i ->
475 println("I'm sleeping $i ...")
476 delay(500L)
477 }
478 } finally {
479 run(NonCancellable) {
480 println("I'm running finally")
481 delay(1000L)
482 println("And I've just delayed for 1 sec because I'm non-cancellable")
483 }
484 }
485 }
486 delay(1300L) // delay a bit
487 println("main: I'm tired of waiting!")
488 job.cancel() // cancels the job
489 delay(1300L) // delay a bit to ensure it was cancelled indeed
490 println("main: Now I can quit.")
491}
492```
493
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300494> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-05.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300495
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300496<!--- TEST
497I'm sleeping 0 ...
498I'm sleeping 1 ...
499I'm sleeping 2 ...
500main: I'm tired of waiting!
501I'm running finally
502And I've just delayed for 1 sec because I'm non-cancellable
503main: Now I can quit.
504-->
505
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300506### Timeout
507
508The most obvious reason to cancel coroutine execution in practice,
509is because its execution time has exceeded some timeout.
Roman Elizarov419a6c82017-02-09 18:36:22 +0300510While you can manually track the reference to the corresponding [Job] and launch a separate coroutine to cancel
511the tracked one after delay, there is a ready to use [withTimeout] function that does it.
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300512Look at the following example:
513
514```kotlin
515fun main(args: Array<String>) = runBlocking<Unit> {
516 withTimeout(1300L) {
517 repeat(1000) { i ->
518 println("I'm sleeping $i ...")
519 delay(500L)
520 }
521 }
522}
523```
524
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300525> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-06.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300526
527It produces the following output:
528
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300529```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300530I'm sleeping 0 ...
531I'm sleeping 1 ...
532I'm sleeping 2 ...
533Exception in thread "main" java.util.concurrent.CancellationException: Timed out waiting for 1300 MILLISECONDS
534```
535
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300536<!--- TEST STARTS_WITH -->
537
Roman Elizarov419a6c82017-02-09 18:36:22 +0300538We have not seen the [CancellationException] stack trace printed on the console before. That is because
Roman Elizarov7c864d82017-02-27 10:17:50 +0300539inside a cancelled coroutine `CancellationException` is considered to be a normal reason for coroutine completion.
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300540However, in this example we have used `withTimeout` right inside the `main` function.
541
542Because cancellation is just an exception, all the resources will be closed in a usual way.
543You can wrap the code with timeout in `try {...} catch (e: CancellationException) {...}` block if
544you need to do some additional action specifically on timeout.
545
546## Composing suspending functions
547
548This section covers various approaches to composition of suspending functions.
549
550### Sequential by default
551
552Assume that we have two suspending functions defined elsewhere that do something useful like some kind of
Roman Elizarovb7721cf2017-02-03 19:23:08 +0300553remote service call or computation. We just pretend they are useful, but actually each one just
554delays for a second for the purpose of this example:
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300555
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300556<!--- INCLUDE .*/example-compose-([0-9]+).kt
557import kotlin.system.measureTimeMillis
558-->
559
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300560```kotlin
561suspend fun doSomethingUsefulOne(): Int {
562 delay(1000L) // pretend we are doing something useful here
563 return 13
564}
565
566suspend fun doSomethingUsefulTwo(): Int {
567 delay(1000L) // pretend we are doing something useful here, too
568 return 29
569}
570```
571
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300572<!--- INCLUDE .*/example-compose-([0-9]+).kt -->
573
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300574What do we do if need to invoke them _sequentially_ -- first `doSomethingUsefulOne` _and then_
575`doSomethingUsefulTwo` and compute the sum of their results?
576In practise we do this if we use the results of the first function to make a decision on whether we need
577to invoke the second one or to decide on how to invoke it.
578
579We just use a normal sequential invocation, because the code in the coroutine, just like in the regular
Roman Elizarov32d95322017-02-09 15:57:31 +0300580code, is _sequential_ by default. The following example demonstrates it by measuring the total
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300581time it takes to execute both suspending functions:
582
583```kotlin
584fun main(args: Array<String>) = runBlocking<Unit> {
585 val time = measureTimeMillis {
586 val one = doSomethingUsefulOne()
587 val two = doSomethingUsefulTwo()
588 println("The answer is ${one + two}")
589 }
590 println("Completed in $time ms")
591}
592```
593
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300594> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-01.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300595
596It produces something like this:
597
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300598```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300599The answer is 42
600Completed in 2017 ms
601```
602
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300603<!--- TEST FLEXIBLE_TIME -->
604
Roman Elizarov32d95322017-02-09 15:57:31 +0300605### Concurrent using async
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300606
607What if there are no dependencies between invocation of `doSomethingUsefulOne` and `doSomethingUsefulTwo` and
Roman Elizarov419a6c82017-02-09 18:36:22 +0300608we want to get the answer faster, by doing both _concurrently_? This is where [async] comes to help.
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300609
Roman Elizarov419a6c82017-02-09 18:36:22 +0300610Conceptually, [async] is just like [launch]. It starts a separate coroutine which is a light-weight thread
611that works concurrently with all the other coroutines. The difference is that `launch` returns a [Job] and
612does not carry any resulting value, while `async` returns a [Deferred] -- a light-weight non-blocking future
Roman Elizarov32d95322017-02-09 15:57:31 +0300613that represents a promise to provide a result later. You can use `.await()` on a deferred value to get its eventual result,
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300614but `Deferred` is also a `Job`, so you can cancel it if needed.
615
616```kotlin
617fun main(args: Array<String>) = runBlocking<Unit> {
618 val time = measureTimeMillis {
Roman Elizarov32d95322017-02-09 15:57:31 +0300619 val one = async(CommonPool) { doSomethingUsefulOne() }
620 val two = async(CommonPool) { doSomethingUsefulTwo() }
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300621 println("The answer is ${one.await() + two.await()}")
622 }
623 println("Completed in $time ms")
624}
625```
626
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300627> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-02.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300628
629It produces something like this:
630
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300631```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300632The answer is 42
633Completed in 1017 ms
634```
635
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300636<!--- TEST FLEXIBLE_TIME -->
637
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300638This is twice as fast, because we have concurrent execution of two coroutines.
639Note, that concurrency with coroutines is always explicit.
640
Roman Elizarov32d95322017-02-09 15:57:31 +0300641### Lazily started async
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300642
Roman Elizarov419a6c82017-02-09 18:36:22 +0300643There is a laziness option to [async] with `start = false` parameter.
644It starts coroutine only when its result is needed by some
645[await][Deferred.await] or if a [start][Job.start] function
Roman Elizarov32d95322017-02-09 15:57:31 +0300646is invoked. Run the following example that differs from the previous one only by this option:
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300647
648```kotlin
649fun main(args: Array<String>) = runBlocking<Unit> {
650 val time = measureTimeMillis {
Roman Elizarov32d95322017-02-09 15:57:31 +0300651 val one = async(CommonPool, start = false) { doSomethingUsefulOne() }
652 val two = async(CommonPool, start = false) { doSomethingUsefulTwo() }
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300653 println("The answer is ${one.await() + two.await()}")
654 }
655 println("Completed in $time ms")
656}
657```
658
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300659> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-03.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300660
661It produces something like this:
662
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300663```text
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300664The answer is 42
665Completed in 2017 ms
666```
667
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300668<!--- TEST FLEXIBLE_TIME -->
669
Roman Elizarov32d95322017-02-09 15:57:31 +0300670So, we are back to sequential execution, because we _first_ start and await for `one`, _and then_ start and await
671for `two`. It is not the intended use-case for laziness. It is designed as a replacement for
672the standard `lazy` function in cases when computation of the value involves suspending functions.
673
674### Async-style functions
675
676We can define async-style functions that invoke `doSomethingUsefulOne` and `doSomethingUsefulTwo`
Roman Elizarov419a6c82017-02-09 18:36:22 +0300677_asynchronously_ using [async] coroutine builder. It is a good style to name such functions with
Roman Elizarov32d95322017-02-09 15:57:31 +0300678either "async" prefix of "Async" suffix to highlight the fact that they only start asynchronous
679computation and one needs to use the resulting deferred value to get the result.
680
681```kotlin
682// The result type of asyncSomethingUsefulOne is Deferred<Int>
683fun asyncSomethingUsefulOne() = async(CommonPool) {
684 doSomethingUsefulOne()
685}
686
687// The result type of asyncSomethingUsefulTwo is Deferred<Int>
688fun asyncSomethingUsefulTwo() = async(CommonPool) {
689 doSomethingUsefulTwo()
690}
691```
692
693Note, that these `asyncXXX` function are **not** _suspending_ functions. They can be used from anywhere.
694However, their use always implies asynchronous (here meaning _concurrent_) execution of their action
695with the invoking code.
696
697The following example shows their use outside of coroutine:
698
699```kotlin
700// note, that we don't have `runBlocking` to the right of `main` in this example
701fun main(args: Array<String>) {
702 val time = measureTimeMillis {
703 // we can initiate async actions outside of a coroutine
704 val one = asyncSomethingUsefulOne()
705 val two = asyncSomethingUsefulTwo()
706 // but waiting for a result must involve either suspending or blocking.
707 // here we use `runBlocking { ... }` to block the main thread while waiting for the result
708 runBlocking {
709 println("The answer is ${one.await() + two.await()}")
710 }
711 }
712 println("Completed in $time ms")
713}
714```
715
716> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-04.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300717
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300718<!--- TEST FLEXIBLE_TIME
719The answer is 42
720Completed in 1085 ms
721-->
722
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300723## Coroutine context and dispatchers
724
Roman Elizarov32d95322017-02-09 15:57:31 +0300725We've already seen `launch(CommonPool) {...}`, `async(CommonPool) {...}`, `run(NonCancellable) {...}`, etc.
Roman Elizarov419a6c82017-02-09 18:36:22 +0300726In these code snippets [CommonPool] and [NonCancellable] are _coroutine contexts_.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300727This section covers other available choices.
728
729### Dispatchers and threads
730
Roman Elizarov419a6c82017-02-09 18:36:22 +0300731Coroutine context includes a [_coroutine dispatcher_][CoroutineDispatcher] which determines what thread or threads
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300732the corresponding coroutine uses for its execution. Coroutine dispatcher can confine coroutine execution
733to a specific thread, dispatch it to a thread pool, or let it run unconfined. Try the following example:
734
735```kotlin
736fun main(args: Array<String>) = runBlocking<Unit> {
737 val jobs = arrayListOf<Job>()
738 jobs += launch(Unconfined) { // not confined -- will work with main thread
739 println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
740 }
741 jobs += launch(context) { // context of the parent, runBlocking coroutine
742 println(" 'context': I'm working in thread ${Thread.currentThread().name}")
743 }
744 jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
745 println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
746 }
747 jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
748 println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
749 }
750 jobs.forEach { it.join() }
751}
752```
753
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300754> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-01.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300755
756It produces the following output (maybe in different order):
757
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300758```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300759 'Unconfined': I'm working in thread main
760 'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
761 'newSTC': I'm working in thread MyOwnThread
762 'context': I'm working in thread main
763```
764
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300765<!--- TEST LINES_START_UNORDERED -->
766
Roman Elizarov419a6c82017-02-09 18:36:22 +0300767The difference between parent [context][CoroutineScope.context] and [Unconfined] context will be shown later.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300768
769### Unconfined vs confined dispatcher
770
Roman Elizarov419a6c82017-02-09 18:36:22 +0300771The [Unconfined] coroutine dispatcher starts coroutine in the caller thread, but only until the
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300772first suspension point. After suspension it resumes in the thread that is fully determined by the
773suspending function that was invoked. Unconfined dispatcher is appropriate when coroutine does not
774consume CPU time nor updates any shared data (like UI) that is confined to a specific thread.
775
Roman Elizarov419a6c82017-02-09 18:36:22 +0300776On the other side, [context][CoroutineScope.context] property that is available inside the block of any coroutine
777via [CoroutineScope] interface, is a reference to a context of this particular coroutine.
778This way, a parent context can be inherited. The default context of [runBlocking], in particular,
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300779is confined to be invoker thread, so inheriting it has the effect of confining execution to
780this thread with a predictable FIFO scheduling.
781
782```kotlin
783fun main(args: Array<String>) = runBlocking<Unit> {
784 val jobs = arrayListOf<Job>()
785 jobs += launch(Unconfined) { // not confined -- will work with main thread
786 println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
787 delay(1000)
788 println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
789 }
790 jobs += launch(context) { // context of the parent, runBlocking coroutine
791 println(" 'context': I'm working in thread ${Thread.currentThread().name}")
792 delay(1000)
793 println(" 'context': After delay in thread ${Thread.currentThread().name}")
794 }
795 jobs.forEach { it.join() }
796}
797```
798
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300799> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-contest-02.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300800
801Produces the output:
802
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300803```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300804 'Unconfined': I'm working in thread main
805 'context': I'm working in thread main
806 'Unconfined': After delay in thread kotlinx.coroutines.ScheduledExecutor
807 'context': After delay in thread main
808```
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300809
810<!--- TEST LINES_START -->
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300811
Roman Elizarov7c864d82017-02-27 10:17:50 +0300812So, the coroutine that had inherited `context` of `runBlocking {...}` continues to execute in the `main` thread,
Roman Elizarov419a6c82017-02-09 18:36:22 +0300813while the unconfined one had resumed in the scheduler thread that [delay] function is using.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300814
815### Debugging coroutines and threads
816
Roman Elizarov419a6c82017-02-09 18:36:22 +0300817Coroutines can suspend on one thread and resume on another thread with [Unconfined] dispatcher or
818with a multi-threaded dispatcher like [CommonPool]. Even with a single-threaded dispatcher it might be hard to
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300819figure out what coroutine was doing what, where, and when. The common approach to debugging applications with
820threads is to print the thread name in the log file on each log statement. This feature is universally supported
821by logging frameworks. When using coroutines, the thread name alone does not give much of a context, so
822`kotlinx.coroutines` includes debugging facilities to make it easier.
823
824Run the following code with `-Dkotlinx.coroutines.debug` JVM option:
825
826```kotlin
827fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
828
829fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov32d95322017-02-09 15:57:31 +0300830 val a = async(context) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300831 log("I'm computing a piece of the answer")
832 6
833 }
Roman Elizarov32d95322017-02-09 15:57:31 +0300834 val b = async(context) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300835 log("I'm computing another piece of the answer")
836 7
837 }
838 log("The answer is ${a.await() * b.await()}")
839}
840```
841
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300842> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-03.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300843
Roman Elizarovb7721cf2017-02-03 19:23:08 +0300844There are three coroutines. The main coroutine (#1) -- `runBlocking` one,
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300845and two coroutines computing deferred values `a` (#2) and `b` (#3).
846They are all executing in the context of `runBlocking` and are confined to the main thread.
847The output of this code is:
848
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300849```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300850[main @coroutine#2] I'm computing a piece of the answer
851[main @coroutine#3] I'm computing another piece of the answer
852[main @coroutine#1] The answer is 42
853```
854
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300855<!--- TEST -->
856
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300857The `log` function prints the name of the thread in square brackets and you can see, that it is the `main`
858thread, but the identifier of the currently executing coroutine is appended to it. This identifier
859is consecutively assigned to all created coroutines when debugging mode is turned on.
860
Roman Elizarov419a6c82017-02-09 18:36:22 +0300861You can read more about debugging facilities in the documentation for [newCoroutineContext] function.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300862
863### Jumping between threads
864
865Run the following code with `-Dkotlinx.coroutines.debug` JVM option:
866
867```kotlin
868fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
869
870fun main(args: Array<String>) {
871 val ctx1 = newSingleThreadContext("Ctx1")
872 val ctx2 = newSingleThreadContext("Ctx2")
873 runBlocking(ctx1) {
874 log("Started in ctx1")
875 run(ctx2) {
876 log("Working in ctx2")
877 }
878 log("Back to ctx1")
879 }
880}
881```
882
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300883> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300884
Roman Elizarov419a6c82017-02-09 18:36:22 +0300885It demonstrates two new techniques. One is using [runBlocking] with an explicitly specified context, and
886the second one is using [run] function to change a context of a coroutine while still staying in the
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300887same coroutine as you can see in the output below:
888
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300889```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300890[Ctx1 @coroutine#1] Started in ctx1
891[Ctx2 @coroutine#1] Working in ctx2
892[Ctx1 @coroutine#1] Back to ctx1
893```
894
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300895<!--- TEST -->
896
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300897### Job in the context
898
Roman Elizarov419a6c82017-02-09 18:36:22 +0300899The coroutine [Job] is part of its context. The coroutine can retrieve it from its own context
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300900using `context[Job]` expression:
901
902```kotlin
903fun main(args: Array<String>) = runBlocking<Unit> {
904 println("My job is ${context[Job]}")
905}
906```
907
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300908> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-05.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300909
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300910It produces somethine like
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300911
912```
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300913My job is BlockingCoroutine{Active}@65ae6ba4
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300914```
915
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300916<!--- TEST lines.size == 1 && lines[0].startsWith("My job is BlockingCoroutine{Active}@") -->
917
Roman Elizarov419a6c82017-02-09 18:36:22 +0300918So, [isActive][CoroutineScope.isActive] in [CoroutineScope] is just a convenient shortcut for `context[Job]!!.isActive`.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300919
920### Children of a coroutine
921
Roman Elizarov419a6c82017-02-09 18:36:22 +0300922When [context][CoroutineScope.context] of a coroutine is used to launch another coroutine,
923the [Job] of the new coroutine becomes
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300924a _child_ of the parent coroutine's job. When the parent coroutine is cancelled, all its children
925are recursively cancelled, too.
926
927```kotlin
928fun main(args: Array<String>) = runBlocking<Unit> {
929 // start a coroutine to process some kind of incoming request
930 val request = launch(CommonPool) {
931 // it spawns two other jobs, one with its separate context
932 val job1 = launch(CommonPool) {
933 println("job1: I have my own context and execute independently!")
934 delay(1000)
935 println("job1: I am not affected by cancellation of the request")
936 }
937 // and the other inherits the parent context
938 val job2 = launch(context) {
939 println("job2: I am a child of the request coroutine")
940 delay(1000)
941 println("job2: I will not execute this line if my parent request is cancelled")
942 }
943 // request completes when both its sub-jobs complete:
944 job1.join()
945 job2.join()
946 }
947 delay(500)
948 request.cancel() // cancel processing of the request
949 delay(1000) // delay a second to see what happens
950 println("main: Who has survived request cancellation?")
951}
952```
953
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300954> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-06.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300955
956The output of this code is:
957
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300958```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300959job1: I have my own context and execute independently!
960job2: I am a child of the request coroutine
961job1: I am not affected by cancellation of the request
962main: Who has survived request cancellation?
963```
964
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300965<!--- TEST -->
966
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300967### Combining contexts
968
969Coroutine context can be combined using `+` operator. The context on the right-hand side replaces relevant entries
Roman Elizarov419a6c82017-02-09 18:36:22 +0300970of the context on the left-hand side. For example, a [Job] of the parent coroutine can be inherited, while
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300971its dispatcher replaced:
972
973```kotlin
974fun main(args: Array<String>) = runBlocking<Unit> {
975 // start a coroutine to process some kind of incoming request
976 val request = launch(context) { // use the context of `runBlocking`
977 // spawns CPU-intensive child job in CommonPool !!!
978 val job = launch(context + CommonPool) {
979 println("job: I am a child of the request coroutine, but with a different dispatcher")
980 delay(1000)
981 println("job: I will not execute this line if my parent request is cancelled")
982 }
983 job.join() // request completes when its sub-job completes
984 }
985 delay(500)
986 request.cancel() // cancel processing of the request
987 delay(1000) // delay a second to see what happens
988 println("main: Who has survived request cancellation?")
989}
990```
991
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300992> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-07.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300993
994The expected outcome of this code is:
995
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300996```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300997job: I am a child of the request coroutine, but with a different dispatcher
998main: Who has survived request cancellation?
999```
1000
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001001<!--- TEST -->
1002
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001003### Naming coroutines for debugging
1004
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001005Automatically assigned ids are good when coroutines log often and you just need to correlate log records
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001006coming from the same coroutine. However, when coroutine is tied to the processing of a specific request
1007or doing some specific background task, it is better to name it explicitly for debugging purposes.
Roman Elizarov419a6c82017-02-09 18:36:22 +03001008[CoroutineName] serves the same function as a thread name. It'll get displayed in the thread name that
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001009is executing this coroutine when debugging more is turned on.
1010
1011The following example demonstrates this concept:
1012
1013```kotlin
1014fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
1015
1016fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
1017 log("Started main coroutine")
1018 // run two background value computations
Roman Elizarov32d95322017-02-09 15:57:31 +03001019 val v1 = async(CommonPool + CoroutineName("v1coroutine")) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001020 log("Computing v1")
1021 delay(500)
1022 252
1023 }
Roman Elizarov32d95322017-02-09 15:57:31 +03001024 val v2 = async(CommonPool + CoroutineName("v2coroutine")) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001025 log("Computing v2")
1026 delay(1000)
1027 6
1028 }
1029 log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
1030}
1031```
1032
Roman Elizarovfa7723e2017-02-06 11:17:51 +03001033> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001034
1035The output it produces with `-Dkotlinx.coroutines.debug` JVM option is similar to:
1036
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001037```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001038[main @main#1] Started main coroutine
1039[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
1040[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
1041[main @main#1] The answer for v1 / v2 = 42
1042```
Roman Elizarov1293ccd2017-02-01 18:49:54 +03001043
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001044<!--- TEST FLEXIBLE_THREAD -->
1045
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001046### Cancellation via explicit job
1047
1048Let us put our knowledge about contexts, children and jobs together. Assume that our application has
1049an object with a lifecycle, but that object is not a coroutine. For example, we are writing an Android application
1050and launch various coroutines in the context of an Android activity to perform asynchronous operations to fetch
1051and update data, do animations, etc. All of these coroutines must be cancelled when activity is destroyed
1052to avoid memory leaks.
1053
1054We can manage a lifecycle of our coroutines by creating an instance of [Job] that is tied to
1055the lifecycle of our activity. A job instance is created using [Job()][Job.invoke] factory function
1056as the following example shows. We need to make sure that all the coroutines are started
1057with this job in their context and then a single invocation of [Job.cancel] terminates them all.
1058
1059```kotlin
1060fun main(args: Array<String>) = runBlocking<Unit> {
1061 val job = Job() // create a job object to manage our lifecycle
1062 // now launch ten coroutines for a demo, each working for a different time
1063 val coroutines = List(10) { i ->
1064 // they are all children of our job object
1065 launch(context + job) { // we use the context of main runBlocking thread, but with our own job object
1066 delay(i * 200L) // variable delay 0ms, 200ms, 400ms, ... etc
1067 println("Coroutine $i is done")
1068 }
1069 }
1070 println("Launched ${coroutines.size} coroutines")
1071 delay(500L) // delay for half a second
1072 println("Cancelling job!")
1073 job.cancel() // cancel our job.. !!!
1074 delay(1000L) // delay for more to see if our coroutines are still working
1075}
1076```
1077
1078> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-09.kt)
1079
1080The output of this example is:
1081
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001082```text
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001083Launched 10 coroutines
1084Coroutine 0 is done
1085Coroutine 1 is done
1086Coroutine 2 is done
1087Cancelling job!
1088```
1089
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001090<!--- TEST -->
1091
Roman Elizarov2fd7cb32017-02-11 23:18:59 +03001092As you can see, only the first three coroutines had printed a message and the others were cancelled
1093by a single invocation of `job.cancel()`. So all we need to do in our hypothetical Android
1094application is to create a parent job object when activity is created, use it for child coroutines,
1095and cancel it when activity is destroyed.
1096
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001097## Channels
Roman Elizarov7deefb82017-01-31 10:33:17 +03001098
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001099Deferred values provide a convenient way to transfer a single value between coroutines.
1100Channels provide a way to transfer a stream of values.
1101
1102<!--- INCLUDE .*/example-channel-([0-9]+).kt
1103import kotlinx.coroutines.experimental.channels.*
1104-->
1105
1106### Channel basics
1107
Roman Elizarov419a6c82017-02-09 18:36:22 +03001108A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
1109instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
1110a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001111
1112```kotlin
1113fun main(args: Array<String>) = runBlocking<Unit> {
1114 val channel = Channel<Int>()
1115 launch(CommonPool) {
1116 // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
1117 for (x in 1..5) channel.send(x * x)
1118 }
1119 // here we print five received integers:
1120 repeat(5) { println(channel.receive()) }
1121 println("Done!")
1122}
1123```
1124
1125> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-01.kt)
1126
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001127The output of this code is:
1128
1129```text
11301
11314
11329
113316
113425
1135Done!
1136```
1137
1138<!--- TEST -->
1139
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001140### Closing and iteration over channels
1141
1142Unlike a queue, a channel can be closed to indicate that no more elements are coming.
1143On the receiver side it is convenient to use a regular `for` loop to receive elements
1144from the channel.
1145
Roman Elizarov419a6c82017-02-09 18:36:22 +03001146Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001147The iteration stops as soon as this close token is received, so there is a guarantee
1148that all previously sent elements before the close are received:
1149
1150```kotlin
1151fun main(args: Array<String>) = runBlocking<Unit> {
1152 val channel = Channel<Int>()
1153 launch(CommonPool) {
1154 for (x in 1..5) channel.send(x * x)
1155 channel.close() // we're done sending
1156 }
1157 // here we print received values using `for` loop (until the channel is closed)
1158 for (y in channel) println(y)
1159 println("Done!")
1160}
1161```
1162
1163> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-02.kt)
1164
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001165<!--- TEST
11661
11674
11689
116916
117025
1171Done!
1172-->
1173
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001174### Building channel producers
1175
Roman Elizarova5e653f2017-02-13 13:49:55 +03001176The pattern where a coroutine is producing a sequence of elements is quite common.
1177This is a part of _producer-consumer_ pattern that is often found in concurrent code.
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001178You could abstract such a producer into a function that takes channel as its parameter, but this goes contrary
Roman Elizarova5e653f2017-02-13 13:49:55 +03001179to common sense that results must be returned from functions.
1180
1181There is a convenience coroutine builder named [produce] that makes it easy to do it right:
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001182
1183```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001184fun produceSquares() = produce<Int>(CommonPool) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001185 for (x in 1..5) send(x * x)
1186}
1187
1188fun main(args: Array<String>) = runBlocking<Unit> {
1189 val squares = produceSquares()
1190 for (y in squares) println(y)
1191 println("Done!")
1192}
1193```
1194
1195> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt)
1196
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001197<!--- TEST
11981
11994
12009
120116
120225
1203Done!
1204-->
1205
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001206### Pipelines
1207
1208Pipeline is a pattern where one coroutine is producing, possibly infinite, stream of values:
1209
1210```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001211fun produceNumbers() = produce<Int>(CommonPool) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001212 var x = 1
1213 while (true) send(x++) // infinite stream of integers starting from 1
1214}
1215```
1216
Roman Elizarova5e653f2017-02-13 13:49:55 +03001217And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results.
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001218In the below example the numbers are just squared:
1219
1220```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001221fun square(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001222 for (x in numbers) send(x * x)
1223}
1224```
1225
Roman Elizarova5e653f2017-02-13 13:49:55 +03001226The main code starts and connects the whole pipeline:
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001227
1228```kotlin
1229fun main(args: Array<String>) = runBlocking<Unit> {
1230 val numbers = produceNumbers() // produces integers from 1 and on
1231 val squares = square(numbers) // squares integers
1232 for (i in 1..5) println(squares.receive()) // print first five
1233 println("Done!") // we are done
1234 squares.cancel() // need to cancel these coroutines in a larger app
1235 numbers.cancel()
1236}
1237```
1238
1239> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-04.kt)
1240
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001241<!--- TEST
12421
12434
12449
124516
124625
1247Done!
1248-->
1249
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001250We don't have to cancel these coroutines in this example app, because
1251[coroutines are like daemon threads](#coroutines-are-like-daemon-threads),
1252but in a larger app we'll need to stop our pipeline if we don't need it anymore.
1253Alternatively, we could have run pipeline coroutines as
1254[children of a coroutine](#children-of-a-coroutine).
1255
1256### Prime numbers with pipeline
1257
Cedric Beustfa0b28f2017-02-07 07:07:25 -08001258Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001259of coroutines. We start with an infinite sequence of numbers. This time we introduce an
1260explicit context parameter, so that caller can control where our coroutines run:
1261
1262<!--- INCLUDE kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
1263import kotlin.coroutines.experimental.CoroutineContext
1264-->
1265
1266```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001267fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001268 var x = start
1269 while (true) send(x++) // infinite stream of integers from start
1270}
1271```
1272
1273The following pipeline stage filters an incoming stream of numbers, removing all the numbers
1274that are divisible by the given prime number:
1275
1276```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001277fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001278 for (x in numbers) if (x % prime != 0) send(x)
1279}
1280```
1281
1282Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
Roman Elizarov62500ba2017-02-09 18:55:40 +03001283and launching new pipeline stage for each prime number found:
1284
1285```
Roman Elizarova5e653f2017-02-13 13:49:55 +03001286numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
Roman Elizarov62500ba2017-02-09 18:55:40 +03001287```
1288
1289The following example prints the first ten prime numbers,
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001290running the whole pipeline in the context of the main thread:
1291
1292```kotlin
1293fun main(args: Array<String>) = runBlocking<Unit> {
1294 var cur = numbersFrom(context, 2)
1295 for (i in 1..10) {
1296 val prime = cur.receive()
1297 println(prime)
1298 cur = filter(context, cur, prime)
1299 }
1300}
1301```
1302
1303> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt)
1304
1305The output of this code is:
1306
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001307```text
Roman Elizarovb7721cf2017-02-03 19:23:08 +030013082
13093
13105
13117
131211
131313
131417
131519
131623
131729
1318```
1319
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001320<!--- TEST -->
1321
Roman Elizarova5e653f2017-02-13 13:49:55 +03001322Note, that you can build the same pipeline using `buildIterator` coroutine builder from the standard library.
1323Replace `produce` with `buildIterator`, `send` with `yield`, `receive` with `next`,
Roman Elizarov62500ba2017-02-09 18:55:40 +03001324`ReceiveChannel` with `Iterator`, and get rid of the context. You will not need `runBlocking` either.
1325However, the benefit of a pipeline that uses channels as shown above is that it can actually use
1326multiple CPU cores if you run it in [CommonPool] context.
1327
Roman Elizarova5e653f2017-02-13 13:49:55 +03001328Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
Roman Elizarov62500ba2017-02-09 18:55:40 +03001329other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
1330built using `buildSeqeunce`/`buildIterator`, because they do not allow arbitrary suspension, unlike
Roman Elizarova5e653f2017-02-13 13:49:55 +03001331`produce` which is fully asynchronous.
Roman Elizarov62500ba2017-02-09 18:55:40 +03001332
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001333### Fan-out
1334
1335Multiple coroutines may receive from the same channel, distributing work between themselves.
1336Let us start with a producer coroutine that is periodically producing integers
1337(ten numbers per second):
1338
1339```kotlin
Roman Elizarova5e653f2017-02-13 13:49:55 +03001340fun produceNumbers() = produce<Int>(CommonPool) {
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001341 var x = 1 // start from 1
1342 while (true) {
1343 send(x++) // produce next
1344 delay(100) // wait 0.1s
1345 }
1346}
1347```
1348
1349Then we can have several processor coroutines. In this example, they just print their id and
1350received number:
1351
1352```kotlin
1353fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
1354 while (true) {
1355 val x = channel.receive()
1356 println("Processor #$id received $x")
1357 }
1358}
1359```
1360
1361Now let us launch five processors and let them work for a second. See what happens:
1362
1363```kotlin
1364fun main(args: Array<String>) = runBlocking<Unit> {
1365 val producer = produceNumbers()
1366 repeat(5) { launchProcessor(it, producer) }
1367 delay(1000)
1368 producer.cancel() // cancel producer coroutine and thus kill them all
1369}
1370```
1371
1372> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt)
1373
1374The output will be similar to the the following one, albeit the processor ids that receive
1375each specific integer may be different:
1376
1377```
1378Processor #2 received 1
1379Processor #4 received 2
1380Processor #0 received 3
1381Processor #1 received 4
1382Processor #3 received 5
1383Processor #2 received 6
1384Processor #4 received 7
1385Processor #0 received 8
1386Processor #1 received 9
1387Processor #3 received 10
1388```
1389
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001390<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->
1391
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001392Note, that cancelling a producer coroutine closes its channel, thus eventually terminating iteration
1393over the channel that processor coroutines are doing.
1394
1395### Fan-in
1396
1397Multiple coroutines may send to the same channel.
1398For example, let us have a channel of strings, and a suspending function that
1399repeatedly sends a specified string to this channel with a specified delay:
1400
1401```kotlin
1402suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
1403 while (true) {
1404 delay(time)
1405 channel.send(s)
1406 }
1407}
1408```
1409
Cedric Beustfa0b28f2017-02-07 07:07:25 -08001410Now, let us see what happens if we launch a couple of coroutines sending strings
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001411(in this example we launch them in the context of the main thread):
1412
1413```kotlin
1414fun main(args: Array<String>) = runBlocking<Unit> {
1415 val channel = Channel<String>()
1416 launch(context) { sendString(channel, "foo", 200L) }
1417 launch(context) { sendString(channel, "BAR!", 500L) }
1418 repeat(6) { // receive first six
1419 println(channel.receive())
1420 }
1421}
1422```
1423
1424> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt)
1425
1426The output is:
1427
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001428```text
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001429foo
1430foo
1431BAR!
1432foo
1433foo
1434BAR!
1435```
1436
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001437<!--- TEST -->
1438
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001439### Buffered channels
1440
1441The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
1442meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked,
1443if receive is invoked first, it is suspended until send is invoked.
Roman Elizarov419a6c82017-02-09 18:36:22 +03001444
Roman Elizarova5e653f2017-02-13 13:49:55 +03001445Both [Channel()][Channel.invoke] factory function and [produce] builder take an optional `capacity` parameter to
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001446specify _buffer size_. Buffer allows senders to send multiple elements before suspending,
1447similar to the `BlockingQueue` with a specified capacity, which blocks when buffer is full.
1448
1449Take a look at the behavior of the following code:
1450
1451```kotlin
1452fun main(args: Array<String>) = runBlocking<Unit> {
1453 val channel = Channel<Int>(4) // create buffered channel
1454 launch(context) { // launch sender coroutine
1455 repeat(10) {
1456 println("Sending $it") // print before sending each element
1457 channel.send(it) // will suspend when buffer is full
1458 }
1459 }
1460 // don't receive anything... just wait....
1461 delay(1000)
1462}
1463```
1464
1465> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt)
1466
1467It prints "sending" _five_ times using a buffered channel with capacity of _four_:
1468
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001469```text
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001470Sending 0
1471Sending 1
1472Sending 2
1473Sending 3
1474Sending 4
1475```
1476
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001477<!--- TEST -->
1478
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001479The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
Roman Elizarov419a6c82017-02-09 18:36:22 +03001480
Roman Elizarovb0517ba2017-02-27 14:03:14 +03001481
1482### Channels are fair
1483
1484Send and receive operations to channels are _fair_ with respect to the order of their invocation from
1485multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
1486gets the element. In the following example two coroutines "ping" and "pong" are
1487receiving the "ball" object from the shared "table" channel.
1488
1489```kotlin
1490data class Ball(var hits: Int)
1491
1492fun main(args: Array<String>) = runBlocking<Unit> {
1493 val table = Channel<Ball>() // a shared table
1494 launch(context) { player("ping", table) }
1495 launch(context) { player("pong", table) }
1496 table.send(Ball(0)) // serve the ball
1497 delay(1000) // delay 1 second
1498 table.receive() // game over, grab the ball
1499}
1500
1501suspend fun player(name: String, table: Channel<Ball>) {
1502 for (ball in table) { // receive the ball in a loop
1503 ball.hits++
1504 println("$name $ball")
1505 delay(200) // wait a bit
1506 table.send(ball) // send the ball back
1507 }
1508}
1509```
1510
1511> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-09.kt)
1512
1513The "ping" coroutine is started first, so it is the first one to receive the ball. Even though "ping"
1514coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
1515received by the "pong" coroutine, because it was already waiting for it:
1516
1517```text
1518ping Ball(hits=1)
1519pong Ball(hits=2)
1520ping Ball(hits=3)
1521pong Ball(hits=4)
1522ping Ball(hits=5)
1523pong Ball(hits=6)
1524```
1525
1526<!--- TEST -->
1527
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001528## Shared mutable state and concurrency
1529
1530Coroutines can be executed concurrently using a multi-threaded dispatcher like [CommonPool]. It presents
1531all the usual concurrency problems. The main problem being synchronization of access to **shared mutable state**.
1532Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world,
1533but others are unique.
1534
1535### The problem
1536
Roman Elizarov1e459602017-02-27 11:05:17 +03001537Let us launch a thousand coroutines all doing the same action thousand times (for a total of a million executions).
1538We'll also measure their completion time for further comparisons:
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001539
1540<!--- INCLUDE .*/example-sync-([0-9]+).kt
Roman Elizarov1e459602017-02-27 11:05:17 +03001541import kotlin.coroutines.experimental.CoroutineContext
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001542import kotlin.system.measureTimeMillis
1543-->
1544
Roman Elizarov1e459602017-02-27 11:05:17 +03001545<!--- INCLUDE .*/example-sync-03.kt
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001546import java.util.concurrent.atomic.AtomicInteger
1547-->
1548
Roman Elizarov1e459602017-02-27 11:05:17 +03001549<!--- INCLUDE .*/example-sync-06.kt
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001550import kotlinx.coroutines.experimental.sync.Mutex
1551-->
1552
Roman Elizarov1e459602017-02-27 11:05:17 +03001553<!--- INCLUDE .*/example-sync-07.kt
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001554import kotlinx.coroutines.experimental.channels.*
1555-->
1556
1557```kotlin
Roman Elizarov1e459602017-02-27 11:05:17 +03001558suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
1559 val n = 1000 // number of coroutines to launch
1560 val k = 1000 // times an action is repeated by each coroutine
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001561 val time = measureTimeMillis {
1562 val jobs = List(n) {
Roman Elizarov1e459602017-02-27 11:05:17 +03001563 launch(context) {
1564 repeat(k) { action() }
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001565 }
1566 }
1567 jobs.forEach { it.join() }
1568 }
Roman Elizarov1e459602017-02-27 11:05:17 +03001569 println("Completed ${n * k} actions in $time ms")
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001570}
1571```
1572
1573<!--- INCLUDE .*/example-sync-([0-9]+).kt -->
1574
Roman Elizarov1e459602017-02-27 11:05:17 +03001575We start with a very simple action that increments a shared mutable variable using
1576multi-threaded [CommonPool] context.
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001577
1578```kotlin
1579var counter = 0
1580
1581fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov1e459602017-02-27 11:05:17 +03001582 massiveRun(CommonPool) {
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001583 counter++
1584 }
1585 println("Counter = $counter")
1586}
1587```
1588
1589> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-01.kt)
1590
Roman Elizarov1e459602017-02-27 11:05:17 +03001591<!--- TEST LINES_START
1592Completed 1000000 actions in
1593Counter =
1594-->
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001595
Roman Elizarov1e459602017-02-27 11:05:17 +03001596What does it print at the end? It is highly unlikely to ever print "Counter = 1000000", because a thousand coroutines
1597increment the `counter` concurrently from multiple threads without any synchronization.
1598
1599### Volatiles are of no help
1600
1601There is common misconception that making a variable `volatile` solves concurrency problem. Let us try it:
1602
1603```kotlin
1604@Volatile // in Kotlin `volatile` is an annotation
1605var counter = 0
1606
1607fun main(args: Array<String>) = runBlocking<Unit> {
1608 massiveRun(CommonPool) {
1609 counter++
1610 }
1611 println("Counter = $counter")
1612}
1613```
1614
1615> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-02.kt)
1616
1617<!--- TEST LINES_START
1618Completed 1000000 actions in
1619Counter =
1620-->
1621
1622This code works slower, but we still don't get "Counter = 1000000" at the end, because volatile variables guarantee
1623linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but
1624do not provide atomicity of larger actions (increment in our case).
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001625
1626### Thread-safe data structures
1627
1628The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized,
1629linearizable, or atomic) data structure that provides all the necessarily synchronization for the corresponding
1630operations that needs to be performed on a shared state.
Roman Elizarov1e459602017-02-27 11:05:17 +03001631In the case of a simple counter we can use `AtomicInteger` class which has atomic `incrementAndGet` operations:
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001632
1633```kotlin
1634var counter = AtomicInteger()
1635
1636fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov1e459602017-02-27 11:05:17 +03001637 massiveRun(CommonPool) {
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001638 counter.incrementAndGet()
1639 }
1640 println("Counter = ${counter.get()}")
1641}
1642```
1643
Roman Elizarov1e459602017-02-27 11:05:17 +03001644> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-03.kt)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001645
Roman Elizarov1e459602017-02-27 11:05:17 +03001646<!--- TEST ARBITRARY_TIME
1647Completed 1000000 actions in xxx ms
1648Counter = 1000000
1649-->
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001650
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001651This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other
1652standard data structures and basic operations on them. However, it does not easily scale to complex
1653state or to complex operations that do not have ready-to-use thread-safe implementations.
1654
Roman Elizarov1e459602017-02-27 11:05:17 +03001655### Thread confinement fine-grained
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001656
Roman Elizarov1e459602017-02-27 11:05:17 +03001657_Thread confinement_ is an approach to the problem of shared mutable state where all access to the particular shared
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001658state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to
1659the single event-dispatch/application thread. It is easy to apply with coroutines by using a
1660single-threaded context:
1661
1662```kotlin
1663val counterContext = newSingleThreadContext("CounterContext")
1664var counter = 0
1665
1666fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov1e459602017-02-27 11:05:17 +03001667 massiveRun(CommonPool) { // run each coroutine in CommonPool
1668 run(counterContext) { // but confine each increment to the single-threaded context
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001669 counter++
1670 }
1671 }
1672 println("Counter = $counter")
1673}
1674```
1675
Roman Elizarov1e459602017-02-27 11:05:17 +03001676> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001677
Roman Elizarov1e459602017-02-27 11:05:17 +03001678<!--- TEST ARBITRARY_TIME
1679Completed 1000000 actions in xxx ms
1680Counter = 1000000
1681-->
1682
1683This code works very slowly, because it does _fine-grained_ thread-confinement. Each individual increment switches
1684from multi-threaded `CommonPool` context to the single-threaded context using [run] block.
1685
1686### Thread confinement coarse-grained
1687
1688In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic
1689are confined to the single thread. The following example does it like that, running each coroutine in
1690the single-threaded context to start with.
1691
1692```kotlin
1693val counterContext = newSingleThreadContext("CounterContext")
1694var counter = 0
1695
1696fun main(args: Array<String>) = runBlocking<Unit> {
1697 massiveRun(counterContext) { // run each coroutine in the single-threaded context
1698 counter++
1699 }
1700 println("Counter = $counter")
1701}
1702```
1703
1704> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt)
1705
1706<!--- TEST ARBITRARY_TIME
1707Completed 1000000 actions in xxx ms
1708Counter = 1000000
1709-->
1710
1711This now works much faster and produces correct result.
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001712
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001713### Mutual exclusion
1714
1715Mutual exclusion solution to the problem is to protect all modifications of the shared state with a _critical section_
1716that is never executed concurrently. In a blocking world you'd typically use `synchronized` or `ReentrantLock` for that.
1717Coroutine's alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to
1718delimit a critical section. The key difference is that `Mutex.lock` is a suspending function. It does not block a thread.
1719
1720```kotlin
1721val mutex = Mutex()
1722var counter = 0
1723
1724fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarov1e459602017-02-27 11:05:17 +03001725 massiveRun(CommonPool) {
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001726 mutex.lock()
1727 try { counter++ }
1728 finally { mutex.unlock() }
1729 }
1730 println("Counter = $counter")
1731}
1732```
1733
Roman Elizarov1e459602017-02-27 11:05:17 +03001734> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-06.kt)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001735
Roman Elizarov1e459602017-02-27 11:05:17 +03001736<!--- TEST ARBITRARY_TIME
1737Completed 1000000 actions in xxx ms
1738Counter = 1000000
1739-->
1740
1741The locking in this example is fine-grained, so it pays the price. However, it is a good choice for some situations
1742where you absolutely must modify some shared state periodically, but there is no natural thread that this state
1743is confined to.
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001744
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001745### Actors
1746
1747An actor is a combination of a coroutine, the state that is confined and is encapsulated into this coroutine,
1748and a channel to communicate with other coroutines. A simple actor can be written as a function,
1749but an actor with a complex state is better suited for a class.
1750
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001751There is an [actor] coroutine builder that conveniently combines actor's mailbox channel into its
1752scope to receive messages from and combines the send channel into the resulting job object, so that a
1753single reference to the actor can be carried around as its handle.
1754
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001755```kotlin
1756// Message types for counterActor
1757sealed class CounterMsg
1758object IncCounter : CounterMsg() // one-way message to increment counter
1759class GetCounter(val response: SendChannel<Int>) : CounterMsg() // a request with reply
1760
1761// This function launches a new counter actor
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001762fun counterActor() = actor<CounterMsg>(CommonPool) {
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001763 var counter = 0 // actor state
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001764 for (msg in channel) { // iterate over incoming messages
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001765 when (msg) {
1766 is IncCounter -> counter++
1767 is GetCounter -> msg.response.send(counter)
1768 }
1769 }
1770}
1771
1772fun main(args: Array<String>) = runBlocking<Unit> {
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001773 val counter = counterActor() // create the actor
Roman Elizarov1e459602017-02-27 11:05:17 +03001774 massiveRun(CommonPool) {
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001775 counter.send(IncCounter)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001776 }
1777 val response = Channel<Int>()
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001778 counter.send(GetCounter(response))
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001779 println("Counter = ${response.receive()}")
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001780 counter.close() // shutdown the actor
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001781}
1782```
1783
Roman Elizarov1e459602017-02-27 11:05:17 +03001784> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-07.kt)
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001785
Roman Elizarov1e459602017-02-27 11:05:17 +03001786<!--- TEST ARBITRARY_TIME
1787Completed 1000000 actions in xxx ms
1788Counter = 1000000
1789-->
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001790
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001791It does not matter (for correctness) what context the actor itself is executed in. An actor is
Roman Elizarovf5bc0472017-02-22 11:38:13 +03001792a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine
1793works as a solution to the problem of shared mutable state.
1794
Roman Elizarovc0e19f82017-02-27 11:59:14 +03001795Actor is more efficient than locking under load, because in this case it always has work to do and it does not
1796have to switch to a different context at all.
1797
1798> Note, that an [actor] coroutine builder is a dual of [produce] coroutine builder. An actor is associated
1799 with the channel that it receives messages from, while a producer is associated with the channel that it
1800 sends elements to.
Roman Elizarov1e459602017-02-27 11:05:17 +03001801
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001802## Select expression
1803
Roman Elizarova84730b2017-02-22 11:58:50 +03001804Select expression makes it possible to await multiple suspending functions simultaneously and _select_
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001805the first one that becomes available.
1806
1807<!--- INCLUDE .*/example-select-([0-9]+).kt
1808import kotlinx.coroutines.experimental.channels.*
1809import kotlinx.coroutines.experimental.selects.*
1810-->
1811
1812### Selecting from channels
1813
1814Let us have two channels of strings `fizz` and `buzz`. The `fizz` channel produces "Fizz" string every 300 ms:
1815
1816```kotlin
1817val fizz = produce<String>(CommonPool) { // produce using common thread pool
1818 while (true) {
1819 delay(300)
1820 send("Fizz")
1821 }
1822}
1823```
1824
1825And the `buzz` channel produces "Buzz!" string every 500 ms:
1826
1827```kotlin
1828val buzz = produce<String>(CommonPool) {
1829 while (true) {
1830 delay(500)
1831 send("Buzz!")
1832 }
1833}
1834```
1835
1836Using [receive][ReceiveChannel.receive] suspending function we can receive _either_ from one channel or the
1837other. But [select] expression allows us to receive from _both_ simultaneously using its
1838[onReceive][SelectBuilder.onReceive] clauses:
1839
1840```kotlin
1841suspend fun selectFizzBuzz() {
1842 select<Unit> { // <Unit> means that this select expression does not produce any result
1843 fizz.onReceive { value -> // this is the first select clause
1844 println("fizz -> '$value'")
1845 }
1846 buzz.onReceive { value -> // this is the second select clause
1847 println("buzz -> '$value'")
1848 }
1849 }
1850}
1851```
1852
Roman Elizarova84730b2017-02-22 11:58:50 +03001853Let us run it seven times:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001854
1855```kotlin
1856fun main(args: Array<String>) = runBlocking<Unit> {
1857 repeat(7) {
1858 selectFizzBuzz()
1859 }
1860}
1861```
1862
1863> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-01.kt)
1864
1865The result of this code is:
1866
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001867```text
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001868fizz -> 'Fizz'
1869buzz -> 'Buzz!'
1870fizz -> 'Fizz'
1871fizz -> 'Fizz'
1872buzz -> 'Buzz!'
1873fizz -> 'Fizz'
1874buzz -> 'Buzz!'
1875```
1876
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001877<!--- TEST -->
1878
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001879### Selecting on close
1880
1881The [onReceive][SelectBuilder.onReceive] clause in `select` fails when the channel is closed and the corresponding
1882`select` throws an exception. We can use [onReceiveOrNull][SelectBuilder.onReceiveOrNull] clause to perform a
Roman Elizarova84730b2017-02-22 11:58:50 +03001883specific action when the channel is closed. The following example also shows that `select` is an expression that returns
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001884the result of its selected clause:
1885
1886```kotlin
1887suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
1888 select<String> {
1889 a.onReceiveOrNull { value ->
1890 if (value == null)
1891 "Channel 'a' is closed"
1892 else
1893 "a -> '$value'"
1894 }
1895 b.onReceiveOrNull { value ->
1896 if (value == null)
1897 "Channel 'b' is closed"
1898 else
1899 "b -> '$value'"
1900 }
1901 }
1902```
1903
Roman Elizarova84730b2017-02-22 11:58:50 +03001904Let's use it with channel `a` that produces "Hello" string four times and
1905channel `b` that produces "World" four times:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001906
1907```kotlin
1908fun main(args: Array<String>) = runBlocking<Unit> {
1909 // we are using the context of the main thread in this example for predictability ...
1910 val a = produce<String>(context) {
Roman Elizarova84730b2017-02-22 11:58:50 +03001911 repeat(4) { send("Hello $it") }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001912 }
1913 val b = produce<String>(context) {
Roman Elizarova84730b2017-02-22 11:58:50 +03001914 repeat(4) { send("World $it") }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001915 }
1916 repeat(8) { // print first eight results
1917 println(selectAorB(a, b))
1918 }
1919}
1920```
1921
1922> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-02.kt)
1923
Roman Elizarova84730b2017-02-22 11:58:50 +03001924The result of this code is quite interesting, so we'll analyze it in mode detail:
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001925
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001926```text
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001927a -> 'Hello 0'
1928a -> 'Hello 1'
1929b -> 'World 0'
1930a -> 'Hello 2'
1931a -> 'Hello 3'
1932b -> 'World 1'
1933Channel 'a' is closed
1934Channel 'a' is closed
1935```
1936
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001937<!--- TEST -->
1938
Roman Elizarova84730b2017-02-22 11:58:50 +03001939There are couple of observations to make out of it.
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001940
1941First of all, `select` is _biased_ to the first clause. When several clauses are selectable at the same time,
1942the first one among them gets selected. Here, both channels are constantly producing strings, so `a` channel,
Roman Elizarova84730b2017-02-22 11:58:50 +03001943being the first clause in select, wins. However, because we are using unbuffered channel, the `a` gets suspended from
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001944time to time on its [send][SendChannel.send] invocation and gives a chance for `b` to send, too.
1945
1946The second observation, is that [onReceiveOrNull][SelectBuilder.onReceiveOrNull] gets immediately selected when the
1947channel is already closed.
1948
1949### Selecting to send
1950
1951Select expression has [onSend][SelectBuilder.onSend] clause that can be used for a great good in combination
1952with a biased nature of selection.
1953
Roman Elizarova84730b2017-02-22 11:58:50 +03001954Let us write an example of producer of integers that sends its values to a `side` channel when
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001955the consumers on its primary channel cannot keep up with it:
1956
1957```kotlin
1958fun produceNumbers(side: SendChannel<Int>) = produce<Int>(CommonPool) {
1959 for (num in 1..10) { // produce 10 numbers from 1 to 10
1960 delay(100) // every 100 ms
1961 select<Unit> {
Roman Elizarova84730b2017-02-22 11:58:50 +03001962 onSend(num) {} // Send to the primary channel
1963 side.onSend(num) {} // or to the side channel
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001964 }
1965 }
1966}
1967```
1968
1969Consumer is going to be quite slow, taking 250 ms to process each number:
1970
1971```kotlin
1972fun main(args: Array<String>) = runBlocking<Unit> {
1973 val side = Channel<Int>() // allocate side channel
1974 launch(context) { // this is a very fast consumer for the side channel
1975 for (num in side) println("Side channel has $num")
1976 }
1977 for (num in produceNumbers(side)) {
1978 println("Consuming $num")
1979 delay(250) // let us digest the consumed number properly, do not hurry
1980 }
1981 println("Done consuming")
1982}
1983```
1984
1985> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-03.kt)
1986
1987So let us see what happens:
1988
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001989```text
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03001990Consuming 1
1991Side channel has 2
1992Side channel has 3
1993Consuming 4
1994Side channel has 5
1995Side channel has 6
1996Consuming 7
1997Side channel has 8
1998Side channel has 9
1999Consuming 10
2000Done consuming
2001```
2002
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002003<!--- TEST -->
2004
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002005### Selecting deferred values
2006
Roman Elizarova84730b2017-02-22 11:58:50 +03002007Deferred values can be selected using [onAwait][SelectBuilder.onAwait] clause.
2008Let us start with an async function that returns a deferred string value after
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002009a random delay:
2010
2011<!--- INCLUDE .*/example-select-04.kt
2012import java.util.*
2013-->
2014
2015```kotlin
2016fun asyncString(time: Int) = async(CommonPool) {
2017 delay(time.toLong())
2018 "Waited for $time ms"
2019}
2020```
2021
Roman Elizarova84730b2017-02-22 11:58:50 +03002022Let us start a dozen of them with a random delay.
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002023
2024```kotlin
2025fun asyncStringsList(): List<Deferred<String>> {
2026 val random = Random(3)
Roman Elizarova84730b2017-02-22 11:58:50 +03002027 return List(12) { asyncString(random.nextInt(1000)) }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002028}
2029```
2030
Roman Elizarova84730b2017-02-22 11:58:50 +03002031Now the main function awaits for the first of them to complete and counts the number of deferred values
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002032that are still active. Note, that we've used here the fact that `select` expression is a Kotlin DSL,
Roman Elizarova84730b2017-02-22 11:58:50 +03002033so we can provide clauses for it using an arbitrary code. In this case we iterate over a list
2034of deferred values to provide `onAwait` clause for each deferred value.
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002035
2036```kotlin
2037fun main(args: Array<String>) = runBlocking<Unit> {
2038 val list = asyncStringsList()
2039 val result = select<String> {
2040 list.withIndex().forEach { (index, deferred) ->
2041 deferred.onAwait { answer ->
2042 "Deferred $index produced answer '$answer'"
2043 }
2044 }
2045 }
2046 println(result)
Roman Elizarov7c864d82017-02-27 10:17:50 +03002047 val countActive = list.count { it.isActive }
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002048 println("$countActive coroutines are still active")
2049}
2050```
2051
2052> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-04.kt)
2053
2054The output is:
2055
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002056```text
Roman Elizarova84730b2017-02-22 11:58:50 +03002057Deferred 4 produced answer 'Waited for 128 ms'
Roman Elizarovd4dcbe22017-02-22 09:57:46 +0300205811 coroutines are still active
2059```
2060
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002061<!--- TEST -->
2062
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002063### Switch over a channel of deferred values
2064
Roman Elizarova84730b2017-02-22 11:58:50 +03002065Let us write a channel producer function that consumes a channel of deferred string values, waits for each received
2066deferred value, but only until the next deferred value comes over or the channel is closed. This example puts together
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002067[onReceiveOrNull][SelectBuilder.onReceiveOrNull] and [onAwait][SelectBuilder.onAwait] clauses in the same `select`:
2068
2069```kotlin
2070fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String>(CommonPool) {
Roman Elizarova84730b2017-02-22 11:58:50 +03002071 var current = input.receive() // start with first received deferred value
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002072 while (isActive) { // loop while not cancelled/closed
2073 val next = select<Deferred<String>?> { // return next deferred value from this select or null
2074 input.onReceiveOrNull { update ->
2075 update // replaces next value to wait
2076 }
2077 current.onAwait { value ->
2078 send(value) // send value that current deferred has produced
2079 input.receiveOrNull() // and use the next deferred from the input channel
2080 }
2081 }
2082 if (next == null) {
2083 println("Channel was closed")
2084 break // out of loop
2085 } else {
2086 current = next
2087 }
2088 }
2089}
2090```
2091
2092To test it, we'll use a simple async function that resolves to a specified string after a specified time:
2093
2094```kotlin
2095fun asyncString(str: String, time: Long) = async(CommonPool) {
2096 delay(time)
2097 str
2098}
2099```
2100
2101The main function just launches a coroutine to print results of `switchMapDeferreds` and sends some test
2102data to it:
2103
2104```kotlin
2105fun main(args: Array<String>) = runBlocking<Unit> {
2106 val chan = Channel<Deferred<String>>() // the channel for test
Roman Elizarova84730b2017-02-22 11:58:50 +03002107 launch(context) { // launch printing coroutine
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002108 for (s in switchMapDeferreds(chan))
2109 println(s) // print each received string
2110 }
2111 chan.send(asyncString("BEGIN", 100))
2112 delay(200) // enough time for "BEGIN" to be produced
2113 chan.send(asyncString("Slow", 500))
Roman Elizarova84730b2017-02-22 11:58:50 +03002114 delay(100) // not enough time to produce slow
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002115 chan.send(asyncString("Replace", 100))
Roman Elizarova84730b2017-02-22 11:58:50 +03002116 delay(500) // give it time before the last one
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002117 chan.send(asyncString("END", 500))
2118 delay(1000) // give it time to process
Roman Elizarova84730b2017-02-22 11:58:50 +03002119 chan.close() // close the channel ...
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002120 delay(500) // and wait some time to let it finish
2121}
2122```
2123
2124> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-05.kt)
2125
2126The result of this code:
2127
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002128```text
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002129BEGIN
2130Replace
2131END
2132Channel was closed
2133```
2134
Roman Elizarov731f0ad2017-02-22 20:48:45 +03002135<!--- TEST -->
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002136
Roman Elizarove0c817d2017-02-10 10:22:01 +03002137<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core -->
2138<!--- DOCS_ROOT kotlinx-coroutines-core/target/dokka/kotlinx-coroutines-core -->
2139<!--- INDEX kotlinx.coroutines.experimental -->
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002140[launch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/launch.html
2141[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/delay.html
2142[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run-blocking.html
2143[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/index.html
2144[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-cancellation-exception.html
2145[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/yield.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002146[CoroutineScope.isActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/is-active.html
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002147[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
2148[run]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run.html
2149[NonCancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-non-cancellable/index.html
2150[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
2151[async]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/async.html
2152[Deferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/index.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002153[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/await.html
2154[Job.start]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/start.html
Roman Elizarov419a6c82017-02-09 18:36:22 +03002155[CommonPool]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-common-pool/index.html
2156[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-dispatcher/index.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002157[CoroutineScope.context]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/context.html
Roman Elizarov419a6c82017-02-09 18:36:22 +03002158[Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-unconfined/index.html
Roman Elizarov419a6c82017-02-09 18:36:22 +03002159[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/new-coroutine-context.html
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002160[CoroutineName]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-name/index.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002161[Job.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/invoke.html
2162[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/cancel.html
Roman Elizarovf5bc0472017-02-22 11:38:13 +03002163<!--- INDEX kotlinx.coroutines.experimental.sync -->
2164[Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/index.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002165[Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/lock.html
2166[Mutex.unlock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/unlock.html
Roman Elizarove0c817d2017-02-10 10:22:01 +03002167<!--- INDEX kotlinx.coroutines.experimental.channels -->
Roman Elizarov419a6c82017-02-09 18:36:22 +03002168[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002169[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
2170[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html
2171[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/close.html
Roman Elizarova5e653f2017-02-13 13:49:55 +03002172[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002173[Channel.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/invoke.html
Roman Elizarovc0e19f82017-02-27 11:59:14 +03002174[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002175<!--- INDEX kotlinx.coroutines.experimental.selects -->
2176[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
Roman Elizarovbff3f372017-03-01 18:12:27 +03002177[SelectBuilder.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-receive.html
2178[SelectBuilder.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-receive-or-null.html
2179[SelectBuilder.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-send.html
2180[SelectBuilder.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-await.html
Roman Elizarov419a6c82017-02-09 18:36:22 +03002181<!--- END -->