<!---

Copyright 2016-2017 JetBrains s.r.o.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->

# Guide to kotlinx.coroutines by example

This is a short guide on core features of `kotlinx.coroutines` with a series of examples.

## Table of contents

<!--- TOC -->

* [Coroutine basics](#coroutine-basics)
  * [Your first coroutine](#your-first-coroutine)
  * [Bridging blocking and non-blocking worlds](#bridging-blocking-and-non-blocking-worlds)
  * [Waiting for a job](#waiting-for-a-job)
  * [Extract function refactoring](#extract-function-refactoring)
  * [Coroutines ARE light-weight](#coroutines-are-light-weight)
  * [Coroutines are like daemon threads](#coroutines-are-like-daemon-threads)
* [Cancellation and timeouts](#cancellation-and-timeouts)
  * [Cancelling coroutine execution](#cancelling-coroutine-execution)
  * [Cancellation is cooperative](#cancellation-is-cooperative)
  * [Making computation code cancellable](#making-computation-code-cancellable)
  * [Closing resources with finally](#closing-resources-with-finally)
  * [Run non-cancellable block](#run-non-cancellable-block)
  * [Timeout](#timeout)
* [Composing suspending functions](#composing-suspending-functions)
  * [Sequential by default](#sequential-by-default)
  * [Concurrent using async](#concurrent-using-async)
  * [Lazily started async](#lazily-started-async)
  * [Async-style functions](#async-style-functions)
* [Coroutine context and dispatchers](#coroutine-context-and-dispatchers)
  * [Dispatchers and threads](#dispatchers-and-threads)
  * [Unconfined vs confined dispatcher](#unconfined-vs-confined-dispatcher)
  * [Debugging coroutines and threads](#debugging-coroutines-and-threads)
  * [Jumping between threads](#jumping-between-threads)
  * [Job in the context](#job-in-the-context)
  * [Children of a coroutine](#children-of-a-coroutine)
  * [Combining contexts](#combining-contexts)
  * [Naming coroutines for debugging](#naming-coroutines-for-debugging)
* [Channels](#channels)
  * [Channel basics](#channel-basics)
  * [Closing and iteration over channels](#closing-and-iteration-over-channels)
  * [Building channel producers](#building-channel-producers)
  * [Pipelines](#pipelines)
  * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
  * [Fan-out](#fan-out)
  * [Fan-in](#fan-in)
  * [Buffered channels](#buffered-channels)

<!--- KNIT kotlinx-coroutines-core/src/test/kotlin/guide/.*\.kt -->

<!--- INCLUDE .*/example-([a-z]+)-([0-9]+)\.kt
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package guide.$$1.example$$2

import kotlinx.coroutines.experimental.*
-->

## Coroutine basics

This section covers basic coroutine concepts.

### Your first coroutine

Run the following code:

```kotlin
fun main(args: Array<String>) {
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main function continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-01.kt)

It prints the following output:

```
Hello,
World!
```

Essentially, coroutines are light-weight threads.
They are launched with the [launch] _coroutine builder_.
You can achieve the same result by replacing
`launch(CommonPool) { ... }` with `thread { ... }` and `delay(...)` with `Thread.sleep(...)`. Try it.

If you start by replacing `launch(CommonPool)` with `thread`, the compiler produces the following error:

```
Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
```

That is because [delay] is a special _suspending function_ that does not block a thread, but _suspends_
the coroutine, and it can only be used from a coroutine.
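
For comparison, the fully thread-based variant described above could be sketched as follows. This is only an illustration; it assumes the `thread` helper from the standard library's `kotlin.concurrent` package and needs no coroutine library at all:

```kotlin
import kotlin.concurrent.thread

fun main(args: Array<String>) {
    thread { // start a regular thread instead of a coroutine
        Thread.sleep(1000L) // blocking sleep for 1 second
        println("World!") // print after the sleep
    }
    println("Hello,") // main thread continues while the other thread sleeps
    Thread.sleep(2000L) // block main thread for 2 seconds, mirroring the coroutine version
}
```

The visible output is the same, but here the second thread is blocked for the whole second instead of being suspended.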

### Bridging blocking and non-blocking worlds

The first example mixes _non-blocking_ `delay(...)` and _blocking_ `Thread.sleep(...)` in the same
`main` function. It is easy to get lost. Let's cleanly separate blocking and non-blocking
worlds by using [runBlocking]:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L)
        println("World!")
    }
    println("Hello,") // main coroutine continues while child is delayed
    delay(2000L) // non-blocking delay for 2 seconds to keep JVM alive
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-02.kt)

The result is the same, but this code uses only non-blocking [delay].

`runBlocking { ... }` works as an adaptor that is used here to start the top-level main coroutine.
The regular code outside of `runBlocking` _blocks_ until the coroutine inside `runBlocking` completes.

This is also a way to write unit tests for suspending functions:

```kotlin
class MyTest {
    @Test
    fun testMySuspendingFunction() = runBlocking<Unit> {
        // here we can use suspending functions using any assertion style that we like
    }
}
```

<!--- CLEAR -->

### Waiting for a job

Delaying for a time while another coroutine is working is not a good approach. Let's explicitly
wait (in a non-blocking way) until the background [Job] that we have launched is complete:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { // create new coroutine and keep a reference to its Job
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join() // wait until child coroutine completes
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-03.kt)

Now the result is still the same, but the code of the main coroutine is not tied to the duration of
the background job in any way. Much better.

### Extract function refactoring

Let's extract the block of code inside `launch(CommonPool) { ... }` into a separate function. When you
perform the "Extract function" refactoring on this code, you get a new function with the `suspend` modifier.
That is your first _suspending function_. Suspending functions can be used inside coroutines
just like regular functions, but their additional feature is that they can, in turn,
use other suspending functions, like `delay` in this example, to _suspend_ execution of a coroutine.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { doWorld() }
    println("Hello,")
    job.join()
}

// this is your first suspending function
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-04.kt)

### Coroutines ARE light-weight

Run the following code:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = List(100_000) { // create a lot of coroutines and list their jobs
        launch(CommonPool) {
            delay(1000L)
            print(".")
        }
    }
    jobs.forEach { it.join() } // wait for all jobs to complete
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-05.kt)

It starts 100K coroutines and, after a second, each coroutine prints a dot.
Now, try that with threads. What would happen? (Most likely your code will produce some sort of out-of-memory error)

### Coroutines are like daemon threads

The following code launches a long-running coroutine that prints "I'm sleeping" twice a second and then
returns from the main function after some delay:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // just quit after delay
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-06.kt)

You can run and see that it prints three lines and terminates:

```
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
```

Active coroutines do not keep the process alive. They are like daemon threads.
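
To make the analogy concrete, here is a rough thread-based counterpart. It is a sketch only, assuming the standard library's `kotlin.concurrent.thread` helper and its `isDaemon` parameter; the JVM does not wait for daemon threads when `main` returns:

```kotlin
import kotlin.concurrent.thread

fun main(args: Array<String>) {
    thread(isDaemon = true) { // a daemon thread does not keep the JVM alive
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            Thread.sleep(500L)
        }
    }
    Thread.sleep(1300L) // just quit after the sleep
}
```

When run as a standalone program, this typically also prints three lines before the process exits.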

## Cancellation and timeouts

This section covers coroutine cancellation and timeouts.

### Cancelling coroutine execution

In a small application, returning from the `main` method might sound like a good way to get all coroutines
implicitly terminated. In a larger, long-running application, you need finer-grained control.
The [launch] function returns a [Job] that can be used to cancel a running coroutine:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-01.kt)

It produces the following output:

```
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
```

As soon as main invokes `job.cancel`, we don't see any output from the other coroutine because it was cancelled.

### Cancellation is cooperative

Coroutine cancellation is _cooperative_. Coroutine code has to cooperate to be cancellable.
All the suspending functions in `kotlinx.coroutines` are _cancellable_. They check for cancellation of
the coroutine and throw [CancellationException] when cancelled. However, if a coroutine is working in
a computation and does not check for cancellation, then it cannot be cancelled, as the following
example shows:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (true) { // computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-02.kt)

Run it to see that it continues to print "I'm sleeping" even after cancellation.

### Making computation code cancellable

There are two approaches to making computation code cancellable. The first one is to periodically
invoke a suspending function. There is a [yield] function that is a good choice for that purpose.
The other one is to explicitly check the cancellation status. Let us try the latter approach.

Replace `while (true)` in the previous example with `while (isActive)` and rerun it.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (isActive) { // cancellable computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-03.kt)

As you can see, now this loop can be cancelled. [isActive][CoroutineScope.isActive] is a property that is available inside
the code of coroutines via the [CoroutineScope] object.
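
The first approach, periodically invoking a suspending function, can be sketched in the same way. The variant below is an illustration rather than one of the guide's generated examples; it assumes the same `kotlinx.coroutines.experimental` API used throughout this guide and calls `yield()` on every loop iteration:

```kotlin
import kotlinx.coroutines.experimental.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (true) { // the loop itself never checks for cancellation
            yield() // suspending call; it throws CancellationException once the job is cancelled
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled
    println("main: Now I can quit.")
}
```

Calling `yield()` on every iteration adds dispatching overhead, so real computation code would likely yield less often.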
Roman Elizarov7deefb82017-01-31 10:33:17 +0300370
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300371### Closing resources with finally
372
Roman Elizarov419a6c82017-02-09 18:36:22 +0300373Cancellable suspending functions throw [CancellationException] on cancellation which can be handled in
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300374all the usual way. For example, the `try {...} finally {...}` and Kotlin `use` function execute their
375finalization actions normally when coroutine is cancelled:
376
377```kotlin
378fun main(args: Array<String>) = runBlocking<Unit> {
379 val job = launch(CommonPool) {
380 try {
381 repeat(1000) { i ->
382 println("I'm sleeping $i ...")
383 delay(500L)
384 }
385 } finally {
386 println("I'm running finally")
387 }
388 }
389 delay(1300L) // delay a bit
390 println("main: I'm tired of waiting!")
391 job.cancel() // cancels the job
392 delay(1300L) // delay a bit to ensure it was cancelled indeed
393 println("main: Now I can quit.")
394}
395```
396
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300397> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-04.kt)
Roman Elizarov1293ccd2017-02-01 18:49:54 +0300398
399The example above produces the following output:
400
401```
402I'm sleeping 0 ...
403I'm sleeping 1 ...
404I'm sleeping 2 ...
405main: I'm tired of waiting!
406I'm running finally
407main: Now I can quit.
408```
409
### Run non-cancellable block

Any attempt to use a suspending function in the `finally` block of the previous example will cause
a [CancellationException], because the coroutine running this code is cancelled. Usually, this is not a
problem, since all well-behaved closing operations (closing a file, cancelling a job, or closing any kind of
communication channel) are usually non-blocking and do not involve any suspending functions. However, in the
rare case when you need to suspend in the cancelled coroutine, you can wrap the corresponding code in
`run(NonCancellable) {...}` using the [run] function and the [NonCancellable] context, as the following example shows:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            run(NonCancellable) {
                println("I'm running finally")
                delay(1000L)
                println("And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-05.kt)

### Timeout

The most obvious reason to cancel coroutine execution in practice
is that its execution time has exceeded some timeout.
While you can manually track the reference to the corresponding [Job] and launch a separate coroutine to cancel
the tracked one after a delay, there is a ready-to-use [withTimeout] function that does it.
Look at the following example:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-06.kt)

It produces the following output:

```
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" java.util.concurrent.CancellationException: Timed out waiting for 1300 MILLISECONDS
```

We have not seen the [CancellationException] stack trace printed on the console before. That is because
inside a cancelled coroutine `CancellationException` is considered a normal reason for coroutine completion.
However, in this example we have used `withTimeout` right inside the `main` function.

Because cancellation is just an exception, all the resources will be closed in the usual way.
You can wrap the code with timeout in a `try {...} catch (e: CancellationException) {...}` block if
you need to do some additional action specifically on timeout.
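
A minimal sketch of that wrapping is shown below. It is an illustration, not one of the guide's generated examples; it assumes the `kotlinx.coroutines.experimental` API used in this guide and imports `java.util.concurrent.CancellationException` explicitly, which is the exception class shown in the output above:

```kotlin
import kotlinx.coroutines.experimental.*
import java.util.concurrent.CancellationException

fun main(args: Array<String>) = runBlocking<Unit> {
    try {
        withTimeout(1300L) {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
    } catch (e: CancellationException) {
        // additional action performed specifically on timeout
        println("Timed out: ${e.message}")
    }
}
```

With the exception handled, the program ends normally instead of printing a stack trace.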

## Composing suspending functions

This section covers various approaches to composition of suspending functions.

### Sequential by default

Assume that we have two suspending functions defined elsewhere that do something useful like some kind of
remote service call or computation. We just pretend they are useful, but actually each one just
delays for a second for the purpose of this example:

<!--- INCLUDE .*/example-compose-([0-9]+).kt
import kotlin.system.measureTimeMillis
-->

```kotlin
suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}
```

<!--- INCLUDE .*/example-compose-([0-9]+).kt -->

What do we do if we need to invoke them _sequentially_ -- first `doSomethingUsefulOne` _and then_
`doSomethingUsefulTwo` -- and compute the sum of their results?
In practice we do this if we use the result of the first function to make a decision on whether we need
to invoke the second one or to decide on how to invoke it.

We just use a normal sequential invocation, because the code in the coroutine, just like in the regular
code, is _sequential_ by default. The following example demonstrates it by measuring the total
time it takes to execute both suspending functions:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-01.kt)

It produces something like this:

```
The answer is 42
Completed in 2017 ms
```

### Concurrent using async

What if there are no dependencies between the invocations of `doSomethingUsefulOne` and `doSomethingUsefulTwo` and
we want to get the answer faster, by doing both _concurrently_? This is where [async] comes to help.

Conceptually, [async] is just like [launch]. It starts a separate coroutine which is a light-weight thread
that works concurrently with all the other coroutines. The difference is that `launch` returns a [Job] and
does not carry any resulting value, while `async` returns a [Deferred] -- a light-weight non-blocking future
that represents a promise to provide a result later. You can use `.await()` on a deferred value to get its eventual result,
but `Deferred` is also a `Job`, so you can cancel it if needed.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool) { doSomethingUsefulOne() }
        val two = async(CommonPool) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-02.kt)

It produces something like this:

```
The answer is 42
Completed in 1017 ms
```

This is twice as fast, because we have concurrent execution of two coroutines.
Note that concurrency with coroutines is always explicit.

### Lazily started async

[async] has a laziness option, enabled with the `start = false` parameter.
It starts the coroutine only when its result is needed by some
[await][Deferred.await] or if a [start][Job.start] function
is invoked. Run the following example that differs from the previous one only by this option:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool, start = false) { doSomethingUsefulOne() }
        val two = async(CommonPool, start = false) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-03.kt)

It produces something like this:

```
The answer is 42
Completed in 2017 ms
```

So, we are back to sequential execution, because we _first_ start and await `one`, _and then_ start and await
`two`. This is not the intended use case for laziness. It is designed as a replacement for
the standard `lazy` function in cases when computation of the value involves suspending functions.
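
If lazily created coroutines should nevertheless run concurrently, the `start` function mentioned above can be invoked on each of them before awaiting. The following is only a sketch under the assumption that the `start = false` parameter and `Job.start` behave as described in this section; the two helper functions are repeated so that the block is self-contained:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlin.system.measureTimeMillis

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool, start = false) { doSomethingUsefulOne() }
        val two = async(CommonPool, start = false) { doSomethingUsefulTwo() }
        one.start() // explicitly start both lazy coroutines
        two.start() // before awaiting either result
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

Because both coroutines are started before the first `await`, the total time should be close to one second rather than two.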

### Async-style functions

We can define async-style functions that invoke `doSomethingUsefulOne` and `doSomethingUsefulTwo`
_asynchronously_ using the [async] coroutine builder. It is good style to name such functions with
either an "async" prefix or an "Async" suffix to highlight the fact that they only start an asynchronous
computation and one needs to use the resulting deferred value to get the result.

```kotlin
// The result type of asyncSomethingUsefulOne is Deferred<Int>
fun asyncSomethingUsefulOne() = async(CommonPool) {
    doSomethingUsefulOne()
}

// The result type of asyncSomethingUsefulTwo is Deferred<Int>
fun asyncSomethingUsefulTwo() = async(CommonPool) {
    doSomethingUsefulTwo()
}
```

Note that these `asyncXXX` functions are **not** _suspending_ functions. They can be used from anywhere.
However, their use always implies asynchronous (here meaning _concurrent_) execution of their action
with the invoking code.

The following example shows their use outside of a coroutine:

```kotlin
// note that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
    val time = measureTimeMillis {
        // we can initiate async actions outside of a coroutine
        val one = asyncSomethingUsefulOne()
        val two = asyncSomethingUsefulTwo()
        // but waiting for a result must involve either suspending or blocking.
        // here we use `runBlocking { ... }` to block the main thread while waiting for the result
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-04.kt)

Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300649## Coroutine context and dispatchers
650
Roman Elizarov32d95322017-02-09 15:57:31 +0300651We've already seen `launch(CommonPool) {...}`, `async(CommonPool) {...}`, `run(NonCancellable) {...}`, etc.
Roman Elizarov419a6c82017-02-09 18:36:22 +0300652In these code snippets [CommonPool] and [NonCancellable] are _coroutine contexts_.
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300653This section covers other available choices.
654
655### Dispatchers and threads
656
Roman Elizarov419a6c82017-02-09 18:36:22 +0300657Coroutine context includes a [_coroutine dispatcher_][CoroutineDispatcher] which determines what thread or threads
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300658the corresponding coroutine uses for its execution. Coroutine dispatcher can confine coroutine execution
659to a specific thread, dispatch it to a thread pool, or let it run unconfined. Try the following example:
660
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println(" 'context': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
        println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-01.kt)

It produces the following output (possibly in a different order):

```
 'Unconfined': I'm working in thread main
 'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
 'newSTC': I'm working in thread MyOwnThread
 'context': I'm working in thread main
```

The difference between the parent [context][CoroutineScope.context] and the [Unconfined] context is explained in the next section.

### Unconfined vs confined dispatcher

The [Unconfined] coroutine dispatcher starts the coroutine in the caller thread, but only until the
first suspension point. After suspension it resumes in the thread that is fully determined by the
suspending function that was invoked. The unconfined dispatcher is appropriate when a coroutine neither
consumes CPU time nor updates any shared data (like UI) that is confined to a specific thread.

On the other hand, the [context][CoroutineScope.context] property, which is available inside the block of any coroutine
via the [CoroutineScope] interface, is a reference to the context of this particular coroutine.
This way, a parent context can be inherited. The default context of [runBlocking], in particular,
is confined to the invoker thread, so inheriting it has the effect of confining execution to
this thread with predictable FIFO scheduling.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println(" 'context': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println(" 'context': After delay in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-02.kt)

It produces the following output:

```
 'Unconfined': I'm working in thread main
 'context': I'm working in thread main
 'Unconfined': After delay in thread kotlinx.coroutines.ScheduledExecutor
 'context': After delay in thread main
```

So, the coroutine that inherited the `context` of `runBlocking {...}` continues to execute in the `main` thread,
while the unconfined one resumed in the scheduler thread that the [delay] function is using.

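The "predictable FIFO scheduling" of a confined context can be pictured as a queue of tasks drained by a single thread. The following is a minimal sketch of that idea in plain Kotlin, not the library's implementation; `FifoLoop`, `dispatch`, `runAll` and `runDemo` are hypothetical names introduced only for illustration.

```kotlin
import java.util.ArrayDeque

// Hypothetical single-threaded loop: tasks run in the order they were dispatched.
class FifoLoop {
    private val queue = ArrayDeque<() -> Unit>()

    // Appends a task to the back of the queue without running it.
    fun dispatch(task: () -> Unit) {
        queue.add(task)
    }

    // Drains the queue on the calling thread, including tasks dispatched by other tasks.
    fun runAll() {
        while (true) {
            val task = queue.poll() ?: break
            task()
        }
    }
}

fun runDemo(): List<String> {
    val events = ArrayList<String>()
    val loop = FifoLoop()
    loop.dispatch {
        events += "first"
        loop.dispatch { events += "third" } // queued behind "second"
    }
    loop.dispatch { events += "second" }
    loop.runAll()
    return events
}

fun main(args: Array<String>) {
    println(runDemo()) // [first, second, third]
}
```

A task dispatched from inside another task lands at the back of the queue, which is why `third` runs after `second` even though it was requested while `first` was still executing.
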
### Debugging coroutines and threads

Coroutines can suspend on one thread and resume on another thread with the [Unconfined] dispatcher or
with a multi-threaded dispatcher like [CommonPool]. Even with a single-threaded dispatcher it might be hard to
figure out what coroutine was doing what, where, and when. The common approach to debugging applications with
threads is to print the thread name in the log file on each log statement. This feature is universally supported
by logging frameworks. When using coroutines, the thread name alone does not give much context, so
`kotlinx.coroutines` includes debugging facilities to make it easier.

Run the following code with the `-Dkotlinx.coroutines.debug` JVM option:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking<Unit> {
    val a = async(context) {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async(context) {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-03.kt)

There are three coroutines: the main coroutine (#1), which is the `runBlocking` one,
and two coroutines computing the deferred values `a` (#2) and `b` (#3).
They are all executing in the context of `runBlocking` and are confined to the main thread.
The output of this code is:

```
[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42
```

The `log` function prints the name of the thread in square brackets. You can see that it is the `main`
thread, but the identifier of the currently executing coroutine is appended to it. This identifier
is consecutively assigned to all created coroutines when debugging mode is turned on.

You can read more about debugging facilities in the documentation for the [newCoroutineContext] function.

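To get a feeling for how a coroutine identifier can end up in the thread name, here is a rough sketch in plain Kotlin that temporarily decorates the current thread's name and restores it afterwards. It only illustrates the idea; it is not how the library is implemented, and `withLabel` is a hypothetical helper.

```kotlin
// Hypothetical helper: runs `block` with a label appended to the current thread's name.
fun <T> withLabel(label: String, block: () -> T): T {
    val thread = Thread.currentThread()
    val oldName = thread.name
    thread.name = "$oldName @$label"
    try {
        return block()
    } finally {
        thread.name = oldName // restore, so that later log lines are not mislabelled
    }
}

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) {
    withLabel("coroutine#2") { log("I'm computing a piece of the answer") }
    log("The label is gone again")
}
```

Because existing logging frameworks already print the thread name, a decoration of this kind shows up in log files without any change to the logging configuration.
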
### Jumping between threads

Run the following code with the `-Dkotlinx.coroutines.debug` JVM option:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) {
    val ctx1 = newSingleThreadContext("Ctx1")
    val ctx2 = newSingleThreadContext("Ctx2")
    runBlocking(ctx1) {
        log("Started in ctx1")
        run(ctx2) {
            log("Working in ctx2")
        }
        log("Back to ctx1")
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt)

It demonstrates two new techniques. One is using [runBlocking] with an explicitly specified context, and
the second one is using the [run] function to change the context of a coroutine while still staying in the
same coroutine, as you can see in the output below:

```
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
```

### Job in the context

The coroutine's [Job] is part of its context. The coroutine can retrieve it from its own context
using the `context[Job]` expression:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    println("My job is ${context[Job]}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-05.kt)

It produces the following output:

```
My job is BlockingCoroutine{isActive=true}
```

So, [isActive][CoroutineScope.isActive] in [CoroutineScope] is just a convenient shortcut for `context[Job]!!.isActive`.

### Children of a coroutine

When the [context][CoroutineScope.context] of a coroutine is used to launch another coroutine,
the [Job] of the new coroutine becomes
a _child_ of the parent coroutine's job. When the parent coroutine is cancelled, all its children
are recursively cancelled, too.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(CommonPool) {
        // it spawns two other jobs, one with its separate context
        val job1 = launch(CommonPool) {
            println("job1: I have my own context and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // and the other inherits the parent context
        val job2 = launch(context) {
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
        // request completes when both its sub-jobs complete:
        job1.join()
        job2.join()
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-06.kt)

The output of this code is:

```
job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
```

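The parent-child relation described in this section can be modelled as a small tree in which cancellation walks down to every descendant. The sketch below is a plain-Kotlin illustration under that assumption; `SimpleJob` is a hypothetical stand-in and does not reflect how [Job] is actually implemented.

```kotlin
// Hypothetical stand-in for a job: tracks activity and the children attached to it.
class SimpleJob(val name: String, parent: SimpleJob? = null) {
    private val children = ArrayList<SimpleJob>()

    var isActive = true
        private set

    init {
        // A job created with a parent registers itself as that parent's child.
        parent?.children?.add(this)
    }

    // Cancels this job and, recursively, all of its children.
    fun cancel() {
        if (!isActive) return
        isActive = false
        children.forEach { it.cancel() }
    }
}

fun main(args: Array<String>) {
    val request = SimpleJob("request")
    val job1 = SimpleJob("job1")          // separate context: no parent
    val job2 = SimpleJob("job2", request) // inherited context: child of the request
    request.cancel()
    println("job1 active: ${job1.isActive}, job2 active: ${job2.isActive}")
    // job1 active: true, job2 active: false
}
```

This mirrors the output above: the job without a parent survives, while the one attached to the request is cancelled together with it.
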
### Combining contexts

Coroutine contexts can be combined using the `+` operator. The context on the right-hand side replaces the relevant entries
of the context on the left-hand side. For example, the [Job] of the parent coroutine can be inherited, while
its dispatcher is replaced:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(context) { // use the context of `runBlocking`
        // spawns CPU-intensive child job in CommonPool !!!
        val job = launch(context + CommonPool) {
            println("job: I am a child of the request coroutine, but with a different dispatcher")
            delay(1000)
            println("job: I will not execute this line if my parent request is cancelled")
        }
        job.join() // request completes when its sub-job completes
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-07.kt)

The expected outcome of this code is:

```
job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?
```

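The replacement rule of `+` can be tried in isolation with two custom context elements. This sketch assumes a Kotlin version in which `CoroutineContext` lives in the `kotlin.coroutines` package of the standard library (in older releases the package is `kotlin.coroutines.experimental`). `Owner` and `Label` are hypothetical elements that stand in for a job and a dispatcher.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical context elements; each is identified by the key of its companion object.
class Owner(val value: String) : AbstractCoroutineContextElement(Owner) {
    companion object Key : CoroutineContext.Key<Owner>
}

class Label(val value: String) : AbstractCoroutineContextElement(Label) {
    companion object Key : CoroutineContext.Key<Label>
}

fun main(args: Array<String>) {
    val parent: CoroutineContext = Owner("request") + Label("main thread")
    // Label has the same key on both sides, so the right-hand one wins; Owner is kept.
    val child = parent + Label("pool")
    println("${child[Owner]?.value} / ${child[Label]?.value}") // request / pool
}
```

Entries are looked up by key, in the same way `context[Job]` is used earlier, so an element on the right-hand side only displaces the element that shares its key.
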
### Naming coroutines for debugging

Automatically assigned ids are good when coroutines log often and you just need to correlate log records
coming from the same coroutine. However, when a coroutine is tied to the processing of a specific request
or is doing some specific background task, it is better to name it explicitly for debugging purposes.
[CoroutineName] serves the same function as a thread name. It is displayed in the name of the thread that
is executing this coroutine when debugging mode is turned on.

The following example demonstrates this concept:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
    log("Started main coroutine")
    // run two background value computations
    val v1 = async(CommonPool + CoroutineName("v1coroutine")) {
        log("Computing v1")
        delay(500)
        252
    }
    val v2 = async(CommonPool + CoroutineName("v2coroutine")) {
        log("Computing v2")
        delay(1000)
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt)

The output it produces with the `-Dkotlinx.coroutines.debug` JVM option is similar to:

```
[main @main#1] Started main coroutine
[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42
```

## Channels

Deferred values provide a convenient way to transfer a single value between coroutines.
Channels provide a way to transfer a stream of values.

<!--- INCLUDE .*/example-channel-([0-9]+).kt
import kotlinx.coroutines.experimental.channels.*
-->

### Channel basics

A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-01.kt)

### Closing and iteration over channels

Unlike a queue, a channel can be closed to indicate that no more elements are coming.
On the receiver side it is convenient to use a regular `for` loop to receive elements
from the channel.

Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
The iteration stops as soon as this close token is received, so there is a guarantee
that all elements sent before the close are received:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-02.kt)

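The "close token" picture can be made concrete with the `BlockingQueue` analogy from the previous section. The following is only a thread-based sketch of the concept, not a description of how channels work internally; `CloseToken` and `receiveUntilClosed` are hypothetical names.

```kotlin
import java.util.concurrent.LinkedBlockingQueue
import kotlin.concurrent.thread

// Hypothetical marker object that plays the role of the close token.
object CloseToken

fun receiveUntilClosed(): List<Int> {
    val queue = LinkedBlockingQueue<Any>()
    val sender = thread {
        for (x in 1..5) queue.put(x * x)
        queue.put(CloseToken) // "close": nothing else will follow
    }
    val received = ArrayList<Int>()
    while (true) {
        val item = queue.take() // blocks until an element or the token arrives
        if (item === CloseToken) break
        received += item as Int
    }
    sender.join()
    return received
}

fun main(args: Array<String>) {
    println(receiveUntilClosed()) // [1, 4, 9, 16, 25]
}
```

Because the token travels through the same FIFO queue as the data, everything that was put before it is taken before the loop ends.
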
### Building channel producers

The pattern where a coroutine is producing a sequence of elements into a channel is quite common.
You could abstract such a producer into a function that takes a channel as its parameter, but this goes contrary
to the common-sense expectation that results are returned from functions. There is a convenience
coroutine builder named [buildChannel] that makes it easy to do it right:

```kotlin
fun produceSquares() = buildChannel<Int>(CommonPool) {
    for (x in 1..5) send(x * x)
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val squares = produceSquares()
    for (y in squares) println(y)
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt)

### Pipelines

A pipeline is a pattern where one coroutine produces a possibly infinite stream of values:

```kotlin
fun produceNumbers() = buildChannel<Int>(CommonPool) {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
```

Another coroutine or coroutines receive that stream, do some processing, and send the result.
In the example below the numbers are just squared:

```kotlin
fun square(numbers: ReceiveChannel<Int>) = buildChannel<Int>(CommonPool) {
    for (x in numbers) send(x * x)
}
```

The main code starts and connects the whole pipeline:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    squares.cancel() // need to cancel these coroutines in a larger app
    numbers.cancel()
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-04.kt)

We don't have to cancel these coroutines in this example app, because
[coroutines are like daemon threads](#coroutines-are-like-daemon-threads),
but in a larger app we'll need to stop our pipeline if we don't need it anymore.
Alternatively, we could have run pipeline coroutines as
[children of a coroutine](#children-of-a-coroutine).

### Prime numbers with pipeline

Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
of coroutines. We start with an infinite sequence of numbers. This time we introduce an
explicit context parameter, so that the caller can control where our coroutines run:

<!--- INCLUDE kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
import kotlin.coroutines.experimental.CoroutineContext
-->

```kotlin
fun numbersFrom(context: CoroutineContext, start: Int) = buildChannel<Int>(context) {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
```

The following pipeline stage filters an incoming stream of numbers, removing all the numbers
that are divisible by the given prime number:

```kotlin
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = buildChannel<Int>(context) {
    for (x in numbers) if (x % prime != 0) send(x)
}
```

Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
and launching a new pipeline stage for each prime number found:

```
numbers -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
```

The following example prints the first ten prime numbers,
running the whole pipeline in the context of the main thread:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    var cur = numbersFrom(context, 2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(context, cur, prime)
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt)

The output of this code is:

```
2
3
5
7
11
13
17
19
23
29
```

Note that you can build the same pipeline using `buildIterator` from the standard library.
Replace `buildChannel` with `buildIterator`, `send` with `yield`, `receive` with `next`,
`ReceiveChannel` with `Iterator`, and get rid of the context. You will not need `runBlocking` either.
However, the benefit of a pipeline that uses channels as shown above is that it can actually use
multiple CPU cores if you run it in the [CommonPool] context.

Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
built using `buildSequence`/`buildIterator`, because they do not allow arbitrary suspension, unlike
`buildChannel` which is fully asynchronous.

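For comparison, here is a sketch of the iterator-based variant of the prime pipeline. It assumes a later Kotlin standard library in which the `buildIterator` builder is available under the name `iterator`; with the older name the structure is the same. `firstPrimes` is a hypothetical helper added so that the result can be returned as a list.

```kotlin
// Infinite iterator of integers, the synchronous counterpart of the number channel.
fun numbersFrom(start: Int): Iterator<Int> = iterator {
    var x = start
    while (true) yield(x++)
}

// Pipeline stage: passes on only the numbers that are not divisible by `prime`.
fun filter(numbers: Iterator<Int>, prime: Int): Iterator<Int> = iterator {
    for (x in numbers) if (x % prime != 0) yield(x)
}

// Hypothetical helper: pulls `count` primes through the growing chain of filters.
fun firstPrimes(count: Int): List<Int> {
    val primes = ArrayList<Int>()
    var cur = numbersFrom(2)
    repeat(count) {
        val prime = cur.next()
        primes += prime
        cur = filter(cur, prime)
    }
    return primes
}

fun main(args: Array<String>) {
    println(firstPrimes(10)) // [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
}
```

Everything here runs lazily on the calling thread: each `next()` call pulls one value through the chain, so no context and no `runBlocking` are involved, and no second CPU core can help.
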
### Fan-out

Multiple coroutines may receive from the same channel, distributing work between themselves.
Let us start with a producer coroutine that is periodically producing integers
(ten numbers per second):

```kotlin
fun produceNumbers() = buildChannel<Int>(CommonPool) {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
```

Then we can have several processor coroutines. In this example, they just print their id and
received number:

```kotlin
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
    while (true) {
        val x = channel.receive()
        println("Processor #$id received $x")
    }
}
```

Now let us launch five processors and let them work for a second. See what happens:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(1000)
    producer.cancel() // cancel producer coroutine and thus kill them all
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt)

The output will be similar to the following one, although the processor ids that receive
each specific integer may be different:

```
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
```

Note that cancelling a producer coroutine closes its channel, thus eventually terminating iteration
over the channel that processor coroutines are doing.

### Fan-in

Multiple coroutines may send to the same channel.
For example, let us have a channel of strings, and a suspending function that
repeatedly sends a specified string to this channel with a specified delay:

```kotlin
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

Now, let us see what happens if we launch a couple of coroutines sending strings
(in this example we launch them in the context of the main thread):

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>()
    launch(context) { sendString(channel, "foo", 200L) }
    launch(context) { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt)

The output is:

```
foo
foo
BAR!
foo
foo
BAR!
```

### Buffered channels

The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked;
if receive is invoked first, it is suspended until send is invoked.

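In the blocking world the closest analogue of a rendezvous is `java.util.concurrent.SynchronousQueue`, where `put` waits for a matching `take` and vice versa. The sketch below uses it with a plain thread to show the hand-off; it is an analogy only, and `rendezvousSquares` is a hypothetical name.

```kotlin
import java.util.concurrent.SynchronousQueue
import kotlin.concurrent.thread

fun rendezvousSquares(): List<Int> {
    val queue = SynchronousQueue<Int>() // holds no elements at all
    val sender = thread {
        // each put blocks until the receiving side takes the element
        for (x in 1..5) queue.put(x * x)
    }
    val received = List(5) { queue.take() } // each take blocks until the sender puts
    sender.join()
    return received
}

fun main(args: Array<String>) {
    println(rendezvousSquares()) // [1, 4, 9, 16, 25]
}
```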
Both the [Channel()][Channel.invoke] factory function and the [buildChannel] builder take an optional `capacity` parameter to
specify the _buffer size_. A buffer allows senders to send multiple elements before suspending,
similar to a `BlockingQueue` with a specified capacity, which blocks when the buffer is full.

Take a look at the behavior of the following code:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>(4) // create buffered channel
    launch(context) { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt)

It prints "Sending" _five_ times using a buffered channel with a capacity of _four_:

```
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
```

The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.

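The same capacity arithmetic can be observed with the `BlockingQueue` that the text compares buffered channels to. This sketch uses `ArrayBlockingQueue.offer`, which reports a full buffer by returning `false` instead of blocking; `acceptedWithoutWaiting` is a hypothetical helper name.

```kotlin
import java.util.concurrent.ArrayBlockingQueue

// Counts how many elements fit into the buffer before a sender would have to wait.
fun acceptedWithoutWaiting(capacity: Int, attempts: Int): Int {
    val queue = ArrayBlockingQueue<Int>(capacity)
    var accepted = 0
    for (x in 0 until attempts) {
        // offer() returns false when the buffer is full, where put() would block
        if (queue.offer(x)) accepted++ else break
    }
    return accepted
}

fun main(args: Array<String>) {
    println(acceptedWithoutWaiting(4, 10)) // 4
}
```

Four elements are accepted, one fewer than the five "Sending" lines above, because the channel example prints before each `send`, including the fifth one that suspends.
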
<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/ -->
<!--- INDEX kotlinx-coroutines-core/target/dokka/kotlinx-coroutines-core/kotlinx.coroutines.experimental/index.md -->
[CommonPool]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-common-pool/index.html
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-dispatcher/index.html
[CoroutineName]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-name/index.html
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
[CoroutineScope.context]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/context.html
[CoroutineScope.isActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/is-active.html
[Deferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/index.html
[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/await.html
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/index.html
[Job.start]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/start.html
[NonCancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-non-cancellable/index.html
[Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-unconfined/index.html
[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-cancellation-exception.html
[async]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/async.html
[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/delay.html
[launch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/launch.html
[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/new-coroutine-context.html
[run]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run-blocking.html
[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/yield.html
<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/ -->
<!--- INDEX kotlinx-coroutines-core/target/dokka/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/index.md -->
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
[Channel.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/invoke.html
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/receive.html
[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/close.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/send.html
[buildChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/build-channel.html
<!--- END -->