<!---

Copyright 2016-2017 JetBrains s.r.o.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->

# Guide to kotlinx.coroutines by example

This is a short guide to the core features of `kotlinx.coroutines`, illustrated with a series of examples.

## Table of contents

<!--- TOC -->

* [Coroutine basics](#coroutine-basics)
  * [Your first coroutine](#your-first-coroutine)
  * [Bridging blocking and non-blocking worlds](#bridging-blocking-and-non-blocking-worlds)
  * [Waiting for a job](#waiting-for-a-job)
  * [Extract function refactoring](#extract-function-refactoring)
  * [Coroutines ARE light-weight](#coroutines-are-light-weight)
  * [Coroutines are like daemon threads](#coroutines-are-like-daemon-threads)
* [Cancellation and timeouts](#cancellation-and-timeouts)
  * [Cancelling coroutine execution](#cancelling-coroutine-execution)
  * [Cancellation is cooperative](#cancellation-is-cooperative)
  * [Making computation code cancellable](#making-computation-code-cancellable)
  * [Closing resources with finally](#closing-resources-with-finally)
  * [Run non-cancellable block](#run-non-cancellable-block)
  * [Timeout](#timeout)
* [Composing suspending functions](#composing-suspending-functions)
  * [Sequential by default](#sequential-by-default)
  * [Concurrent using async](#concurrent-using-async)
  * [Lazily started async](#lazily-started-async)
  * [Async-style functions](#async-style-functions)
* [Coroutine context and dispatchers](#coroutine-context-and-dispatchers)
  * [Dispatchers and threads](#dispatchers-and-threads)
  * [Unconfined vs confined dispatcher](#unconfined-vs-confined-dispatcher)
  * [Debugging coroutines and threads](#debugging-coroutines-and-threads)
  * [Jumping between threads](#jumping-between-threads)
  * [Job in the context](#job-in-the-context)
  * [Children of a coroutine](#children-of-a-coroutine)
  * [Combining contexts](#combining-contexts)
  * [Naming coroutines for debugging](#naming-coroutines-for-debugging)
* [Channels](#channels)
  * [Channel basics](#channel-basics)
  * [Closing and iteration over channels](#closing-and-iteration-over-channels)
  * [Building channel producers](#building-channel-producers)
  * [Pipelines](#pipelines)
  * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
  * [Fan-out](#fan-out)
  * [Fan-in](#fan-in)
  * [Buffered channels](#buffered-channels)

<!--- KNIT kotlinx-coroutines-core/src/test/kotlin/guide/.*\.kt -->

<!--- INCLUDE .*/example-([a-z]+)-([0-9]+)\.kt
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package guide.$$1.example$$2

import kotlinx.coroutines.experimental.*
-->

## Coroutine basics

This section covers basic coroutine concepts.

### Your first coroutine

Run the following code:

```kotlin
fun main(args: Array<String>) {
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main function continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-01.kt)

It produces the following output:

```
Hello,
World!
```

Essentially, coroutines are light-weight threads. You can achieve the same result by replacing
`launch(CommonPool) { ... }` with `thread { ... }` and `delay(...)` with `Thread.sleep(...)`. Try it.

If you start by replacing `launch(CommonPool)` with `thread`, the compiler produces the following error:

```
Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
```

That is because `delay` is a special _suspending function_ that does not block a thread, but _suspends_
a coroutine, and it can only be used from a coroutine.
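
For comparison, here is a sketch of the thread-based variant with both replacements applied. It uses only `kotlin.concurrent.thread` from the Kotlin standard library and is meant as an illustration of the exercise above, not as part of the guide's example set:

```kotlin
import kotlin.concurrent.thread

fun main(args: Array<String>) {
    thread { // start a regular thread instead of a coroutine
        Thread.sleep(1000L) // blocking sleep: this thread is occupied for the whole second
        println("World!") // print after the sleep
    }
    println("Hello,") // main thread continues while the other thread sleeps
    Thread.sleep(2000L) // kept for parity with the coroutine version
}
```

It prints the same two lines, but each `thread { ... }` call occupies a full operating-system thread while it sleeps.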

### Bridging blocking and non-blocking worlds

The first example mixes _non-blocking_ `delay(...)` and _blocking_ `Thread.sleep(...)` in the code of
the same `main` function. It is easy to get lost. Let's cleanly separate the blocking and non-blocking
worlds by using `runBlocking { ... }`:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L)
        println("World!")
    }
    println("Hello,") // main coroutine continues while child is delayed
    delay(2000L) // non-blocking delay for 2 seconds to keep JVM alive
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-02.kt)

The result is the same, but this code uses only non-blocking `delay`.

`runBlocking { ... }` works as an adaptor that is used here to start the top-level main coroutine.
The regular code outside of `runBlocking` _blocks_ until the coroutine inside `runBlocking` completes.

This is also a way to write unit tests for suspending functions:

```kotlin
class MyTest {
    @Test
    fun testMySuspendingFunction() = runBlocking<Unit> {
        // here we can use suspending functions using any assertion style that we like
    }
}
```

<!--- CLEAR -->

### Waiting for a job

Delaying for a time while another coroutine is working is not a good approach. Let's explicitly
wait (in a non-blocking way) until the background job coroutine that we have launched is complete:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { // create new coroutine and keep a reference to its Job
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join() // wait until child coroutine completes
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-03.kt)

Now the result is still the same, but the code of the main coroutine is not tied to the duration of
the background job in any way. Much better.

### Extract function refactoring

Let's extract the block of code inside `launch(CommonPool) { ... }` into a separate function. When you
perform the "Extract function" refactoring on this code, you get a new function with the `suspend` modifier.
That is your first _suspending function_. Suspending functions can be used inside coroutines
just like regular functions, but their additional feature is that they can, in turn,
use other suspending functions, like `delay` in this example, to _suspend_ execution of a coroutine.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { doWorld() }
    println("Hello,")
    job.join()
}

// this is your first suspending function
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-04.kt)

### Coroutines ARE light-weight

Run the following code:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = List(100_000) { // create a lot of coroutines and list their jobs
        launch(CommonPool) {
            delay(1000L)
            print(".")
        }
    }
    jobs.forEach { it.join() } // wait for all jobs to complete
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-05.kt)

It starts 100K coroutines and, after a second, each coroutine prints a dot.
Now, try that with threads. What would happen? (Most likely your code will produce some sort of out-of-memory error.)
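
A thread-based counterpart might look like the sketch below. The helper `dotsWithThreads` is a hypothetical name introduced only for this illustration, and whether the full 100K run fails depends on the JVM settings and operating-system limits:

```kotlin
import kotlin.concurrent.thread

// Hypothetical helper: starts `count` threads that each sleep for a second and print a dot.
fun dotsWithThreads(count: Int) {
    val threads = List(count) { // create a lot of threads and keep references to them
        thread {
            Thread.sleep(1000L)
            print(".")
        }
    }
    threads.forEach { it.join() } // wait for all threads to finish
}

fun main(args: Array<String>) {
    dotsWithThreads(100_000) // may fail with an out-of-memory error, depending on the environment
}
```

Calling the helper with a small count (say, 50) is a safe way to check that it behaves like the coroutine version before trying the full number.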

### Coroutines are like daemon threads

The following code launches a long-running coroutine that prints "I'm sleeping" twice a second and then
returns from the main function after some delay:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // just quit after delay
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-basic-06.kt)

You can run and see that it prints three lines and terminates:

```
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
```

Active coroutines do not keep the process alive. They are like daemon threads.
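
To make the analogy concrete, here is a sketch of the same program written with an actual daemon thread. It relies only on the `isDaemon` parameter of the standard library's `thread` function and is an illustration, not one of the guide's generated examples:

```kotlin
import kotlin.concurrent.thread

fun main(args: Array<String>) {
    thread(isDaemon = true) { // a daemon thread does not keep the JVM alive
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            Thread.sleep(500L)
        }
    }
    Thread.sleep(1300L) // just quit after the delay; the JVM exits although the thread is still looping
}
```

With `isDaemon = false` (the default) the process would instead keep running until all 1000 iterations are done.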

## Cancellation and timeouts

This section covers coroutine cancellation and timeouts.

### Cancelling coroutine execution

In a small application, returning from the `main` function might sound like a good way to get all coroutines
implicitly terminated. In a larger, long-running application, you need finer-grained control.
The `launch` function returns a `Job` that can be used to cancel a running coroutine:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-01.kt)

It produces the following output:

```
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
```

As soon as main invokes `job.cancel`, we don't see any output from the other coroutine because it was cancelled.

### Cancellation is cooperative

Coroutine cancellation is _cooperative_. Coroutine code has to cooperate to be cancellable.
All the suspending functions in `kotlinx.coroutines` are _cancellable_. They check for cancellation of
the coroutine and throw `CancellationException` when cancelled. However, if a coroutine is busy with
a computation and does not check for cancellation, then it cannot be cancelled, as the following
example shows:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (true) { // computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-02.kt)

Run it to see that it continues to print "I'm sleeping" even after cancellation.

### Making computation code cancellable

There are two approaches to making computation code cancellable. The first one is to periodically
invoke a suspending function. There is a `yield` function that is a good choice for that purpose.
The other one is to explicitly check the cancellation status. Let us try the latter approach.

Replace `while (true)` in the previous example with `while (isActive)` and rerun it.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (isActive) { // cancellable computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-03.kt)

As you can see, now this loop can be cancelled. `isActive` is a property that is available inside
the code of coroutines via the `CoroutineScope` object.
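
The first approach, periodically invoking a suspending function, could be sketched as follows. This assumes the same library version as the rest of this guide and relies on the statement above that the suspending functions in `kotlinx.coroutines`, `yield` included, check for cancellation:

```kotlin
import kotlinx.coroutines.experimental.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (true) { // computation loop
            yield() // suspending call; assumed to throw CancellationException once the job is cancelled
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled
    println("main: Now I can quit.")
}
```

Calling `yield` on every iteration of a tight loop adds overhead, so in real computation code it is usually invoked only every so often.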

### Closing resources with finally

Cancellable suspending functions throw `CancellationException` on cancellation, which can be handled in
all the usual ways. For example, the `try {...} finally {...}` construct and the Kotlin `use` function execute their
finalization actions normally when a coroutine is cancelled:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("I'm running finally")
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-04.kt)

The example above produces the following output:

```
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.
```

### Run non-cancellable block

Any attempt to use a suspending function in the `finally` block of the previous example will cause
`CancellationException`, because the coroutine running this code is cancelled. Usually, this is not a
problem, since all well-behaved closing operations (closing a file, cancelling a job, or closing any kind of
communication channel) are usually non-blocking and do not involve any suspending functions. However, in the
rare case when you need to suspend in the cancelled coroutine, you can wrap the corresponding code in
`run(NonCancellable) {...}` as the following example shows:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            run(NonCancellable) {
                println("I'm running finally")
                delay(1000L)
                println("And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-05.kt)

### Timeout

The most obvious reason to cancel coroutine execution in practice
is that its execution time has exceeded some timeout.
While you can manually track the reference to the corresponding `job` and launch a separate coroutine to cancel
the tracked one after a delay, there is a ready-to-use `withTimeout(...) {...}` function that does it.
Look at the following example:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-06.kt)

It produces the following output:

```
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" java.util.concurrent.CancellationException: Timed out waiting for 1300 MILLISECONDS
```

We have not seen the `CancellationException` stack trace printed on the console before. That is because
inside a cancelled coroutine `CancellationException` is considered a normal reason for coroutine completion.
However, in this example we have used `withTimeout` right inside the `main` function.

Because cancellation is just an exception, all the resources will be closed in the usual way.
You can wrap the code with timeout in a `try {...} catch (e: CancellationException) {...}` block if
you need to do some additional action specifically on timeout.
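
A minimal sketch of such a wrapper is shown below, assuming the same library version as the rest of this guide. The message printed in the `catch` branch is made up for this illustration:

```kotlin
import kotlinx.coroutines.experimental.*
import java.util.concurrent.CancellationException

fun main(args: Array<String>) = runBlocking<Unit> {
    try {
        withTimeout(1300L) {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
    } catch (e: CancellationException) {
        // additional action taken specifically on timeout (illustrative message)
        println("main: timed out")
    }
}
```

Since the exception is caught inside `main`, no stack trace should reach the console in this variant.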

## Composing suspending functions

This section covers various approaches to composition of suspending functions.

### Sequential by default

Assume that we have two suspending functions defined elsewhere that do something useful, like some kind of
remote service call or computation. We just pretend they are useful, but actually each one just
delays for a second for the purpose of this example:

<!--- INCLUDE .*/example-compose-([0-9]+).kt
import kotlin.system.measureTimeMillis
-->

```kotlin
suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}
```

<!--- INCLUDE .*/example-compose-([0-9]+).kt -->

What do we do if we need to invoke them _sequentially_ -- first `doSomethingUsefulOne` _and then_
`doSomethingUsefulTwo` -- and compute the sum of their results?
In practice we do this if we use the result of the first function to decide whether we need
to invoke the second one or how to invoke it.

We just use a normal sequential invocation, because the code in a coroutine, just like regular
code, is _sequential_ by default. The following example demonstrates it by measuring the total
time it takes to execute both suspending functions:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-01.kt)

It produces something like this:

```
The answer is 42
Completed in 2017 ms
```
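
The dependent case mentioned above, where the first result decides whether the second call is made at all, could be sketched like this. The helper `computeAnswer` and the threshold `10` are hypothetical and exist only for this illustration, and the two suspending functions are repeated to keep the snippet self-contained:

```kotlin
import kotlinx.coroutines.experimental.*

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

// Hypothetical helper: the second call is made only if the first result exceeds a threshold.
suspend fun computeAnswer(): Int {
    val one = doSomethingUsefulOne()
    return if (one > 10) one + doSomethingUsefulTwo() else one
}

fun main(args: Array<String>) = runBlocking<Unit> {
    println("The answer is ${computeAnswer()}")
}
```

Because the second invocation depends on the value of `one`, the two calls cannot overlap, and plain sequential code expresses that dependency directly.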

### Concurrent using async

What if there are no dependencies between invocation of `doSomethingUsefulOne` and `doSomethingUsefulTwo` and
we want to get the answer faster, by doing both _concurrently_? This is where `async` comes to help.

Conceptually, `async` is just like `launch`. It starts a separate coroutine, which is a light-weight thread
that works concurrently with all the other coroutines. The difference is that `launch` returns a `Job` and
does not carry any resulting value, while `async` returns a `Deferred` -- a light-weight non-blocking future
that represents a promise to provide a result later. You can use `.await()` on a deferred value to get its eventual result,
but `Deferred` is also a `Job`, so you can cancel it if needed.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool) { doSomethingUsefulOne() }
        val two = async(CommonPool) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-02.kt)

It produces something like this:

```
The answer is 42
Completed in 1017 ms
```

This is twice as fast, because we have concurrent execution of two coroutines.
Note that concurrency with coroutines is always explicit.

### Lazily started async

There is a laziness option to `async`, enabled with the `start = false` parameter.
It starts the coroutine only when its result is needed by some `await` or when its `start` function
is invoked. Run the following example that differs from the previous one only by this option:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool, start = false) { doSomethingUsefulOne() }
        val two = async(CommonPool, start = false) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-03.kt)

It produces something like this:

```
The answer is 42
Completed in 2017 ms
```

So, we are back to sequential execution, because we _first_ start and await `one`, _and then_ start and await
`two`. That is not the intended use case for laziness. It is designed as a replacement for
the standard `lazy` function in cases when computation of the value involves suspending functions.
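
For completeness, here is a sketch of the other trigger mentioned above, the `start` function. It assumes the same library version as the rest of this guide and repeats the two suspending functions to stay self-contained:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlin.system.measureTimeMillis

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool, start = false) { doSomethingUsefulOne() }
        val two = async(CommonPool, start = false) { doSomethingUsefulTwo() }
        one.start() // explicitly start both lazy coroutines ...
        two.start()
        println("The answer is ${one.await() + two.await()}") // ... before awaiting either of them
    }
    println("Completed in $time ms")
}
```

Because both coroutines are already running when the first `await` suspends, the total time should be close to one second rather than two.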

### Async-style functions

We can define async-style functions that invoke `doSomethingUsefulOne` and `doSomethingUsefulTwo`
_asynchronously_ using the `async` coroutine builder. It is good style to name such functions with
either an "async" prefix or an "Async" suffix to highlight the fact that they only start an asynchronous
computation and one needs to use the resulting deferred value to get the result.

```kotlin
// The result type of asyncSomethingUsefulOne is Deferred<Int>
fun asyncSomethingUsefulOne() = async(CommonPool) {
    doSomethingUsefulOne()
}

// The result type of asyncSomethingUsefulTwo is Deferred<Int>
fun asyncSomethingUsefulTwo() = async(CommonPool) {
    doSomethingUsefulTwo()
}
```

Note that these `asyncXXX` functions are **not** _suspending_ functions. They can be used from anywhere.
However, their use always implies asynchronous (here meaning _concurrent_) execution of their action
with the invoking code.

The following example shows their use outside of a coroutine:

```kotlin
// note that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
    val time = measureTimeMillis {
        // we can initiate async actions outside of a coroutine
        val one = asyncSomethingUsefulOne()
        val two = asyncSomethingUsefulTwo()
        // but waiting for a result must involve either suspending or blocking.
        // here we use `runBlocking { ... }` to block the main thread while waiting for the result
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-04.kt)

## Coroutine context and dispatchers

We've already seen `launch(CommonPool) {...}`, `async(CommonPool) {...}`, `run(NonCancellable) {...}`, etc.
In these code snippets `CommonPool` and `NonCancellable` are _coroutine contexts_.
This section covers other available choices.

### Dispatchers and threads

A coroutine context includes a _coroutine dispatcher_, which determines what thread or threads
the corresponding coroutine uses for its execution. A coroutine dispatcher can confine coroutine execution
to a specific thread, dispatch it to a thread pool, or let it run unconfined. Try the following example:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(context) { // context of the parent, runBlocking coroutine
        println(" 'context': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
        println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-01.kt)

It produces the following output (maybe in a different order):

```
 'Unconfined': I'm working in thread main
 'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
 'newSTC': I'm working in thread MyOwnThread
 'context': I'm working in thread main
```

The difference between the parent `context` and the `Unconfined` context will be shown later.

690### Unconfined vs confined dispatcher
691
692The `Unconfined` coroutine dispatcher starts coroutine in the caller thread, but only until the
693first suspension point. After suspension it resumes in the thread that is fully determined by the
694suspending function that was invoked. Unconfined dispatcher is appropriate when coroutine does not
695consume CPU time nor updates any shared data (like UI) that is confined to a specific thread.
696
697On the other side, `context` property that is available inside the block of any coroutine
698via `CoroutineScope` interface, is a reference to a context of this particular coroutine.
699This way, a parent context can be inherited. The default context of `runBlocking`, in particular,
700is confined to be invoker thread, so inheriting it has the effect of confining execution to
701this thread with a predictable FIFO scheduling.
702
703```kotlin
704fun main(args: Array<String>) = runBlocking<Unit> {
705 val jobs = arrayListOf<Job>()
706 jobs += launch(Unconfined) { // not confined -- will work with main thread
707 println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
708 delay(1000)
709 println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
710 }
711 jobs += launch(context) { // context of the parent, runBlocking coroutine
712 println(" 'context': I'm working in thread ${Thread.currentThread().name}")
713 delay(1000)
714 println(" 'context': After delay in thread ${Thread.currentThread().name}")
715 }
716 jobs.forEach { it.join() }
717}
718```
719
Roman Elizarovfa7723e2017-02-06 11:17:51 +0300720> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-contest-02.kt)
Roman Elizarov2f6d7c92017-02-03 15:16:07 +0300721
722Produces the output:
723
724```
725 'Unconfined': I'm working in thread main
726 'context': I'm working in thread main
727 'Unconfined': After delay in thread kotlinx.coroutines.ScheduledExecutor
728 'context': After delay in thread main
729```
730
731So, the coroutine the had inherited `context` of `runBlocking {...}` continues to execute in the `main` thread,
732while the unconfined one had resumed in the scheduler thread that `delay` function is using.
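The FIFO scheduling of an inherited `runBlocking` context can be observed with a small sketch. The helper `launchOrder` is hypothetical (it is not part of the library), and the code assumes the same `kotlinx.coroutines.experimental` API that the examples in this guide use. It launches three coroutines in the parent `context` and records the order in which they actually run:

```kotlin
import kotlinx.coroutines.experimental.*

// Hypothetical helper: records the order in which coroutines launched in the
// inherited runBlocking context get to run.
fun launchOrder(): List<Int> = runBlocking<List<Int>> {
    val order = arrayListOf<Int>()
    // every coroutine is confined to the invoker thread, so `order` needs no synchronization
    val jobs = List(3) { i -> launch(context) { order += i } }
    jobs.forEach { it.join() }
    order
}

fun main(args: Array<String>) {
    println(launchOrder())
}
```

If scheduling is FIFO as described, the recorded order should match the launch order, that is `[0, 1, 2]`.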

### Debugging coroutines and threads

Coroutines can suspend on one thread and resume on another thread with the `Unconfined` dispatcher or
with a multi-threaded dispatcher like `CommonPool`. Even with a single-threaded dispatcher it might be hard to
figure out which coroutine was doing what, where, and when. The common approach to debugging applications with
threads is to print the thread name in the log file on each log statement. This feature is universally supported
by logging frameworks. When using coroutines, the thread name alone does not give much context, so
`kotlinx.coroutines` includes debugging facilities to make it easier.

Run the following code with the `-Dkotlinx.coroutines.debug` JVM option:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking<Unit> {
    val a = async(context) {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async(context) {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-03.kt)

There are three coroutines: the main coroutine (#1), which is the `runBlocking` one,
and two coroutines computing the deferred values `a` (#2) and `b` (#3).
They are all executing in the context of `runBlocking` and are confined to the main thread.
The output of this code is:

```
[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42
```

The `log` function prints the name of the thread in square brackets. You can see that it is the `main`
thread, but the identifier of the currently executing coroutine is appended to it. This identifier
is assigned consecutively to all created coroutines when debugging mode is turned on.

You can read more about debugging facilities in the documentation for the `newCoroutineContext` function.

### Jumping between threads

Run the following code with the `-Dkotlinx.coroutines.debug` JVM option:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) {
    val ctx1 = newSingleThreadContext("Ctx1")
    val ctx2 = newSingleThreadContext("Ctx2")
    runBlocking(ctx1) {
        log("Started in ctx1")
        run(ctx2) {
            log("Working in ctx2")
        }
        log("Back to ctx1")
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt)

It demonstrates two new techniques. One is using `runBlocking` with an explicitly specified context, and
the other is using `run(context) {...}` to change the context of a coroutine while staying in the
same coroutine, as you can see in the output below:

```
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
```

### Job in the context

The coroutine's `Job` is part of its context. The coroutine can retrieve it from its own context
using the `context[Job]` expression:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    println("My job is ${context[Job]}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-05.kt)

It produces the following output:

```
My job is BlockingCoroutine{isActive=true}
```

So, `isActive` in `CoroutineScope` is just a convenient shortcut for `context[Job]!!.isActive`.
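The equivalence of the two expressions can be checked directly. The following is a minimal sketch that assumes the `kotlinx.coroutines.experimental` API used throughout this guide; the helper name `shortcutAgrees` is hypothetical:

```kotlin
import kotlinx.coroutines.experimental.*

// Hypothetical helper: reads the job state both ways from inside a coroutine
// and reports whether the two values agree.
fun shortcutAgrees(): Boolean = runBlocking<Boolean> {
    // `isActive` and `context` both come from the CoroutineScope receiver of this block
    isActive == context[Job]!!.isActive
}

fun main(args: Array<String>) {
    println("isActive matches context[Job]!!.isActive: ${shortcutAgrees()}")
}
```

Both reads happen while the `runBlocking` coroutine is still running, so they are expected to agree.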

### Children of a coroutine

When the `context` of a coroutine is used to launch another coroutine, the `Job` of the new coroutine becomes
a _child_ of the parent coroutine's job. When the parent coroutine is cancelled, all its children
are recursively cancelled, too.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(CommonPool) {
        // it spawns two other jobs, one with its separate context
        val job1 = launch(CommonPool) {
            println("job1: I have my own context and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // and the other inherits the parent context
        val job2 = launch(context) {
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
        // request completes when both its sub-jobs complete:
        job1.join()
        job2.join()
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-06.kt)

The output of this code is:

```
job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
```

### Combining contexts

Coroutine contexts can be combined using the `+` operator. The context on the right-hand side replaces relevant entries
of the context on the left-hand side. For example, the `Job` of the parent coroutine can be inherited while
its dispatcher is replaced:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(context) { // use the context of `runBlocking`
        // spawns CPU-intensive child job in CommonPool !!!
        val job = launch(context + CommonPool) {
            println("job: I am a child of the request coroutine, but with a different dispatcher")
            delay(1000)
            println("job: I will not execute this line if my parent request is cancelled")
        }
        job.join() // request completes when its sub-job completes
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-07.kt)

The expected outcome of this code is:

```
job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?
```
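The replacement rule of the `+` operator can also be inspected without launching anything. The sketch below assumes that `CoroutineName` can be used as a context key (in the same way as `Job` in `context[Job]`) and that it exposes its `name` property; the helper `survivingName` is hypothetical:

```kotlin
import kotlinx.coroutines.experimental.*

// Hypothetical helper: combines two contexts that both carry a CoroutineName
// and returns the name that is left in the combined context.
fun survivingName(): String? {
    val left = CommonPool + CoroutineName("left")
    val right = CoroutineName("right")
    val combined = left + right // the right-hand CoroutineName replaces the left-hand one
    return combined[CoroutineName]?.name
}

fun main(args: Array<String>) {
    println(survivingName()) // prints "right"
}
```

Only the entry with the same key is replaced; the dispatcher from the left-hand side has no counterpart on the right, so it stays in the combined context.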

### Naming coroutines for debugging

Automatically assigned ids are good when coroutines log often and you just need to correlate log records
coming from the same coroutine. However, when a coroutine is tied to the processing of a specific request
or is doing some specific background task, it is better to name it explicitly for debugging purposes.
A coroutine name serves the same function as a thread name. It is displayed in the name of the thread that
is executing this coroutine when debugging mode is turned on.

The following example demonstrates this concept:

```kotlin
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
    log("Started main coroutine")
    // run two background value computations
    val v1 = async(CommonPool + CoroutineName("v1coroutine")) {
        log("Computing v1")
        delay(500)
        252
    }
    val v2 = async(CommonPool + CoroutineName("v2coroutine")) {
        log("Computing v2")
        delay(1000)
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt)

The output it produces with the `-Dkotlinx.coroutines.debug` JVM option is similar to:

```
[main @main#1] Started main coroutine
[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42
```

## Channels

Deferred values provide a convenient way to transfer a single value between coroutines.
Channels provide a way to transfer a stream of values.

<!--- INCLUDE .*/example-channel-([0-9]+).kt
import kotlinx.coroutines.experimental.channels.*
-->

### Channel basics

A `Channel` is conceptually very similar to `BlockingQueue`. One key difference is that
instead of a blocking `put` operation it has a suspending `send`, and instead of
a blocking `take` operation it has a suspending `receive`.

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-01.kt)

### Closing and iteration over channels

Unlike a queue, a channel can be closed to indicate that no more elements are coming.
On the receiver side it is convenient to use a regular `for` loop to receive elements
from the channel.

Conceptually, a `close` is like sending a special close token to the channel.
The iteration stops as soon as this close token is received, so there is a guarantee
that all elements sent before the close are received:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-02.kt)

### Building channel producers

The pattern where a coroutine is producing a sequence of elements into a channel is quite common.
You could abstract such a producer into a function that takes a channel as its parameter, but this goes against
the common-sense rule that results must be returned from functions. There is a convenience
coroutine builder named `buildChannel` that makes it easy to do it right:

```kotlin
fun produceSquares() = buildChannel<Int>(CommonPool) {
    for (x in 1..5) send(x * x)
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val squares = produceSquares()
    for (y in squares) println(y)
    println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt)

### Pipelines

A pipeline is a pattern where one coroutine produces a possibly infinite stream of values:

```kotlin
fun produceNumbers() = buildChannel<Int>(CommonPool) {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
```

Another coroutine (or several coroutines) receives that stream, does some processing, and sends the result.
In the example below the numbers are just squared:

```kotlin
fun square(numbers: ReceiveChannel<Int>) = buildChannel<Int>(CommonPool) {
    for (x in numbers) send(x * x)
}
```

The main code starts and connects the whole pipeline:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    squares.cancel() // need to cancel these coroutines in a larger app
    numbers.cancel()
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-04.kt)

We don't have to cancel these coroutines in this example app, because
[coroutines are like daemon threads](#coroutines-are-like-daemon-threads),
but in a larger app we'll need to stop our pipeline if we don't need it anymore.
Alternatively, we could have run the pipeline coroutines as
[children of a coroutine](#children-of-a-coroutine).

### Prime numbers with pipeline

Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
of coroutines. We start with an infinite sequence of numbers. This time we introduce an
explicit context parameter, so that the caller can control where our coroutines run:

<!--- INCLUDE kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
import kotlin.coroutines.experimental.CoroutineContext
-->

```kotlin
fun numbersFrom(context: CoroutineContext, start: Int) = buildChannel<Int>(context) {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
```

The following pipeline stage filters an incoming stream of numbers, removing all the numbers
that are divisible by the given prime number:

```kotlin
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = buildChannel<Int>(context) {
    for (x in numbers) if (x % prime != 0) send(x)
}
```

Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
and launching a new pipeline stage for each prime number found. The following example prints the first ten prime numbers,
running the whole pipeline in the context of the main thread:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    var cur = numbersFrom(context, 2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(context, cur, prime)
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt)

The output of this code is:

```
2
3
5
7
11
13
17
19
23
29
```
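As a cross-check of the filtering logic, independent of channels and coroutines, the same "take the head, then filter out its multiples" chain can be sketched with plain Kotlin sequences. This is not how the pipeline above runs; `firstPrimes` is a hypothetical helper that only mirrors the structure of `numbersFrom` and `filter`:

```kotlin
// Hypothetical helper: the same chain of filters as in the pipeline above,
// written with lazy sequences instead of channels.
fun firstPrimes(count: Int): List<Int> {
    // plays the role of numbersFrom(context, 2)
    var cur: Sequence<Int> = generateSequence(2) { it + 1 }
    val primes = arrayListOf<Int>()
    repeat(count) {
        val prime = cur.first() // the head of the current stream is always prime
        primes += prime
        // plays the role of filter(context, cur, prime)
        cur = cur.filter { x -> x % prime != 0 }
    }
    return primes
}

fun main(args: Array<String>) {
    println(firstPrimes(10)) // prints [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
}
```

Each new filter wraps the previous sequence, just as each new pipeline stage wraps the previous channel, so the chain grows by one stage per prime found.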

### Fan-out

Multiple coroutines may receive from the same channel, distributing work between themselves.
Let us start with a producer coroutine that is periodically producing integers
(ten numbers per second):

```kotlin
fun produceNumbers() = buildChannel<Int>(CommonPool) {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
```

Then we can have several processor coroutines. In this example, they just print their id and
received number:

```kotlin
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
    while (true) {
        val x = channel.receive()
        println("Processor #$id received $x")
    }
}
```

Now let us launch five processors and let them work for a second. See what happens:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(1000)
    producer.cancel() // cancel producer coroutine and thus kill them all
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt)

The output will be similar to the following one, although the processor ids that receive
each specific integer may be different:

```
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
```

Note that cancelling a producer coroutine closes its channel, thus eventually terminating iteration
over the channel that processor coroutines are doing.

### Fan-in

Multiple coroutines may send to the same channel.
For example, let us have a channel of strings, and a suspending function that
repeatedly sends a specified string to this channel with a specified delay:

```kotlin
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

Now, let us see what happens if we launch a couple of coroutines sending strings
(in this example we launch them in the context of the main thread):

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>()
    launch(context) { sendString(channel, "foo", 200L) }
    launch(context) { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt)

The output is:

```
foo
foo
BAR!
foo
foo
BAR!
```

### Buffered channels

The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked;
if receive is invoked first, it is suspended until send is invoked.

Both the `Channel()` factory and the `buildChannel{}` builder take an optional `capacity` parameter to
specify the _buffer size_. A buffer allows senders to send multiple elements before suspending,
similar to a `BlockingQueue` with a specified capacity, which blocks when the buffer is full.

Take a look at the behavior of the following code:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>(4) // create buffered channel
    launch(context) { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt)

It prints "Sending" _five_ times using a buffered channel with a capacity of _four_:

```
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
```

The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
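To contrast this with the rendezvous behavior described at the start of this section, here is a minimal sketch that counts the started sends instead of printing them. The helper `sendsStarted` is hypothetical, and the code assumes the same `kotlinx.coroutines.experimental` API as the example above. Nobody ever receives from the channel, so the count shows how far the sender gets before it suspends:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

// Hypothetical helper: counts how many sends the sender starts before it
// suspends, given that nobody ever receives from the channel.
fun sendsStarted(channel: Channel<Int>): Int = runBlocking<Int> {
    var started = 0
    launch(context) { // sender runs in the invoker thread, so `started` needs no synchronization
        repeat(10) {
            started++ // count before sending each element
            channel.send(it) // suspends when there is no free buffer slot and no receiver
        }
    }
    delay(1000) // don't receive anything, just wait
    started
}

fun main(args: Array<String>) {
    println("unbuffered: ${sendsStarted(Channel<Int>())}")
    println("capacity 4: ${sendsStarted(Channel<Int>(4))}")
}
```

Under these assumptions the unbuffered channel should report 1, because the very first `send` has to wait for a receiver, while the channel with a capacity of four should report 5, matching the output above.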