<!--- INCLUDE .*/example-([a-z]+)-([0-9]+)\.kt
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package guide.$$1.example$$2

import kotlinx.coroutines.experimental.*
-->
<!--- KNIT     kotlinx-coroutines-core/src/test/kotlin/guide/.*\.kt -->
<!--- TEST_OUT kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package guide.test

import org.junit.Test

class GuideTest {
-->

# Guide to kotlinx.coroutines by example

This is a short guide on core features of `kotlinx.coroutines` with a series of examples.

## Introduction and setup

Kotlin, as a language, provides only minimal low-level APIs in its standard library to enable various other
libraries to utilize coroutines. Unlike many other languages with similar capabilities, `async` and `await`
are not keywords in Kotlin and are not even part of its standard library.

`kotlinx.coroutines` is one such rich library. It contains a number of high-level
coroutine-enabled primitives that this guide covers, including `async` and `await`.
You need to add a dependency on the `kotlinx-coroutines-core` module as explained
[here](README.md#using-in-your-projects) to use primitives from this guide in your projects.

## Table of contents

<!--- TOC -->

* [Coroutine basics](#coroutine-basics)
  * [Your first coroutine](#your-first-coroutine)
  * [Bridging blocking and non-blocking worlds](#bridging-blocking-and-non-blocking-worlds)
  * [Waiting for a job](#waiting-for-a-job)
  * [Extract function refactoring](#extract-function-refactoring)
  * [Coroutines ARE light-weight](#coroutines-are-light-weight)
  * [Coroutines are like daemon threads](#coroutines-are-like-daemon-threads)
* [Cancellation and timeouts](#cancellation-and-timeouts)
  * [Cancelling coroutine execution](#cancelling-coroutine-execution)
  * [Cancellation is cooperative](#cancellation-is-cooperative)
  * [Making computation code cancellable](#making-computation-code-cancellable)
  * [Closing resources with finally](#closing-resources-with-finally)
  * [Run non-cancellable block](#run-non-cancellable-block)
  * [Timeout](#timeout)
* [Composing suspending functions](#composing-suspending-functions)
  * [Sequential by default](#sequential-by-default)
  * [Concurrent using async](#concurrent-using-async)
  * [Lazily started async](#lazily-started-async)
  * [Async-style functions](#async-style-functions)
* [Coroutine context and dispatchers](#coroutine-context-and-dispatchers)
  * [Dispatchers and threads](#dispatchers-and-threads)
  * [Unconfined vs confined dispatcher](#unconfined-vs-confined-dispatcher)
  * [Debugging coroutines and threads](#debugging-coroutines-and-threads)
  * [Jumping between threads](#jumping-between-threads)
  * [Job in the context](#job-in-the-context)
  * [Children of a coroutine](#children-of-a-coroutine)
  * [Combining contexts](#combining-contexts)
  * [Naming coroutines for debugging](#naming-coroutines-for-debugging)
  * [Cancellation via explicit job](#cancellation-via-explicit-job)
* [Channels](#channels)
  * [Channel basics](#channel-basics)
  * [Closing and iteration over channels](#closing-and-iteration-over-channels)
  * [Building channel producers](#building-channel-producers)
  * [Pipelines](#pipelines)
  * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
  * [Fan-out](#fan-out)
  * [Fan-in](#fan-in)
  * [Buffered channels](#buffered-channels)
  * [Channels are fair](#channels-are-fair)
* [Shared mutable state and concurrency](#shared-mutable-state-and-concurrency)
  * [The problem](#the-problem)
  * [Volatiles are of no help](#volatiles-are-of-no-help)
  * [Thread-safe data structures](#thread-safe-data-structures)
  * [Thread confinement fine-grained](#thread-confinement-fine-grained)
  * [Thread confinement coarse-grained](#thread-confinement-coarse-grained)
  * [Mutual exclusion](#mutual-exclusion)
  * [Actors](#actors)
* [Select expression](#select-expression)
  * [Selecting from channels](#selecting-from-channels)
  * [Selecting on close](#selecting-on-close)
  * [Selecting to send](#selecting-to-send)
  * [Selecting deferred values](#selecting-deferred-values)
  * [Switch over a channel of deferred values](#switch-over-a-channel-of-deferred-values)
* [Further reading](#further-reading)

<!--- END_TOC -->

## Coroutine basics

This section covers basic coroutine concepts.

### Your first coroutine

Run the following code:

```kotlin
fun main(args: Array<String>) {
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main function continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-01.kt)

It prints the following output:

```text
Hello,
World!
```

<!--- TEST -->

Essentially, coroutines are light-weight threads.
They are launched with the [launch] _coroutine builder_.
You can achieve the same result by replacing
`launch(CommonPool) { ... }` with `thread { ... }` and `delay(...)` with `Thread.sleep(...)`. Try it.

If you start by replacing `launch(CommonPool)` with `thread`, the compiler produces the following error:

```
Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
```

That is because [delay] is a special _suspending function_ that does not block a thread, but _suspends_ the
coroutine, and it can only be used from a coroutine.

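With both substitutions applied, the thread-based version might look like the sketch below. It relies only on the
`thread` function from the Kotlin standard library (`kotlin.concurrent`), which this guide does not otherwise cover,
and is shown for comparison rather than as a recommended style:

```kotlin
import kotlin.concurrent.thread

fun main(args: Array<String>) {
    thread { // start a regular (non-daemon) thread instead of a coroutine
        Thread.sleep(1000L) // blocking sleep for 1 second
        println("World!") // print after sleep
    }
    println("Hello,") // main thread continues while the other thread sleeps
    Thread.sleep(2000L) // kept to mirror the original; a non-daemon thread keeps the JVM alive anyway
}
```

This version compiles because `Thread.sleep` is a regular blocking function and can be called from any code.
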
### Bridging blocking and non-blocking worlds

The first example mixes _non-blocking_ `delay(...)` and _blocking_ `Thread.sleep(...)` in the same
code of the `main` function. It is easy to get lost. Let's cleanly separate blocking and non-blocking
worlds by using [runBlocking]:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L)
        println("World!")
    }
    println("Hello,") // main coroutine continues while child is delayed
    delay(2000L) // non-blocking delay for 2 seconds to keep JVM alive
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-02.kt)

<!--- TEST
Hello,
World!
-->

The result is the same, but this code uses only non-blocking [delay].

`runBlocking { ... }` works as an adaptor that is used here to start the top-level main coroutine.
The regular code outside of `runBlocking` _blocks_ until the coroutine inside `runBlocking` completes.

This is also a way to write unit-tests for suspending functions:

```kotlin
class MyTest {
    @Test
    fun testMySuspendingFunction() = runBlocking<Unit> {
        // here we can use suspending functions using any assertion style that we like
    }
}
```

<!--- CLEAR -->

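Because `runBlocking` blocks the calling thread until its coroutine completes, regular code can also use it to
obtain a result from a suspending function. The following is a minimal sketch; `computeAnswer` is a hypothetical
function invented for this illustration:

```kotlin
import kotlinx.coroutines.experimental.*

// hypothetical suspending function, used only for this illustration
suspend fun computeAnswer(): Int {
    delay(100L) // pretend to do some asynchronous work
    return 42
}

fun main(args: Array<String>) {
    // runBlocking blocks the main thread until the coroutine completes and returns its result
    val answer = runBlocking { computeAnswer() }
    println("The answer is $answer")
}
```
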
### Waiting for a job

Delaying for a time while another coroutine is working is not a good approach. Let's explicitly
wait (in a non-blocking way) until the background [Job] that we have launched is complete:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { // create new coroutine and keep a reference to its Job
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join() // wait until child coroutine completes
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-03.kt)

<!--- TEST
Hello,
World!
-->

Now the result is still the same, but the code of the main coroutine is not tied to the duration of
the background job in any way. Much better.

### Extract function refactoring

Let's extract the block of code inside `launch(CommonPool) { ... }` into a separate function. When you
perform the "Extract function" refactoring on this code, you get a new function with the `suspend` modifier.
That is your first _suspending function_. Suspending functions can be used inside coroutines
just like regular functions, but their additional feature is that they can, in turn,
use other suspending functions, like `delay` in this example, to _suspend_ execution of a coroutine.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { doWorld() }
    println("Hello,")
    job.join()
}

// this is your first suspending function
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-04.kt)

<!--- TEST
Hello,
World!
-->

### Coroutines ARE light-weight

Run the following code:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = List(100_000) { // create a lot of coroutines and list their jobs
        launch(CommonPool) {
            delay(1000L)
            print(".")
        }
    }
    jobs.forEach { it.join() } // wait for all jobs to complete
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-05.kt)

<!--- TEST lines.size == 1 && lines[0] == ".".repeat(100_000) -->

It starts 100K coroutines and, after a second, each coroutine prints a dot.
Now, try that with threads. What would happen? (Most likely your code will produce some sort of out-of-memory error)

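A thread-based counterpart could be sketched as follows, using the standard library `thread` function. The count
is deliberately reduced to 1,000 here (an assumption made so that the sketch is safe to run); raising it towards
100,000 is what is likely to exhaust memory or hit an operating system thread limit:

```kotlin
import kotlin.concurrent.thread

fun main(args: Array<String>) {
    // NOTE: count reduced from 100_000 to 1_000 for this sketch; each thread reserves its own stack
    val threads = List(1_000) {
        thread {
            Thread.sleep(1000L) // each thread blocks for a second
            print(".")
        }
    }
    threads.forEach { it.join() } // wait for all threads to finish
}
```
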
### Coroutines are like daemon threads

The following code launches a long-running coroutine that prints "I'm sleeping" twice a second and then
returns from the main function after some delay:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // just quit after delay
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-06.kt)

You can run and see that it prints three lines and terminates:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
```

<!--- TEST -->

Active coroutines do not keep the process alive. They are like daemon threads.

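For comparison, here is a sketch of the same program written with an actual daemon thread. It assumes the
`isDaemon` parameter of the standard library `thread` function and is not part of `kotlinx.coroutines`:

```kotlin
import kotlin.concurrent.thread

fun main(args: Array<String>) {
    thread(isDaemon = true) { // a daemon thread does not keep the JVM alive
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            Thread.sleep(500L)
        }
    }
    Thread.sleep(1300L) // just quit after a blocking sleep
}
```

When run as a standalone program, the JVM exits once `main` returns, even though the daemon thread is still in its loop.
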
## Cancellation and timeouts

This section covers coroutine cancellation and timeouts.

### Cancelling coroutine execution

In a small application, returning from the `main` function might sound like a good way to get all coroutines
implicitly terminated. In a larger, long-running application, you need finer-grained control.
The [launch] function returns a [Job] that can be used to cancel a running coroutine:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-01.kt)

It produces the following output:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
```

<!--- TEST -->

As soon as main invokes `job.cancel`, we don't see any output from the other coroutine because it was cancelled.

### Cancellation is cooperative

Coroutine cancellation is _cooperative_. Coroutine code has to cooperate to be cancellable.
All the suspending functions in `kotlinx.coroutines` are _cancellable_. They check for cancellation of
the coroutine and throw [CancellationException] when cancelled. However, if a coroutine is working in
a computation and does not check for cancellation, then it cannot be cancelled, as the following
example shows:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (i < 10) { // computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-02.kt)

Run it to see that it continues to print "I'm sleeping" even after cancellation.

<!--- TEST
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm sleeping 3 ...
I'm sleeping 4 ...
I'm sleeping 5 ...
main: Now I can quit.
-->

### Making computation code cancellable

There are two approaches to making computation code cancellable. The first one is to periodically
invoke a suspending function. There is a [yield] function that is a good choice for that purpose.
The other one is to explicitly check the cancellation status. Let us try the latter approach.

Replace `while (i < 10)` in the previous example with `while (isActive)` and rerun it.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (isActive) { // cancellable computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-03.kt)

As you can see, now this loop can be cancelled. [isActive][CoroutineScope.isActive] is a property that is available inside
the code of coroutines via the [CoroutineScope] object.

<!--- TEST
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
-->

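The first approach, periodically invoking a suspending function, could be sketched as follows. This variant is
not one of the generated guide examples; it assumes that `yield()` checks for cancellation at each call, as
described above for cancellable suspending functions:

```kotlin
import kotlinx.coroutines.experimental.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (true) { // the loop condition itself never checks for cancellation
            yield() // suspension point: expected to throw CancellationException once the job is cancelled
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled
    println("main: Now I can quit.")
}
```

Compared with checking `isActive`, this variant leaves the loop through an exception rather than through the loop
condition, so any cleanup belongs in a `finally` block.
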
### Closing resources with finally

Cancellable suspending functions throw [CancellationException] on cancellation, which can be handled in
the usual way. For example, the `try {...} finally {...}` construct and the Kotlin `use` function execute their
finalization actions normally when a coroutine is cancelled:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("I'm running finally")
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-04.kt)

The example above produces the following output:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.
```

<!--- TEST -->

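The same idea can be illustrated with the `use` function. In the sketch below, `LoggingResource` is a hypothetical
class invented for this illustration; it only prints a message when it is closed:

```kotlin
import kotlinx.coroutines.experimental.*
import java.io.Closeable

// hypothetical resource, used only for this illustration
class LoggingResource : Closeable {
    override fun close() {
        println("Resource closed")
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        LoggingResource().use { // close() is invoked when the block exits, including on cancellation
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```
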
### Run non-cancellable block

Any attempt to use a suspending function in the `finally` block of the previous example will cause
[CancellationException], because the coroutine running this code is cancelled. Usually, this is not a
problem, since all well-behaving closing operations (closing a file, cancelling a job, or closing any kind of a
communication channel) are usually non-blocking and do not involve any suspending functions. However, in the
rare case when you need to suspend in the cancelled coroutine, you can wrap the corresponding code in
`run(NonCancellable) {...}` using the [run] function and the [NonCancellable] context, as the following example shows:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            run(NonCancellable) {
                println("I'm running finally")
                delay(1000L)
                println("And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-05.kt)

<!--- TEST
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
And I've just delayed for 1 sec because I'm non-cancellable
main: Now I can quit.
-->

### Timeout

The most obvious reason to cancel coroutine execution in practice
is that its execution time has exceeded some timeout.
While you can manually track the reference to the corresponding [Job] and launch a separate coroutine to cancel
the tracked one after a delay, there is a ready-to-use [withTimeout] function that does it.
Look at the following example:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-06.kt)

It produces the following output:

```text
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" java.util.concurrent.CancellationException: Timed out waiting for 1300 MILLISECONDS
```

<!--- TEST STARTS_WITH -->

We have not seen the [CancellationException] stack trace printed on the console before. That is because
inside a cancelled coroutine `CancellationException` is considered to be a normal reason for coroutine completion.
However, in this example we have used `withTimeout` right inside the `main` function.

Because cancellation is just an exception, all the resources will be closed in the usual way.
You can wrap the code with timeout in a `try {...} catch (e: CancellationException) {...}` block if
you need to do some additional action specifically on timeout.

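A minimal sketch of such a wrapper is shown below. The message printed in the `catch` block is invented for this
illustration; the exception type follows the output shown above:

```kotlin
import kotlinx.coroutines.experimental.*
import java.util.concurrent.CancellationException

fun main(args: Array<String>) = runBlocking<Unit> {
    try {
        withTimeout(1300L) {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
    } catch (e: CancellationException) {
        // additional action performed specifically on timeout (illustrative message)
        println("main: timed out, cleaning up")
    }
}
```

With the exception handled inside `main`, no stack trace should reach the console.
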
## Composing suspending functions

This section covers various approaches to composition of suspending functions.

### Sequential by default

Assume that we have two suspending functions defined elsewhere that do something useful like some kind of
remote service call or computation. We just pretend they are useful, but actually each one just
delays for a second for the purpose of this example:

<!--- INCLUDE .*/example-compose-([0-9]+).kt
import kotlin.system.measureTimeMillis
-->

```kotlin
suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}
```

<!--- INCLUDE .*/example-compose-([0-9]+).kt -->

What do we do if we need to invoke them _sequentially_ -- first `doSomethingUsefulOne` _and then_
`doSomethingUsefulTwo` -- and compute the sum of their results?
In practice we do this if we use the result of the first function to make a decision on whether we need
to invoke the second one or to decide on how to invoke it.

We just use a normal sequential invocation, because the code in the coroutine, just like in the regular
code, is _sequential_ by default. The following example demonstrates it by measuring the total
time it takes to execute both suspending functions:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-01.kt)

It produces something like this:

```text
The answer is 42
Completed in 2017 ms
```

<!--- TEST FLEXIBLE_TIME -->

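The case where the first result decides whether the second function is invoked at all could look like the sketch
below. The rule used here (call the second function only when the first result is odd) is purely hypothetical and
serves only to show ordinary control flow between suspending calls:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlin.system.measureTimeMillis

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        // hypothetical rule for this sketch: invoke the second function only if the first result is odd
        val answer = if (one % 2 == 1) one + doSomethingUsefulTwo() else one
        println("The answer is $answer")
    }
    println("Completed in $time ms")
}
```
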
### Concurrent using async

What if there are no dependencies between invocation of `doSomethingUsefulOne` and `doSomethingUsefulTwo` and
we want to get the answer faster, by doing both _concurrently_? This is where [async] comes to help.

Conceptually, [async] is just like [launch]. It starts a separate coroutine which is a light-weight thread
that works concurrently with all the other coroutines. The difference is that `launch` returns a [Job] and
does not carry any resulting value, while `async` returns a [Deferred] -- a light-weight non-blocking future
that represents a promise to provide a result later. You can use `.await()` on a deferred value to get its eventual result,
but `Deferred` is also a `Job`, so you can cancel it if needed.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool) { doSomethingUsefulOne() }
        val two = async(CommonPool) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-02.kt)

It produces something like this:

```text
The answer is 42
Completed in 1017 ms
```

<!--- TEST FLEXIBLE_TIME -->

This is twice as fast, because we have concurrent execution of two coroutines.
Note that concurrency with coroutines is always explicit.

### Lazily started async

[async] has a laziness option that is enabled with the `start = false` parameter.
In this mode it starts the coroutine only when its result is needed by some
[await][Deferred.await] or if a [start][Job.start] function
is invoked. Run the following example that differs from the previous one only by this option:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool, start = false) { doSomethingUsefulOne() }
        val two = async(CommonPool, start = false) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-03.kt)

It produces something like this:

```text
The answer is 42
Completed in 2017 ms
```

<!--- TEST FLEXIBLE_TIME -->

So, we are back to sequential execution, because we _first_ start and await `one`, _and then_ start and await
`two`. This is not the intended use-case for laziness. It is designed as a replacement for
the standard `lazy` function in cases when computation of the value involves suspending functions.

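The [start][Job.start] function can also be used to regain concurrency with lazily started coroutines.
The following is a minimal sketch, assuming the `start = false` parameter of this version of the library;
`slowOne`, `slowTwo` and `lazySum` are hypothetical stand-ins that are not part of the library:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlin.system.measureTimeMillis

// hypothetical stand-ins for some slow suspending computations
suspend fun slowOne(): Int { delay(1000L); return 13 }
suspend fun slowTwo(): Int { delay(1000L); return 29 }

fun lazySum(): Int = runBlocking {
    val one = async(CommonPool, start = false) { slowOne() }
    val two = async(CommonPool, start = false) { slowTwo() }
    one.start() // explicitly start both coroutines first...
    two.start()
    one.await() + two.await() // ...and only then wait for their results
}

fun main(args: Array<String>) {
    val time = measureTimeMillis { println("The answer is ${lazySum()}") }
    println("Completed in $time ms")
}
```

Because both coroutines are started before the first `await`, they should run concurrently, so the whole
computation is expected to take about one second rather than two.
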
### Async-style functions

We can define async-style functions that invoke `doSomethingUsefulOne` and `doSomethingUsefulTwo`
_asynchronously_ using the [async] coroutine builder. It is good style to name such functions with
either an "async" prefix or an "Async" suffix to highlight the fact that they only start an asynchronous
computation and one needs to use the resulting deferred value to get the result.

```kotlin
// The result type of asyncSomethingUsefulOne is Deferred<Int>
fun asyncSomethingUsefulOne() = async(CommonPool) {
    doSomethingUsefulOne()
}

// The result type of asyncSomethingUsefulTwo is Deferred<Int>
fun asyncSomethingUsefulTwo() = async(CommonPool) {
    doSomethingUsefulTwo()
}
```

Note that these `asyncXXX` functions are **not** _suspending_ functions. They can be used from anywhere.
However, their use always implies asynchronous (here meaning _concurrent_) execution of their action
with the invoking code.

The following example shows their use outside of a coroutine:

```kotlin
// note that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
    val time = measureTimeMillis {
        // we can initiate async actions outside of a coroutine
        val one = asyncSomethingUsefulOne()
        val two = asyncSomethingUsefulTwo()
        // but waiting for a result must involve either suspending or blocking.
        // here we use `runBlocking { ... }` to block the main thread while waiting for the result
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-04.kt)

<!--- TEST FLEXIBLE_TIME
The answer is 42
Completed in 1085 ms
-->

## Coroutine context and dispatchers

We've already seen `launch(CommonPool) {...}`, `async(CommonPool) {...}`, `run(NonCancellable) {...}`, etc.
In these code snippets [CommonPool] and [NonCancellable] are _coroutine contexts_.
This section covers other available choices.

### Dispatchers and threads

Coroutine context includes a [_coroutine dispatcher_][CoroutineDispatcher] which determines what thread or threads
the corresponding coroutine uses for its execution. Coroutine dispatcher can confine coroutine execution
to a specific thread, dispatch it to a thread pool, or let it run unconfined. Try the following example:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println(" 'context': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
        println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-01.kt)

It produces the following output (maybe in different order):

```text
 'Unconfined': I'm working in thread main
 'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
 'newSTC': I'm working in thread MyOwnThread
 'context': I'm working in thread main
```

<!--- TEST LINES_START_UNORDERED -->

The difference between parent [context][CoroutineScope.context] and [Unconfined] context will be shown later.

### Unconfined vs confined dispatcher

The [Unconfined] coroutine dispatcher starts the coroutine in the caller thread, but only until the
first suspension point. After suspension it resumes in the thread that is fully determined by the
suspending function that was invoked. The unconfined dispatcher is appropriate when a coroutine neither
consumes CPU time nor updates any shared data (like UI) that is confined to a specific thread.

On the other hand, the [context][CoroutineScope.context] property, which is available inside the block of any coroutine
via the [CoroutineScope] interface, is a reference to the context of this particular coroutine.
This way, a parent context can be inherited. The default context of [runBlocking], in particular,
is confined to the invoker thread, so inheriting it has the effect of confining execution to
this thread with a predictable FIFO scheduling.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println(" 'context': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println(" 'context': After delay in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-02.kt)

It produces the output:

```text
 'Unconfined': I'm working in thread main
 'context': I'm working in thread main
 'Unconfined': After delay in thread kotlinx.coroutines.ScheduledExecutor
 'context': After delay in thread main
```

<!--- TEST LINES_START -->

So, the coroutine that inherited the `context` of `runBlocking {...}` continues to execute in the `main` thread,
while the unconfined one resumed in the scheduler thread that the [delay] function is using.

### Debugging coroutines and threads

Coroutines can suspend on one thread and resume on another thread with [Unconfined] dispatcher or
with a multi-threaded dispatcher like [CommonPool]. Even with a single-threaded dispatcher it might be hard to
figure out what coroutine was doing what, where, and when. The common approach to debugging applications with
threads is to print the thread name in the log file on each log statement. This feature is universally supported
by logging frameworks. When using coroutines, the thread name alone does not give much of a context, so
`kotlinx.coroutines` includes debugging facilities to make it easier.

Run the following code with `-Dkotlinx.coroutines.debug` JVM option:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking<Unit> {
    val a = async(context) {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async(context) {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-03.kt)

There are three coroutines: the main coroutine (#1), which is the `runBlocking` one,
and two coroutines computing deferred values `a` (#2) and `b` (#3).
They are all executing in the context of `runBlocking` and are confined to the main thread.
The output of this code is:

```text
[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42
```

<!--- TEST -->

The `log` function prints the name of the thread in square brackets and you can see that it is the `main`
thread, but the identifier of the currently executing coroutine is appended to it. This identifier
is consecutively assigned to all created coroutines when debugging mode is turned on.

You can read more about debugging facilities in the documentation for [newCoroutineContext] function.

### Jumping between threads

Run the following code with `-Dkotlinx.coroutines.debug` JVM option:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) {
    val ctx1 = newSingleThreadContext("Ctx1")
    val ctx2 = newSingleThreadContext("Ctx2")
    runBlocking(ctx1) {
        log("Started in ctx1")
        run(ctx2) {
            log("Working in ctx2")
        }
        log("Back to ctx1")
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt)

It demonstrates two new techniques. One is using [runBlocking] with an explicitly specified context, and
the second one is using [run] function to change a context of a coroutine while still staying in the
same coroutine as you can see in the output below:

```text
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
```

<!--- TEST -->

### Job in the context

The coroutine [Job] is part of its context. The coroutine can retrieve it from its own context
using `context[Job]` expression:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    println("My job is ${context[Job]}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-05.kt)

It produces something like

```
My job is BlockingCoroutine{Active}@65ae6ba4
```

<!--- TEST lines.size == 1 && lines[0].startsWith("My job is BlockingCoroutine{Active}@") -->

So, [isActive][CoroutineScope.isActive] in [CoroutineScope] is just a convenient shortcut for `context[Job]!!.isActive`.

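A quick way to see this equivalence is to read both values from inside the same coroutine.
This is only a minimal sketch; `bothFlags` is a hypothetical helper name:

```kotlin
import kotlinx.coroutines.experimental.*

// Reads the activity flag in both ways from inside the same coroutine.
// `bothFlags` is a hypothetical helper, not part of the library.
fun bothFlags(): Pair<Boolean, Boolean> = runBlocking {
    Pair(isActive, context[Job]!!.isActive)
}

fun main(args: Array<String>) {
    val (shortcut, viaContext) = bothFlags()
    println("isActive = $shortcut, context[Job]!!.isActive = $viaContext")
}
```

Both values are `true` here, because the `runBlocking` coroutine is still running when they are read.
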
### Children of a coroutine

When [context][CoroutineScope.context] of a coroutine is used to launch another coroutine,
the [Job] of the new coroutine becomes
a _child_ of the parent coroutine's job. When the parent coroutine is cancelled, all its children
are recursively cancelled, too.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(CommonPool) {
        // it spawns two other jobs, one with its separate context
        val job1 = launch(CommonPool) {
            println("job1: I have my own context and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // and the other inherits the parent context
        val job2 = launch(context) {
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
        // request completes when both its sub-jobs complete:
        job1.join()
        job2.join()
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-06.kt)

The output of this code is:

```text
job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
```

<!--- TEST -->

### Combining contexts

Coroutine contexts can be combined using the `+` operator. The context on the right-hand side replaces relevant entries
of the context on the left-hand side. For example, a [Job] of the parent coroutine can be inherited, while
its dispatcher is replaced:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(context) { // use the context of `runBlocking`
        // spawns CPU-intensive child job in CommonPool !!!
        val job = launch(context + CommonPool) {
            println("job: I am a child of the request coroutine, but with a different dispatcher")
            delay(1000)
            println("job: I will not execute this line if my parent request is cancelled")
        }
        job.join() // request completes when its sub-job completes
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-07.kt)

The expected outcome of this code is:

```text
job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?
```

<!--- TEST -->

Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001015### Naming coroutines for debugging
1016
Roman Elizarovb7721cf2017-02-03 19:23:08 +03001017Automatically assigned ids are good when coroutines log often and you just need to correlate log records
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001018coming from the same coroutine. However, when coroutine is tied to the processing of a specific request
1019or doing some specific background task, it is better to name it explicitly for debugging purposes.
Roman Elizarov419a6c82017-02-09 18:36:22 +03001020[CoroutineName] serves the same function as a thread name. It'll get displayed in the thread name that
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001021is executing this coroutine when debugging more is turned on.
1022
1023The following example demonstrates this concept:
1024
1025```kotlin
1026fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
1027
1028fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
1029 log("Started main coroutine")
1030 // run two background value computations
Roman Elizarov32d95322017-02-09 15:57:31 +03001031 val v1 = async(CommonPool + CoroutineName("v1coroutine")) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001032 log("Computing v1")
1033 delay(500)
1034 252
1035 }
Roman Elizarov32d95322017-02-09 15:57:31 +03001036 val v2 = async(CommonPool + CoroutineName("v2coroutine")) {
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001037 log("Computing v2")
1038 delay(1000)
1039 6
1040 }
1041 log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
1042}
1043```
1044
Roman Elizarovfa7723e2017-02-06 11:17:51 +03001045> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001046
1047The output it produces with `-Dkotlinx.coroutines.debug` JVM option is similar to:
1048
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001049```text
Roman Elizarov2f6d7c92017-02-03 15:16:07 +03001050[main @main#1] Started main coroutine
1051[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
1052[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
1053[main @main#1] The answer for v1 / v2 = 42
1054```
Roman Elizarov1293ccd2017-02-01 18:49:54 +03001055
Roman Elizarov731f0ad2017-02-22 20:48:45 +03001056<!--- TEST FLEXIBLE_THREAD -->
1057
### Cancellation via explicit job

Let us put our knowledge about contexts, children and jobs together. Assume that our application has
an object with a lifecycle, but that object is not a coroutine. For example, we are writing an Android application
and launch various coroutines in the context of an Android activity to perform asynchronous operations to fetch
and update data, do animations, etc. All of these coroutines must be cancelled when the activity is destroyed
to avoid memory leaks.

We can manage the lifecycle of our coroutines by creating an instance of [Job] that is tied to
the lifecycle of our activity. A job instance is created using the [Job()][Job.invoke] factory function
as the following example shows. We need to make sure that all the coroutines are started
with this job in their context, and then a single invocation of [Job.cancel] terminates them all.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = Job() // create a job object to manage our lifecycle
    // now launch ten coroutines for a demo, each working for a different time
    val coroutines = List(10) { i ->
        // they are all children of our job object
        launch(context + job) { // we use the context of main runBlocking thread, but with our own job object
            delay(i * 200L) // variable delay 0ms, 200ms, 400ms, ... etc
            println("Coroutine $i is done")
        }
    }
    println("Launched ${coroutines.size} coroutines")
    delay(500L) // delay for half a second
    println("Cancelling job!")
    job.cancel() // cancel our job.. !!!
    delay(1000L) // delay for more to see if our coroutines are still working
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-09.kt)

The output of this example is:

```text
Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Cancelling job!
```

<!--- TEST -->

As you can see, only the first three coroutines printed a message and the others were cancelled
by a single invocation of `job.cancel()`. So all we need to do in our hypothetical Android
application is to create a parent job object when the activity is created, use it for child coroutines,
and cancel it when the activity is destroyed.

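The lifecycle pattern described above could be sketched roughly as follows. This is not Android code:
`Screen`, `loadData`, `onDestroy` and `activeLoadsAfterDestroy` are hypothetical names that only stand in for
an activity and its callbacks, and the delays are arbitrary:

```kotlin
import kotlinx.coroutines.experimental.*

// `Screen` is a hypothetical stand-in for an Android activity; it is not a real Android class.
class Screen {
    private val job = Job() // parent job, created together with the screen

    // every coroutine started here has the screen's job as its parent
    fun loadData(id: Int): Job = launch(CommonPool + job) {
        delay(1000L) // pretend to fetch some data
        println("Data $id is loaded")
    }

    // a single call cancels everything that was started via loadData
    fun onDestroy() {
        job.cancel()
    }
}

// Starts three loads, destroys the screen early, and counts the loads that are still active.
fun activeLoadsAfterDestroy(): Int = runBlocking {
    val screen = Screen()
    val loads = List(3) { screen.loadData(it) }
    delay(100L) // the screen is destroyed long before any load completes
    screen.onDestroy()
    loads.forEach { it.join() }
    loads.count { it.isActive }
}

fun main(args: Array<String>) {
    println("Loads still active: ${activeLoadsAfterDestroy()}")
}
```

Keeping the job private to the object means that callers cannot forget to attach it: every coroutine
started through `loadData` is tied to the screen automatically.
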
## Channels

Deferred values provide a convenient way to transfer a single value between coroutines.
Channels provide a way to transfer a stream of values.

<!--- INCLUDE .*/example-channel-([0-9]+).kt
import kotlinx.coroutines.experimental.channels.*
-->

### Channel basics

A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-01.kt)

The output of this code is:

```text
1
4
9
16
25
Done!
```

<!--- TEST -->

### Closing and iteration over channels

Unlike a queue, a channel can be closed to indicate that no more elements are coming.
On the receiver side it is convenient to use a regular `for` loop to receive elements
from the channel.

Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
The iteration stops as soon as this close token is received, so there is a guarantee
that all previously sent elements before the close are received:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-02.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

### Building channel producers

The pattern where a coroutine is producing a sequence of elements is quite common.
This is a part of the _producer-consumer_ pattern that is often found in concurrent code.
You could abstract such a producer into a function that takes a channel as its parameter, but this goes against
the common-sense expectation that results must be returned from functions.

There is a convenience coroutine builder named [produce] that makes it easy to do it right:

```kotlin
fun produceSquares() = produce<Int>(CommonPool) {
    for (x in 1..5) send(x * x)
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val squares = produceSquares()
    for (y in squares) println(y)
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

### Pipelines

A pipeline is a pattern where one coroutine is producing a possibly infinite stream of values:

```kotlin
fun produceNumbers() = produce<Int>(CommonPool) {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
```

And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results.
In the example below the numbers are just squared:

```kotlin
fun square(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
    for (x in numbers) send(x * x)
}
```

The main code starts and connects the whole pipeline:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    squares.cancel() // need to cancel these coroutines in a larger app
    numbers.cancel()
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-04.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

We don't have to cancel these coroutines in this example app, because
[coroutines are like daemon threads](#coroutines-are-like-daemon-threads),
but in a larger app we'll need to stop our pipeline if we don't need it anymore.
Alternatively, we could have run pipeline coroutines as
[children of a coroutine](#children-of-a-coroutine).

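One way to apply the parent-child idea to a pipeline is to give every stage the same explicit parent [Job],
so that a single `cancel` stops all the stages. The following is only a sketch under that assumption:
the `context` parameters and the `firstSquares` helper are hypothetical additions to the functions shown above:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlin.coroutines.experimental.CoroutineContext

// variants of the pipeline stages that take the context as a (hypothetical) extra parameter
fun produceNumbers(context: CoroutineContext) = produce<Int>(context) {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun square(context: CoroutineContext, numbers: ReceiveChannel<Int>) = produce<Int>(context) {
    for (x in numbers) send(x * x)
}

// Collects the first n squares and then stops the whole pipeline via its parent job.
fun firstSquares(n: Int): List<Int> = runBlocking {
    val pipeline = Job() // parent job for every stage of the pipeline
    val ctx = CommonPool + pipeline
    val squares = square(ctx, produceNumbers(ctx))
    val result = ArrayList<Int>()
    for (i in 1..n) result += squares.receive()
    pipeline.cancel() // one call is expected to cancel all stages
    result
}

fun main(args: Array<String>) {
    firstSquares(5).forEach { println(it) }
    println("Done!")
}
```

With this arrangement the caller does not need to keep a reference to each stage just to cancel it later.
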
1268### Prime numbers with pipeline
1269
Cedric Beustfa0b28f2017-02-07 07:07:25 -08001270Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
of coroutines. We start with an infinite sequence of numbers. This time we introduce an
explicit context parameter, so that the caller can control where our coroutines run:

<!--- INCLUDE kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
import kotlin.coroutines.experimental.CoroutineContext
-->

```kotlin
fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
```

The following pipeline stage filters an incoming stream of numbers, removing all the numbers
that are divisible by the given prime number:

```kotlin
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
    for (x in numbers) if (x % prime != 0) send(x)
}
```

Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
and launching a new pipeline stage for each prime number found:

```
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
```

The following example prints the first ten prime numbers,
running the whole pipeline in the context of the main thread:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    var cur = numbersFrom(context, 2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(context, cur, prime)
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt)

The output of this code is:

```text
2
3
5
7
11
13
17
19
23
29
```

<!--- TEST -->

Note that you can build the same pipeline using the `buildIterator` coroutine builder from the standard library.
Replace `produce` with `buildIterator`, `send` with `yield`, `receive` with `next`,
`ReceiveChannel` with `Iterator`, and get rid of the context. You will not need `runBlocking` either.
However, the benefit of a pipeline that uses channels as shown above is that it can actually use
multiple CPU cores if you run it in [CommonPool] context.

Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
built using `buildSequence`/`buildIterator`, because they do not allow arbitrary suspension, unlike
`produce` which is fully asynchronous.

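As a rough sketch of the `buildIterator` variant described above, the following version uses only the standard library.
It assumes a Kotlin release (1.3 or later) in which `buildIterator` is available under the name `iterator`;
the helper `firstPrimes` is a hypothetical name introduced here for illustration:

```kotlin
// Standard-library version of the prime pipeline: no channels, no context.
// Assumption: Kotlin 1.3+, where `buildIterator` is spelled `iterator`.
fun numbersFrom(start: Int): Iterator<Int> = iterator {
    var x = start
    while (true) yield(x++) // infinite stream of integers from start
}

// Lazily drops every number that is divisible by the given prime
fun filter(numbers: Iterator<Int>, prime: Int): Iterator<Int> = iterator {
    for (x in numbers) if (x % prime != 0) yield(x)
}

// Hypothetical helper: collects the first `count` primes from the pipeline
fun firstPrimes(count: Int): List<Int> {
    var cur = numbersFrom(2)
    val primes = ArrayList<Int>()
    repeat(count) {
        val prime = cur.next()   // `next` takes the place of `receive`
        primes.add(prime)
        cur = filter(cur, prime) // add one more pipeline stage
    }
    return primes
}
```

Everything here runs lazily on the calling thread, which is why neither a context nor `runBlocking` is needed.
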
### Fan-out

Multiple coroutines may receive from the same channel, distributing work between themselves.
Let us start with a producer coroutine that is periodically producing integers
(ten numbers per second):

```kotlin
fun produceNumbers() = produce<Int>(CommonPool) {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
```

Then we can have several processor coroutines. In this example, they just print their id and
received number:

```kotlin
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
    for (x in channel) {
        println("Processor #$id received $x")
    }
}
```

Now let us launch five processors and let them work for a second. See what happens:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(1000)
    producer.cancel() // cancel producer coroutine and thus kill them all
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt)

The output will be similar to the following one, although the processor ids that receive
each specific integer may be different:

```
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
```

<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->

Note that cancelling a producer coroutine closes its channel, thus eventually terminating iteration
over the channel that processor coroutines are doing.

### Fan-in

Multiple coroutines may send to the same channel.
For example, let us have a channel of strings, and a suspending function that
repeatedly sends a specified string to this channel with a specified delay:

```kotlin
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

Now, let us see what happens if we launch a couple of coroutines sending strings
(in this example we launch them in the context of the main thread):

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>()
    launch(context) { sendString(channel, "foo", 200L) }
    launch(context) { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt)

The output is:

```text
foo
foo
BAR!
foo
foo
BAR!
```

<!--- TEST -->

### Buffered channels

The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked;
if receive is invoked first, it is suspended until send is invoked.

Both the [Channel()][Channel.invoke] factory function and the [produce] builder take an optional `capacity` parameter to
specify the _buffer size_. A buffer allows senders to send multiple elements before suspending,
similar to a `BlockingQueue` with a specified capacity, which blocks when the buffer is full.

Take a look at the behavior of the following code:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>(4) // create buffered channel
    launch(context) { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt)

It prints "Sending" _five_ times using a buffered channel with a capacity of _four_:

```text
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
```

<!--- TEST -->

The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.


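The `BlockingQueue` comparison can be made concrete with a small JVM-only sketch. It uses the non-blocking
`offer` call of `java.util.concurrent.ArrayBlockingQueue` to count how many elements a bounded queue accepts
before a blocking `put` would have to wait. No coroutines are involved, and `acceptedBeforeFull` is a
hypothetical helper name:

```kotlin
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical helper: counts how many of `attempts` elements fit into a
// bounded queue when nothing is ever taken out of it.
fun acceptedBeforeFull(capacity: Int, attempts: Int): Int {
    val queue = ArrayBlockingQueue<Int>(capacity)
    var accepted = 0
    repeat(attempts) {
        // offer() returns false instead of blocking when the queue is full
        if (queue.offer(it)) accepted++
    }
    return accepted
}
```

With a capacity of four and ten attempts the function returns 4, matching the four buffered elements above.
The channel example prints a fifth "Sending" line only because the message is printed before `send` suspends.
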
### Channels are fair

Send and receive operations on channels are _fair_ with respect to the order of their invocation from
multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
gets the element. In the following example two coroutines "ping" and "pong" are
receiving the "ball" object from the shared "table" channel.

```kotlin
data class Ball(var hits: Int)

fun main(args: Array<String>) = runBlocking<Unit> {
    val table = Channel<Ball>() // a shared table
    launch(context) { player("ping", table) }
    launch(context) { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    table.receive() // game over, grab the ball
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-09.kt)

The "ping" coroutine is started first, so it is the first one to receive the ball. Even though the "ping"
coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
received by the "pong" coroutine, because it was already waiting for it:

```text
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
ping Ball(hits=5)
```

<!--- TEST -->

## Shared mutable state and concurrency

Coroutines can be executed concurrently using a multi-threaded dispatcher like [CommonPool]. This presents
all the usual concurrency problems, the main one being synchronization of access to **shared mutable state**.
Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world,
but others are unique.

### The problem

Let us launch a thousand coroutines all doing the same action a thousand times (for a total of a million executions).
We'll also measure their completion time for further comparisons:

<!--- INCLUDE .*/example-sync-([0-9]+).kt
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.system.measureTimeMillis
-->

<!--- INCLUDE .*/example-sync-03.kt
import java.util.concurrent.atomic.AtomicInteger
-->

<!--- INCLUDE .*/example-sync-06.kt
import kotlinx.coroutines.experimental.sync.Mutex
-->

<!--- INCLUDE .*/example-sync-07.kt
import kotlinx.coroutines.experimental.channels.*
-->

```kotlin
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
    val n = 1000 // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch(context) {
                repeat(k) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${n * k} actions in $time ms")
}
```

<!--- INCLUDE .*/example-sync-([0-9]+).kt -->

We start with a very simple action that increments a shared mutable variable using
the multi-threaded [CommonPool] context.

```kotlin
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-01.kt)

<!--- TEST LINES_START
Completed 1000000 actions in
Counter =
-->

What does it print at the end? It is highly unlikely to ever print "Counter = 1000000", because a thousand coroutines
increment the `counter` concurrently from multiple threads without any synchronization.

### Volatiles are of no help

There is a common misconception that making a variable `volatile` solves concurrency problems. Let us try it:

```kotlin
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-02.kt)

<!--- TEST LINES_START
Completed 1000000 actions in
Counter =
-->

This code runs slower, but we still don't get "Counter = 1000000" at the end, because volatile variables guarantee
linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but
do not provide atomicity of larger actions (increment in our case).

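To see why atomic reads and writes are not enough, it helps to spell out `counter++` as the separate read and
write steps it consists of. The sketch below replays one possible interleaving of two threads sequentially.
No real threads are started, and `lostUpdate`, `readByA` and `readByB` are names made up for this illustration:

```kotlin
// Replays, step by step, an interleaving in which one increment is lost
fun lostUpdate(): Int {
    var counter = 0
    val readByA = counter   // thread A reads 0
    val readByB = counter   // thread B reads 0 before A has written
    counter = readByA + 1   // thread A writes 1
    counter = readByB + 1   // thread B also writes 1, overwriting A's update
    return counter          // 1, although two increments were performed
}
```

Each individual read and write in this trace is atomic, yet one of the two increments is lost.
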
### Thread-safe data structures

The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized,
linearizable, or atomic) data structure that provides all the necessary synchronization for the corresponding
operations that need to be performed on a shared state.
In the case of a simple counter we can use the `AtomicInteger` class, which has an atomic `incrementAndGet` operation:

```kotlin
var counter = AtomicInteger()

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter.incrementAndGet()
    }
    println("Counter = ${counter.get()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-03.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other
standard data structures and basic operations on them. However, it does not easily scale to complex
state or to complex operations that do not have ready-to-use thread-safe implementations.

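The same idea carries over unchanged to plain threads. As a minimal sketch that does not depend on
`kotlinx.coroutines`, the hypothetical helper `countWithThreads` below starts a few JVM threads with
`kotlin.concurrent.thread` and lets them share one `AtomicInteger`:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical helper: `threads` threads each increment a shared counter `perThread` times
fun countWithThreads(threads: Int, perThread: Int): Int {
    val counter = AtomicInteger()
    val workers = List(threads) {
        // thread { ... } creates and starts the thread immediately
        thread { repeat(perThread) { counter.incrementAndGet() } }
    }
    workers.forEach { it.join() } // wait for all increments to finish
    return counter.get()
}
```

Because every `incrementAndGet` is a single atomic operation, the result is always `threads * perThread`.
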
### Thread confinement fine-grained

_Thread confinement_ is an approach to the problem of shared mutable state where all access to the particular shared
state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to
the single event-dispatch/application thread. It is easy to apply with coroutines by using a
single-threaded context:

```kotlin
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) { // run each coroutine in CommonPool
        run(counterContext) { // but confine each increment to the single-threaded context
            counter++
        }
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

This code works very slowly, because it does _fine-grained_ thread-confinement. Each individual increment switches
from the multi-threaded `CommonPool` context to the single-threaded context using a [run] block.

### Thread confinement coarse-grained

In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic
are confined to a single thread. The following example does it like that, running each coroutine in
the single-threaded context to start with.

```kotlin
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(counterContext) { // run each coroutine in the single-threaded context
        counter++
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

This now works much faster and produces the correct result.

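For comparison, coarse-grained confinement can also be sketched without coroutines by handing whole batches of
work to a single-threaded `java.util.concurrent` executor. This is an analogy rather than the guide's
coroutine-based code: `confinedCount` is a hypothetical helper, and each submitted task plays the role of one
coroutine that runs entirely on the confining thread:

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical helper: `tasks` batches of `perTask` increments, all on one thread
fun confinedCount(tasks: Int, perTask: Int): Int {
    val counterThread = Executors.newSingleThreadExecutor()
    var counter = 0 // after initialization, only touched on counterThread
    try {
        repeat(tasks) {
            // each task performs all of its increments on the single thread
            counterThread.execute { repeat(perTask) { counter++ } }
        }
        // tasks run in submission order, so this read sees every increment
        return counterThread.submit(Callable { counter }).get()
    } finally {
        counterThread.shutdown()
    }
}
```
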
### Mutual exclusion

The mutual exclusion solution to the problem is to protect all modifications of the shared state with a _critical section_
that is never executed concurrently. In a blocking world you'd typically use `synchronized` or `ReentrantLock` for that.
The coroutine alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to
delimit a critical section. The key difference is that `Mutex.lock` is a suspending function. It does not block a thread.

```kotlin
val mutex = Mutex()
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        mutex.lock()
        try { counter++ }
        finally { mutex.unlock() }
    }
    println("Counter = $counter")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-06.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

The locking in this example is fine-grained, so it pays the price. However, it is a good choice for some situations
where you absolutely must modify some shared state periodically, but there is no natural thread that this state
is confined to.

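For reference, the blocking counterpart mentioned above might look as follows with plain threads. This is only
a sketch using `ReentrantLock` and the standard `withLock` extension from `kotlin.concurrent`; `lockedCount` is
a hypothetical helper. Unlike `Mutex.lock`, a contended `ReentrantLock` blocks the calling thread:

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread
import kotlin.concurrent.withLock

// Hypothetical helper: the blocking analogue of the Mutex example
fun lockedCount(threads: Int, perThread: Int): Int {
    val lock = ReentrantLock()
    var counter = 0 // guarded by lock
    val workers = List(threads) {
        thread {
            repeat(perThread) {
                // withLock acquires the lock and releases it in a finally block
                lock.withLock { counter++ }
            }
        }
    }
    workers.forEach { it.join() }
    return counter
}
```
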
### Actors

An actor is a combination of a coroutine, the state that is confined and encapsulated into this coroutine,
and a channel to communicate with other coroutines. A simple actor can be written as a function,
but an actor with a complex state is better suited for a class.

There is an [actor] coroutine builder that conveniently combines the actor's mailbox channel into its
scope (to receive messages from) and the send channel into the resulting job object, so that a
single reference to the actor can be carried around as its handle.

```kotlin
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: SendChannel<Int>) : CounterMsg() // a request with reply

// This function launches a new counter actor
fun counterActor() = actor<CounterMsg>(CommonPool) {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.send(counter)
        }
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    massiveRun(CommonPool) {
        counter.send(IncCounter)
    }
    val response = Channel<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.receive()}")
    counter.close() // shutdown the actor
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-07.kt)

<!--- TEST ARBITRARY_TIME
Completed 1000000 actions in xxx ms
Counter = 1000000
-->

It does not matter (for correctness) what context the actor itself is executed in. An actor is
a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine
works as a solution to the problem of shared mutable state.

An actor is more efficient than locking under load, because in this case it always has work to do and it does not
have to switch to a different context at all.

> Note that the [actor] coroutine builder is a dual of the [produce] coroutine builder. An actor is associated
 with the channel that it receives messages from, while a producer is associated with the channel that it
 sends elements to.

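The shape of an actor, private state plus a mailbox that serializes all access to it, can also be illustrated
without the `actor` builder. The sketch below is a thread-based analogy built on `java.util.concurrent` queues,
not the coroutine implementation from this guide. The message types `Msg`, `Inc`, `Get`, `Stop` and the helper
`actorCount` are invented for this illustration:

```kotlin
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.SynchronousQueue
import kotlin.concurrent.thread

// Hypothetical message types for this sketch (not the guide's CounterMsg)
sealed class Msg
object Inc : Msg()                                     // one-way increment
class Get(val response: SynchronousQueue<Int>) : Msg() // request with reply
object Stop : Msg()                                    // shuts the actor down

fun actorCount(senders: Int, perSender: Int): Int {
    val mailbox = LinkedBlockingQueue<Msg>()
    val actor = thread {
        var counter = 0 // state is confined to the actor thread
        while (true) {
            val msg = mailbox.take() // blocks until a message arrives
            when (msg) {
                is Inc -> counter++
                is Get -> msg.response.put(counter)
                is Stop -> return@thread
            }
        }
    }
    val workers = List(senders) {
        thread { repeat(perSender) { mailbox.put(Inc) } }
    }
    workers.forEach { it.join() }  // all increments are now in the mailbox
    val response = SynchronousQueue<Int>()
    mailbox.put(Get(response))     // queued behind every Inc message
    val result = response.take()
    mailbox.put(Stop)
    actor.join()
    return result
}
```

The counter is never shared: only the actor thread touches it, and the FIFO mailbox orders all requests.
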
## Select expression

The select expression makes it possible to await multiple suspending functions simultaneously and _select_
the first one that becomes available.

<!--- INCLUDE .*/example-select-([0-9]+).kt
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*
-->

### Selecting from channels

Let us have two producers of strings: `fizz` and `buzz`. `fizz` produces the "Fizz" string every 300 ms:

<!--- INCLUDE .*/example-select-01.kt
import kotlin.coroutines.experimental.CoroutineContext
-->

```kotlin
fun fizz(context: CoroutineContext) = produce<String>(context) {
    while (true) { // sends "Fizz" every 300 ms
        delay(300)
        send("Fizz")
    }
}
```

And `buzz` produces the "Buzz!" string every 500 ms:

```kotlin
fun buzz(context: CoroutineContext) = produce<String>(context) {
    while (true) { // sends "Buzz!" every 500 ms
        delay(500)
        send("Buzz!")
    }
}
```

Using the [receive][ReceiveChannel.receive] suspending function we can receive from _either_ one channel or the
other. But the [select] expression allows us to receive from _both_ simultaneously using its
[onReceive][SelectBuilder.onReceive] clauses:

```kotlin
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> means that this select expression does not produce any result
        fizz.onReceive { value -> // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value -> // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}
```

Let us run it all seven times:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val fizz = fizz(context)
    val buzz = buzz(context)
    repeat(7) {
        selectFizzBuzz(fizz, buzz)
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-01.kt)

The result of this code is:

```text
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'
```

<!--- TEST -->

### Selecting on close

The [onReceive][SelectBuilder.onReceive] clause in `select` fails when the channel is closed, causing the corresponding
`select` to throw an exception. We can use the [onReceiveOrNull][SelectBuilder.onReceiveOrNull] clause to perform a
specific action when the channel is closed. The following example also shows that `select` is an expression that returns
the result of its selected clause:

```kotlin
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'a' is closed"
            else
                "a -> '$value'"
        }
        b.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'b' is closed"
            else
                "b -> '$value'"
        }
    }
```

Let's use it with channel `a` that produces the "Hello" string four times and
channel `b` that produces "World" four times:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // we are using the context of the main thread in this example for predictability ...
    val a = produce<String>(context) {
        repeat(4) { send("Hello $it") }
    }
    val b = produce<String>(context) {
        repeat(4) { send("World $it") }
    }
    repeat(8) { // print first eight results
        println(selectAorB(a, b))
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-02.kt)

The result of this code is quite interesting, so we'll analyze it in more detail:

```text
a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed
```

<!--- TEST -->

There are a couple of observations to make.

First of all, `select` is _biased_ to the first clause. When several clauses are selectable at the same time,
the first one among them gets selected. Here, both channels are constantly producing strings, so the `a` channel,
being the first clause in select, wins. However, because we are using an unbuffered channel, `a` gets suspended from
time to time on its [send][SendChannel.send] invocation and gives a chance for `b` to send, too.

The second observation is that [onReceiveOrNull][SelectBuilder.onReceiveOrNull] gets immediately selected when the
channel is already closed.

### Selecting to send

The select expression has an [onSend][SelectBuilder.onSend] clause that can be put to good use in combination
with the biased nature of selection.

Let us write an example of a producer of integers that sends its values to a `side` channel when
the consumers on its primary channel cannot keep up with it:

```kotlin
fun produceNumbers(side: SendChannel<Int>) = produce<Int>(CommonPool) {
    for (num in 1..10) { // produce 10 numbers from 1 to 10
        delay(100) // every 100 ms
        select<Unit> {
            onSend(num) {} // Send to the primary channel
            side.onSend(num) {} // or to the side channel
        }
    }
}
```

The consumer is going to be quite slow, taking 250 ms to process each number:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val side = Channel<Int>() // allocate side channel
    launch(context) { // this is a very fast consumer for the side channel
        for (num in side) println("Side channel has $num")
    }
    for (num in produceNumbers(side)) {
        println("Consuming $num")
        delay(250) // let us digest the consumed number properly, do not hurry
    }
    println("Done consuming")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-03.kt)

So let us see what happens:

```text
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
```

<!--- TEST -->

Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002021### Selecting deferred values
2022
Roman Elizarova84730b2017-02-22 11:58:50 +03002023Deferred values can be selected using [onAwait][SelectBuilder.onAwait] clause.
2024Let us start with an async function that returns a deferred string value after
Roman Elizarovd4dcbe22017-02-22 09:57:46 +03002025a random delay:
2026
2027<!--- INCLUDE .*/example-select-04.kt
2028import java.util.*
2029-->
2030
2031```kotlin
fun asyncString(time: Int) = async(CommonPool) {
    delay(time.toLong())
    "Waited for $time ms"
}
```

Let us start a dozen of them, each with a random delay:

```kotlin
fun asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return List(12) { asyncString(random.nextInt(1000)) }
}
```

Now the main function awaits the first of them to complete and counts the number of deferred values
that are still active. Note that we rely here on the fact that the `select` expression is a Kotlin DSL,
so we can provide clauses for it using arbitrary code. In this case we iterate over a list
of deferred values to provide an `onAwait` clause for each deferred value.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val list = asyncStringsList()
    val result = select<String> {
        list.withIndex().forEach { (index, deferred) ->
            deferred.onAwait { answer ->
                "Deferred $index produced answer '$answer'"
            }
        }
    }
    println(result)
    val countActive = list.count { it.isActive }
    println("$countActive coroutines are still active")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-04.kt)

The output is:

```text
Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active
```

<!--- TEST -->
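The output is reproducible because `Random(3)` is seeded, so the twelve delays are the same on every run, and `select` resumes as soon as the deferred value with the shortest delay completes. The sketch below recomputes those delays with the same seed and finds the smallest one, without starting any coroutines. The helpers `delaysFor` and `winnerIndex` are hypothetical names introduced here, and the sketch assumes that the shortest delay always wins, which ignores scheduling jitter.

```kotlin
import java.util.Random

// Recreates the delays that asyncStringsList() passes to asyncString(): same seed, same order.
fun delaysFor(seed: Long = 3, count: Int = 12): List<Int> {
    val random = Random(seed)
    return List(count) { random.nextInt(1000) }
}

// Index of the smallest delay. Assumption: the deferred with the shortest delay wins the select.
fun winnerIndex(delays: List<Int>): Int {
    var winner = 0
    for (i in delays.indices) {
        if (delays[i] < delays[winner]) winner = i
    }
    return winner
}

fun main(args: Array<String>) {
    val delays = delaysFor()
    val winner = winnerIndex(delays)
    println("Delays: $delays")
    println("Shortest delay: ${delays[winner]} ms at index $winner")
    println("${delays.size - 1} deferred values would still be waiting")
}
```

Comparing the printed index and delay with the output above is a quick way to check this reasoning.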

### Switch over a channel of deferred values

Let us write a channel producer function that consumes a channel of deferred string values and waits for each received
deferred value, but only until the next deferred value arrives or the channel is closed. This example puts together
the [onReceiveOrNull][SelectBuilder.onReceiveOrNull] and [onAwait][SelectBuilder.onAwait] clauses in the same `select`:

```kotlin
fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String>(CommonPool) {
    var current = input.receive() // start with first received deferred value
    while (isActive) { // loop while not cancelled/closed
        val next = select<Deferred<String>?> { // return next deferred value from this select or null
            input.onReceiveOrNull { update ->
                update // replaces the deferred value to wait for
            }
            current.onAwait { value ->
                send(value) // send value that current deferred has produced
                input.receiveOrNull() // and use the next deferred from the input channel
            }
        }
        if (next == null) {
            println("Channel was closed")
            break // out of loop
        } else {
            current = next
        }
    }
}
```

To test it, we'll use a simple async function that resolves to a specified string after a specified time:

```kotlin
fun asyncString(str: String, time: Long) = async(CommonPool) {
    delay(time)
    str
}
```

The main function just launches a coroutine to print the results of `switchMapDeferreds` and sends some test
data to it:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val chan = Channel<Deferred<String>>() // the channel for test
    launch(context) { // launch printing coroutine
        for (s in switchMapDeferreds(chan))
            println(s) // print each received string
    }
    chan.send(asyncString("BEGIN", 100))
    delay(200) // enough time for "BEGIN" to be produced
    chan.send(asyncString("Slow", 500))
    delay(100) // not enough time to produce slow
    chan.send(asyncString("Replace", 100))
    delay(500) // give it time before the last one
    chan.send(asyncString("END", 500))
    delay(1000) // give it time to process
    chan.close() // close the channel ...
    delay(500) // and wait some time to let it finish
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-select-05.kt)

The result of this code:

```text
BEGIN
Replace
END
Channel was closed
```

<!--- TEST -->
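"Slow" never appears in this output because it needs 500 ms to complete, but "Replace" arrives only 100 ms after it and takes its place. The sketch below models that rule with plain data and no coroutines: a deferred value is emitted only if it completes before the next one is sent or, for the last one, before the channel is closed. The names `SentDeferred` and `emittedValues` are hypothetical, the send times are derived from the `delay` calls in the main function above, and exact ties are treated as "not emitted", which is an assumption of this model.

```kotlin
// One test input: the string it resolves to, when it was sent, and how long it takes to complete.
data class SentDeferred(val value: String, val sentAtMs: Long, val durationMs: Long)

// Simplified model of switchMapDeferreds (not the guide's implementation): a value is emitted
// only if its deferred completes before the next one is sent or before the channel is closed.
fun emittedValues(inputs: List<SentDeferred>, closedAtMs: Long): List<String> {
    val result = mutableListOf<String>()
    for ((index, item) in inputs.withIndex()) {
        val replacedAtMs = if (index + 1 < inputs.size) inputs[index + 1].sentAtMs else closedAtMs
        if (item.sentAtMs + item.durationMs < replacedAtMs) result += item.value
    }
    return result
}

fun main(args: Array<String>) {
    // Send times follow the delays in the main function above: 0, 200, 300, 800; close at 1800.
    val inputs = listOf(
        SentDeferred("BEGIN", 0, 100),
        SentDeferred("Slow", 200, 500),
        SentDeferred("Replace", 300, 100),
        SentDeferred("END", 800, 500)
    )
    emittedValues(inputs, closedAtMs = 1800).forEach { println(it) }
}
```

Under these assumptions the model emits `BEGIN`, `Replace` and `END`. Closing the channel earlier, for example at 1000 ms, would drop `END` as well, because its deferred value would not have completed yet.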

## Further reading

* [Guide to UI programming with coroutines](ui/coroutines-guide-ui.md)
* [Coroutines design document (KEEP)](https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md)
* [Full kotlinx.coroutines API reference](http://kotlin.github.io/kotlinx.coroutines)

<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core -->
<!--- DOCS_ROOT kotlinx-coroutines-core/target/dokka/kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines.experimental -->
[launch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/launch.html
[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/delay.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run-blocking.html
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/index.html
[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-cancellation-exception.html
[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/yield.html
[CoroutineScope.isActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/is-active.html
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
[run]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run.html
[NonCancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-non-cancellable/index.html
[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
[async]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/async.html
[Deferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/index.html
[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/await.html
[Job.start]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/start.html
[CommonPool]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-common-pool/index.html
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-dispatcher/index.html
[CoroutineScope.context]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/context.html
[Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-unconfined/index.html
[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/new-coroutine-context.html
[CoroutineName]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-name/index.html
[Job.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/invoke.html
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/cancel.html
<!--- INDEX kotlinx.coroutines.experimental.sync -->
[Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/index.html
[Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/lock.html
[Mutex.unlock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/unlock.html
<!--- INDEX kotlinx.coroutines.experimental.channels -->
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html
[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/close.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
[Channel.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/invoke.html
[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
<!--- INDEX kotlinx.coroutines.experimental.selects -->
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
[SelectBuilder.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-receive.html
[SelectBuilder.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-receive-or-null.html
[SelectBuilder.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-send.html
[SelectBuilder.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-await.html
<!--- END -->