hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 1 | <!--- INCLUDE .*/example-([a-z]+)-([0-9a-z]+)\.kt |
| 2 | /* |
| 3 | * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
| 4 | */ |
| 5 | |
| 6 | // This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit. |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 7 | package kotlinx.coroutines.guide.$$1$$2 |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 8 | --> |
| 9 | <!--- KNIT ../core/kotlinx-coroutines-core/test/guide/.*\.kt --> |
| 10 | <!--- TEST_OUT ../core/kotlinx-coroutines-core/test/guide/test/ChannelsGuideTest.kt |
| 11 | // This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit. |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 12 | package kotlinx.coroutines.guide.test |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 13 | |
| 14 | import org.junit.Test |
| 15 | |
| 16 | class ChannelsGuideTest { |
| 17 | --> |
Prendota | b8a559d | 2018-11-30 16:24:23 +0300 | [diff] [blame] | 18 | **Table of contents** |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 19 | |
| 20 | <!--- TOC --> |
| 21 | |
Vsevolod Tolstopyatov | b590aa3 | 2018-09-27 18:34:05 +0300 | [diff] [blame] | 22 | * [Channels (experimental)](#channels-experimental) |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 23 | * [Channel basics](#channel-basics) |
| 24 | * [Closing and iteration over channels](#closing-and-iteration-over-channels) |
| 25 | * [Building channel producers](#building-channel-producers) |
| 26 | * [Pipelines](#pipelines) |
| 27 | * [Prime numbers with pipeline](#prime-numbers-with-pipeline) |
| 28 | * [Fan-out](#fan-out) |
| 29 | * [Fan-in](#fan-in) |
| 30 | * [Buffered channels](#buffered-channels) |
| 31 | * [Channels are fair](#channels-are-fair) |
| 32 | * [Ticker channels](#ticker-channels) |
| 33 | |
| 34 | <!--- END_TOC --> |
| 35 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 36 | ## Channels (experimental) |
| 37 | |
| 38 | Deferred values provide a convenient way to transfer a single value between coroutines. |
| 39 | Channels provide a way to transfer a stream of values. |
| 40 | |
| 41 | > Channels are an experimental feature of `kotlinx.coroutines`. Their API is expected to |
| 42 | evolve in the upcoming updates of the `kotlinx.coroutines` library with potentially |
| 43 | breaking changes. |
| 44 | |
| 45 | ### Channel basics |
| 46 | |
| 47 | A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that |
| 48 | instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of |
| 49 | a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive]. |
| 50 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 51 | |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 52 | <div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 53 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 54 | ```kotlin |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 55 | import kotlinx.coroutines.* |
| 56 | import kotlinx.coroutines.channels.* |
| 57 | |
| 58 | fun main() = runBlocking { |
| 59 | //sampleStart |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 60 | val channel = Channel<Int>() |
| 61 | launch { |
| 62 | // this might be heavy CPU-consuming computation or async logic, we'll just send five squares |
| 63 | for (x in 1..5) channel.send(x * x) |
| 64 | } |
| 65 | // here we print five received integers: |
| 66 | repeat(5) { println(channel.receive()) } |
| 67 | println("Done!") |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 68 | //sampleEnd |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 69 | } |
| 70 | ``` |
| 71 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 72 | </div> |
| 73 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 74 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-01.kt) |
| 75 | |
| 76 | The output of this code is: |
| 77 | |
| 78 | ```text |
| 79 | 1 |
| 80 | 4 |
| 81 | 9 |
| 82 | 16 |
| 83 | 25 |
| 84 | Done! |
| 85 | ``` |
| 86 | |
| 87 | <!--- TEST --> |
| 88 | |
| 89 | ### Closing and iteration over channels |
| 90 | |
| 91 | Unlike a queue, a channel can be closed to indicate that no more elements are coming. |
| 92 | On the receiver side it is convenient to use a regular `for` loop to receive elements |
| 93 | from the channel. |
| 94 | |
| 95 | Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel. |
| 96 | The iteration stops as soon as this close token is received, so there is a guarantee |
| 97 | that all previously sent elements before the close are received: |
| 98 | |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 99 | <div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 100 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 101 | ```kotlin |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 102 | import kotlinx.coroutines.* |
| 103 | import kotlinx.coroutines.channels.* |
| 104 | |
| 105 | fun main() = runBlocking { |
| 106 | //sampleStart |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 107 | val channel = Channel<Int>() |
| 108 | launch { |
| 109 | for (x in 1..5) channel.send(x * x) |
| 110 | channel.close() // we're done sending |
| 111 | } |
| 112 | // here we print received values using `for` loop (until the channel is closed) |
| 113 | for (y in channel) println(y) |
| 114 | println("Done!") |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 115 | //sampleEnd |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 116 | } |
| 117 | ``` |
| 118 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 119 | </div> |
| 120 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 121 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-02.kt) |
| 122 | |
| 123 | <!--- TEST |
| 124 | 1 |
| 125 | 4 |
| 126 | 9 |
| 127 | 16 |
| 128 | 25 |
| 129 | Done! |
| 130 | --> |
| 131 | |
| 132 | ### Building channel producers |
| 133 | |
| 134 | The pattern where a coroutine is producing a sequence of elements is quite common. |
| 135 | This is a part of _producer-consumer_ pattern that is often found in concurrent code. |
| 136 | You could abstract such a producer into a function that takes channel as its parameter, but this goes contrary |
| 137 | to common sense that results must be returned from functions. |
| 138 | |
arman simonyan | 02b3302 | 2018-10-10 22:06:04 +0400 | [diff] [blame] | 139 | There is a convenient coroutine builder named [produce] that makes it easy to do it right on producer side, |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 140 | and an extension function [consumeEach], that replaces a `for` loop on the consumer side: |
| 141 | |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 142 | <div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 143 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 144 | ```kotlin |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 145 | import kotlinx.coroutines.* |
| 146 | import kotlinx.coroutines.channels.* |
| 147 | |
Vsevolod Tolstopyatov | a2d8088 | 2018-09-24 19:51:49 +0300 | [diff] [blame] | 148 | fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce { |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 149 | for (x in 1..5) send(x * x) |
| 150 | } |
| 151 | |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 152 | fun main() = runBlocking { |
| 153 | //sampleStart |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 154 | val squares = produceSquares() |
| 155 | squares.consumeEach { println(it) } |
| 156 | println("Done!") |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 157 | //sampleEnd |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 158 | } |
| 159 | ``` |
| 160 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 161 | </div> |
| 162 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 163 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-03.kt) |
| 164 | |
| 165 | <!--- TEST |
| 166 | 1 |
| 167 | 4 |
| 168 | 9 |
| 169 | 16 |
| 170 | 25 |
| 171 | Done! |
| 172 | --> |
| 173 | |
| 174 | ### Pipelines |
| 175 | |
| 176 | A pipeline is a pattern where one coroutine is producing, possibly infinite, stream of values: |
| 177 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 178 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 179 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 180 | ```kotlin |
| 181 | fun CoroutineScope.produceNumbers() = produce<Int> { |
| 182 | var x = 1 |
| 183 | while (true) send(x++) // infinite stream of integers starting from 1 |
| 184 | } |
| 185 | ``` |
| 186 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 187 | </div> |
| 188 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 189 | And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results. |
armansimonyan13 | 52f10c7 | 2018-10-11 02:52:58 +0400 | [diff] [blame] | 190 | In the example below, the numbers are just squared: |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 191 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 192 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 193 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 194 | ```kotlin |
Vsevolod Tolstopyatov | a2d8088 | 2018-09-24 19:51:49 +0300 | [diff] [blame] | 195 | fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce { |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 196 | for (x in numbers) send(x * x) |
| 197 | } |
| 198 | ``` |
| 199 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 200 | </div> |
| 201 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 202 | The main code starts and connects the whole pipeline: |
| 203 | |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 204 | <!--- CLEAR --> |
| 205 | |
| 206 | <div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 207 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 208 | ```kotlin |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 209 | import kotlinx.coroutines.* |
| 210 | import kotlinx.coroutines.channels.* |
| 211 | |
| 212 | fun main() = runBlocking { |
| 213 | //sampleStart |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 214 | val numbers = produceNumbers() // produces integers from 1 and on |
| 215 | val squares = square(numbers) // squares integers |
| 216 | for (i in 1..5) println(squares.receive()) // print first five |
| 217 | println("Done!") // we are done |
| 218 | coroutineContext.cancelChildren() // cancel children coroutines |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 219 | //sampleEnd |
| 220 | } |
| 221 | |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 222 | fun CoroutineScope.produceNumbers() = produce<Int> { |
| 223 | var x = 1 |
| 224 | while (true) send(x++) // infinite stream of integers starting from 1 |
| 225 | } |
Joffrey Bion | 3feef05 | 2018-12-12 23:57:53 +0100 | [diff] [blame] | 226 | |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 227 | fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce { |
| 228 | for (x in numbers) send(x * x) |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 229 | } |
| 230 | ``` |
| 231 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 232 | </div> |
| 233 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 234 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-04.kt) |
| 235 | |
| 236 | <!--- TEST |
| 237 | 1 |
| 238 | 4 |
| 239 | 9 |
| 240 | 16 |
| 241 | 25 |
| 242 | Done! |
| 243 | --> |
| 244 | |
| 245 | > All functions that create coroutines are defined as extensions on [CoroutineScope], |
armansimonyan13 | 52f10c7 | 2018-10-11 02:52:58 +0400 | [diff] [blame] | 246 | so that we can rely on [structured concurrency](https://kotlinlang.org/docs/reference/coroutines/composing-suspending-functions.html#structured-concurrency-with-async) to make |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 247 | sure that we don't have lingering global coroutines in our application. |
| 248 | |
| 249 | ### Prime numbers with pipeline |
| 250 | |
| 251 | Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline |
| 252 | of coroutines. We start with an infinite sequence of numbers. |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 253 | |
| 254 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 255 | |
| 256 | ```kotlin |
| 257 | fun CoroutineScope.numbersFrom(start: Int) = produce<Int> { |
| 258 | var x = start |
| 259 | while (true) send(x++) // infinite stream of integers from start |
| 260 | } |
| 261 | ``` |
| 262 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 263 | </div> |
| 264 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 265 | The following pipeline stage filters an incoming stream of numbers, removing all the numbers |
| 266 | that are divisible by the given prime number: |
| 267 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 268 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 269 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 270 | ```kotlin |
| 271 | fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> { |
| 272 | for (x in numbers) if (x % prime != 0) send(x) |
| 273 | } |
| 274 | ``` |
| 275 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 276 | </div> |
| 277 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 278 | Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel, |
| 279 | and launching new pipeline stage for each prime number found: |
| 280 | |
| 281 | ``` |
| 282 | numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ... |
| 283 | ``` |
| 284 | |
| 285 | The following example prints the first ten prime numbers, |
| 286 | running the whole pipeline in the context of the main thread. Since all the coroutines are launched in |
| 287 | the scope of the main [runBlocking] coroutine |
| 288 | we don't have to keep an explicit list of all the coroutines we have started. |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 289 | We use [cancelChildren][kotlin.coroutines.CoroutineContext.cancelChildren] |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 290 | extension function to cancel all the children coroutines after we have printed |
| 291 | the first ten prime numbers. |
| 292 | |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 293 | <!--- CLEAR --> |
| 294 | |
| 295 | <div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 296 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 297 | ```kotlin |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 298 | import kotlinx.coroutines.* |
| 299 | import kotlinx.coroutines.channels.* |
| 300 | |
| 301 | fun main() = runBlocking { |
| 302 | //sampleStart |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 303 | var cur = numbersFrom(2) |
| 304 | for (i in 1..10) { |
| 305 | val prime = cur.receive() |
| 306 | println(prime) |
| 307 | cur = filter(cur, prime) |
| 308 | } |
| 309 | coroutineContext.cancelChildren() // cancel all children to let main finish |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 310 | //sampleEnd |
| 311 | } |
| 312 | |
| 313 | fun CoroutineScope.numbersFrom(start: Int) = produce<Int> { |
| 314 | var x = start |
| 315 | while (true) send(x++) // infinite stream of integers from start |
| 316 | } |
| 317 | |
| 318 | fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> { |
| 319 | for (x in numbers) if (x % prime != 0) send(x) |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 320 | } |
| 321 | ``` |
| 322 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 323 | </div> |
| 324 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 325 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-05.kt) |
| 326 | |
| 327 | The output of this code is: |
| 328 | |
| 329 | ```text |
| 330 | 2 |
| 331 | 3 |
| 332 | 5 |
| 333 | 7 |
| 334 | 11 |
| 335 | 13 |
| 336 | 17 |
| 337 | 19 |
| 338 | 23 |
| 339 | 29 |
| 340 | ``` |
| 341 | |
| 342 | <!--- TEST --> |
| 343 | |
| 344 | Note, that you can build the same pipeline using |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 345 | [`buildIterator`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines/build-iterator.html) |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 346 | coroutine builder from the standard library. |
| 347 | Replace `produce` with `buildIterator`, `send` with `yield`, `receive` with `next`, |
| 348 | `ReceiveChannel` with `Iterator`, and get rid of the coroutine scope. You will not need `runBlocking` either. |
| 349 | However, the benefit of a pipeline that uses channels as shown above is that it can actually use |
| 350 | multiple CPU cores if you run it in [Dispatchers.Default] context. |
| 351 | |
| 352 | Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some |
| 353 | other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be |
| 354 | built using `buildSequence`/`buildIterator`, because they do not allow arbitrary suspension, unlike |
| 355 | `produce`, which is fully asynchronous. |
| 356 | |
| 357 | ### Fan-out |
| 358 | |
| 359 | Multiple coroutines may receive from the same channel, distributing work between themselves. |
| 360 | Let us start with a producer coroutine that is periodically producing integers |
| 361 | (ten numbers per second): |
| 362 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 363 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 364 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 365 | ```kotlin |
| 366 | fun CoroutineScope.produceNumbers() = produce<Int> { |
| 367 | var x = 1 // start from 1 |
| 368 | while (true) { |
| 369 | send(x++) // produce next |
| 370 | delay(100) // wait 0.1s |
| 371 | } |
| 372 | } |
| 373 | ``` |
| 374 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 375 | </div> |
| 376 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 377 | Then we can have several processor coroutines. In this example, they just print their id and |
| 378 | received number: |
| 379 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 380 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 381 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 382 | ```kotlin |
| 383 | fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { |
| 384 | for (msg in channel) { |
| 385 | println("Processor #$id received $msg") |
| 386 | } |
| 387 | } |
| 388 | ``` |
| 389 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 390 | </div> |
| 391 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 392 | Now let us launch five processors and let them work for almost a second. See what happens: |
| 393 | |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 394 | <!--- CLEAR --> |
| 395 | |
| 396 | <div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 397 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 398 | ```kotlin |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 399 | import kotlinx.coroutines.* |
| 400 | import kotlinx.coroutines.channels.* |
| 401 | |
| 402 | fun main() = runBlocking<Unit> { |
| 403 | //sampleStart |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 404 | val producer = produceNumbers() |
| 405 | repeat(5) { launchProcessor(it, producer) } |
| 406 | delay(950) |
| 407 | producer.cancel() // cancel producer coroutine and thus kill them all |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 408 | //sampleEnd |
| 409 | } |
| 410 | |
| 411 | fun CoroutineScope.produceNumbers() = produce<Int> { |
| 412 | var x = 1 // start from 1 |
| 413 | while (true) { |
| 414 | send(x++) // produce next |
| 415 | delay(100) // wait 0.1s |
| 416 | } |
| 417 | } |
| 418 | |
| 419 | fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { |
| 420 | for (msg in channel) { |
| 421 | println("Processor #$id received $msg") |
| 422 | } |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 423 | } |
| 424 | ``` |
| 425 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 426 | </div> |
| 427 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 428 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-06.kt) |
| 429 | |
| 430 | The output will be similar to the the following one, albeit the processor ids that receive |
| 431 | each specific integer may be different: |
| 432 | |
| 433 | ``` |
| 434 | Processor #2 received 1 |
| 435 | Processor #4 received 2 |
| 436 | Processor #0 received 3 |
| 437 | Processor #1 received 4 |
| 438 | Processor #3 received 5 |
| 439 | Processor #2 received 6 |
| 440 | Processor #4 received 7 |
| 441 | Processor #0 received 8 |
| 442 | Processor #1 received 9 |
| 443 | Processor #3 received 10 |
| 444 | ``` |
| 445 | |
| 446 | <!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } --> |
| 447 | |
| 448 | Note, that cancelling a producer coroutine closes its channel, thus eventually terminating iteration |
| 449 | over the channel that processor coroutines are doing. |
| 450 | |
| 451 | Also, pay attention to how we explicitly iterate over channel with `for` loop to perform fan-out in `launchProcessor` code. |
| 452 | Unlike `consumeEach`, this `for` loop pattern is perfectly safe to use from multiple coroutines. If one of the processor |
| 453 | coroutines fails, then others would still be processing the channel, while a processor that is written via `consumeEach` |
| 454 | always consumes (cancels) the underlying channel on its normal or abnormal completion. |
| 455 | |
| 456 | ### Fan-in |
| 457 | |
| 458 | Multiple coroutines may send to the same channel. |
| 459 | For example, let us have a channel of strings, and a suspending function that |
| 460 | repeatedly sends a specified string to this channel with a specified delay: |
| 461 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 462 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 463 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 464 | ```kotlin |
| 465 | suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) { |
| 466 | while (true) { |
| 467 | delay(time) |
| 468 | channel.send(s) |
| 469 | } |
| 470 | } |
| 471 | ``` |
| 472 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 473 | </div> |
| 474 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 475 | Now, let us see what happens if we launch a couple of coroutines sending strings |
| 476 | (in this example we launch them in the context of the main thread as main coroutine's children): |
| 477 | |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 478 | <!--- CLEAR --> |
| 479 | |
| 480 | <div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 481 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 482 | ```kotlin |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 483 | import kotlinx.coroutines.* |
| 484 | import kotlinx.coroutines.channels.* |
| 485 | |
| 486 | fun main() = runBlocking { |
| 487 | //sampleStart |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 488 | val channel = Channel<String>() |
| 489 | launch { sendString(channel, "foo", 200L) } |
| 490 | launch { sendString(channel, "BAR!", 500L) } |
| 491 | repeat(6) { // receive first six |
| 492 | println(channel.receive()) |
| 493 | } |
| 494 | coroutineContext.cancelChildren() // cancel all children to let main finish |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 495 | //sampleEnd |
| 496 | } |
| 497 | |
| 498 | suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) { |
| 499 | while (true) { |
| 500 | delay(time) |
| 501 | channel.send(s) |
| 502 | } |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 503 | } |
| 504 | ``` |
| 505 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 506 | </div> |
| 507 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 508 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-07.kt) |
| 509 | |
| 510 | The output is: |
| 511 | |
| 512 | ```text |
| 513 | foo |
| 514 | foo |
| 515 | BAR! |
| 516 | foo |
| 517 | foo |
| 518 | BAR! |
| 519 | ``` |
| 520 | |
| 521 | <!--- TEST --> |
| 522 | |
| 523 | ### Buffered channels |
| 524 | |
| 525 | The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver |
| 526 | meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked, |
| 527 | if receive is invoked first, it is suspended until send is invoked. |
| 528 | |
| 529 | Both [Channel()] factory function and [produce] builder take an optional `capacity` parameter to |
| 530 | specify _buffer size_. Buffer allows senders to send multiple elements before suspending, |
| 531 | similar to the `BlockingQueue` with a specified capacity, which blocks when buffer is full. |
| 532 | |
| 533 | Take a look at the behavior of the following code: |
| 534 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 535 | |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 536 | <div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 537 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 538 | ```kotlin |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 539 | import kotlinx.coroutines.* |
| 540 | import kotlinx.coroutines.channels.* |
| 541 | |
| 542 | fun main() = runBlocking<Unit> { |
| 543 | //sampleStart |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 544 | val channel = Channel<Int>(4) // create buffered channel |
| 545 | val sender = launch { // launch sender coroutine |
| 546 | repeat(10) { |
| 547 | println("Sending $it") // print before sending each element |
| 548 | channel.send(it) // will suspend when buffer is full |
| 549 | } |
| 550 | } |
| 551 | // don't receive anything... just wait.... |
| 552 | delay(1000) |
| 553 | sender.cancel() // cancel sender coroutine |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 554 | //sampleEnd |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 555 | } |
| 556 | ``` |
| 557 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 558 | </div> |
| 559 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 560 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-08.kt) |
| 561 | |
| 562 | It prints "sending" _five_ times using a buffered channel with capacity of _four_: |
| 563 | |
| 564 | ```text |
| 565 | Sending 0 |
| 566 | Sending 1 |
| 567 | Sending 2 |
| 568 | Sending 3 |
| 569 | Sending 4 |
| 570 | ``` |
| 571 | |
| 572 | <!--- TEST --> |
| 573 | |
| 574 | The first four elements are added to the buffer and the sender suspends when trying to send the fifth one. |
| 575 | |
| 576 | ### Channels are fair |
| 577 | |
| 578 | Send and receive operations to channels are _fair_ with respect to the order of their invocation from |
| 579 | multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive` |
| 580 | gets the element. In the following example two coroutines "ping" and "pong" are |
| 581 | receiving the "ball" object from the shared "table" channel. |
| 582 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 583 | |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 584 | <div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 585 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 586 | ```kotlin |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 587 | import kotlinx.coroutines.* |
| 588 | import kotlinx.coroutines.channels.* |
| 589 | |
| 590 | //sampleStart |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 591 | data class Ball(var hits: Int) |
| 592 | |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 593 | fun main() = runBlocking { |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 594 | val table = Channel<Ball>() // a shared table |
| 595 | launch { player("ping", table) } |
| 596 | launch { player("pong", table) } |
| 597 | table.send(Ball(0)) // serve the ball |
| 598 | delay(1000) // delay 1 second |
| 599 | coroutineContext.cancelChildren() // game over, cancel them |
| 600 | } |
| 601 | |
| 602 | suspend fun player(name: String, table: Channel<Ball>) { |
| 603 | for (ball in table) { // receive the ball in a loop |
| 604 | ball.hits++ |
| 605 | println("$name $ball") |
| 606 | delay(300) // wait a bit |
| 607 | table.send(ball) // send the ball back |
| 608 | } |
| 609 | } |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 610 | //sampleEnd |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 611 | ``` |
| 612 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 613 | </div> |
| 614 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 615 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-09.kt) |
| 616 | |
| 617 | The "ping" coroutine is started first, so it is the first one to receive the ball. Even though "ping" |
| 618 | coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets |
| 619 | received by the "pong" coroutine, because it was already waiting for it: |
| 620 | |
| 621 | ```text |
| 622 | ping Ball(hits=1) |
| 623 | pong Ball(hits=2) |
| 624 | ping Ball(hits=3) |
| 625 | pong Ball(hits=4) |
| 626 | ``` |
| 627 | |
| 628 | <!--- TEST --> |
| 629 | |
| 630 | Note, that sometimes channels may produce executions that look unfair due to the nature of the executor |
| 631 | that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details. |
| 632 | |
| 633 | ### Ticker channels |
| 634 | |
| 635 | Ticker channel is a special rendezvous channel that produces `Unit` every time given delay passes since last consumption from this channel. |
| 636 | Though it may seem to be useless standalone, it is a useful building block to create complex time-based [produce] |
| 637 | pipelines and operators that do windowing and other time-dependent processing. |
| 638 | Ticker channel can be used in [select] to perform "on tick" action. |
| 639 | |
| 640 | To create such channel use a factory method [ticker]. |
| 641 | To indicate that no further elements are needed use [ReceiveChannel.cancel] method on it. |
| 642 | |
| 643 | Now let's see how it works in practice: |
| 644 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 645 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 646 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 647 | ```kotlin |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 648 | import kotlinx.coroutines.* |
| 649 | import kotlinx.coroutines.channels.* |
| 650 | |
| 651 | fun main() = runBlocking<Unit> { |
Vsevolod Tolstopyatov | a2d8088 | 2018-09-24 19:51:49 +0300 | [diff] [blame] | 652 | val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 653 | var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } |
| 654 | println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet |
| 655 | |
| 656 | nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay |
| 657 | println("Next element is not ready in 50 ms: $nextElement") |
| 658 | |
| 659 | nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } |
| 660 | println("Next element is ready in 100 ms: $nextElement") |
| 661 | |
| 662 | // Emulate large consumption delays |
| 663 | println("Consumer pauses for 150ms") |
| 664 | delay(150) |
| 665 | // Next element is available immediately |
| 666 | nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } |
| 667 | println("Next element is available immediately after large consumer delay: $nextElement") |
| 668 | // Note that the pause between `receive` calls is taken into account and next element arrives faster |
| 669 | nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } |
| 670 | println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement") |
| 671 | |
| 672 | tickerChannel.cancel() // indicate that no more elements are needed |
| 673 | } |
| 674 | ``` |
| 675 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 676 | </div> |
| 677 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 678 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-10.kt) |
| 679 | |
| 680 | It prints following lines: |
| 681 | |
| 682 | ```text |
| 683 | Initial element is available immediately: kotlin.Unit |
| 684 | Next element is not ready in 50 ms: null |
| 685 | Next element is ready in 100 ms: kotlin.Unit |
| 686 | Consumer pauses for 150ms |
| 687 | Next element is available immediately after large consumer delay: kotlin.Unit |
| 688 | Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit |
| 689 | ``` |
| 690 | |
| 691 | <!--- TEST --> |
| 692 | |
| 693 | Note that [ticker] is aware of possible consumer pauses and, by default, adjusts next produced element |
| 694 | delay if a pause occurs, trying to maintain a fixed rate of produced elements. |
| 695 | |
| 696 | Optionally, a `mode` parameter equal to [TickerMode.FIXED_DELAY] can be specified to maintain a fixed |
| 697 | delay between elements. |
| 698 | |
| 699 | |
| 700 | <!--- MODULE kotlinx-coroutines-core --> |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 701 | <!--- INDEX kotlinx.coroutines --> |
| 702 | [CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html |
| 703 | [runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html |
Vsevolod Tolstopyatov | 706e393 | 2018-10-13 15:40:32 +0300 | [diff] [blame] | 704 | [kotlin.coroutines.CoroutineContext.cancelChildren]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/kotlin.coroutines.-coroutine-context/cancel-children.html |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 705 | [Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html |
| 706 | <!--- INDEX kotlinx.coroutines.channels --> |
| 707 | [Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html |
| 708 | [SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html |
| 709 | [ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html |
| 710 | [SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html |
| 711 | [produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html |
| 712 | [consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html |
| 713 | [Channel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel.html |
| 714 | [ticker]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/ticker.html |
| 715 | [ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html |
| 716 | [TickerMode.FIXED_DELAY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-ticker-mode/-f-i-x-e-d_-d-e-l-a-y.html |
| 717 | <!--- INDEX kotlinx.coroutines.selects --> |
| 718 | [select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 719 | <!--- END --> |