| 1 | <!--- INCLUDE .*/example-([a-z]+)-([0-9a-z]+)\.kt |
| 2 | /* |
| 3 | * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
| 4 | */ |
| 5 | |
| 6 | // This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit. |
| 7 | package kotlinx.coroutines.guide.$$1$$2 |
| 8 | |
| 9 | import kotlinx.coroutines.* |
| 10 | import kotlinx.coroutines.channels.* |
| 11 | --> |
| 12 | <!--- KNIT ../core/kotlinx-coroutines-core/test/guide/.*\.kt --> |
| 13 | <!--- TEST_OUT ../core/kotlinx-coroutines-core/test/guide/test/ChannelsGuideTest.kt |
| 14 | // This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit. |
| 15 | package kotlinx.coroutines.guide.test |
| 16 | |
| 17 | import org.junit.Test |
| 18 | |
| 19 | class ChannelsGuideTest { |
| 20 | --> |
| 21 | ## Table of contents |
| 22 | |
| 23 | <!--- TOC --> |
| 24 | |
| 25 | * [Channels (experimental)](#channels-experimental) |
| 26 | * [Channel basics](#channel-basics) |
| 27 | * [Closing and iteration over channels](#closing-and-iteration-over-channels) |
| 28 | * [Building channel producers](#building-channel-producers) |
| 29 | * [Pipelines](#pipelines) |
| 30 | * [Prime numbers with pipeline](#prime-numbers-with-pipeline) |
| 31 | * [Fan-out](#fan-out) |
| 32 | * [Fan-in](#fan-in) |
| 33 | * [Buffered channels](#buffered-channels) |
| 34 | * [Channels are fair](#channels-are-fair) |
| 35 | * [Ticker channels](#ticker-channels) |
| 36 | |
| 37 | <!--- END_TOC --> |
| 38 | |
| 39 | |
| 40 | |
| 41 | |
| 42 | |
| 43 | ## Channels (experimental) |
| 44 | |
| 45 | Deferred values provide a convenient way to transfer a single value between coroutines. |
| 46 | Channels provide a way to transfer a stream of values. |
| 47 | |
| 48 | > Channels are an experimental feature of `kotlinx.coroutines`. Their API is expected to |
| 49 | evolve in the upcoming updates of the `kotlinx.coroutines` library with potentially |
| 50 | breaking changes. |
| 51 | |
| 52 | ### Channel basics |
| 53 | |
| 54 | A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that |
| 55 | instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of |
| 56 | a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive]. |
| 57 | |
| 58 | |
| 59 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 60 | |
| 61 | ```kotlin |
| 62 | fun main(args: Array<String>) = runBlocking { |
| 63 | val channel = Channel<Int>() |
| 64 | launch { |
| 65 | // this might be heavy CPU-consuming computation or async logic, we'll just send five squares |
| 66 | for (x in 1..5) channel.send(x * x) |
| 67 | } |
| 68 | // here we print five received integers: |
| 69 | repeat(5) { println(channel.receive()) } |
| 70 | println("Done!") |
| 71 | } |
| 72 | ``` |
| 73 | |
| 74 | </div> |
| 75 | |
| 76 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-01.kt) |
| 77 | |
| 78 | The output of this code is: |
| 79 | |
| 80 | ```text |
| 81 | 1 |
| 82 | 4 |
| 83 | 9 |
| 84 | 16 |
| 85 | 25 |
| 86 | Done! |
| 87 | ``` |
| 88 | |
| 89 | <!--- TEST --> |
| 90 | |
| 91 | ### Closing and iteration over channels |
| 92 | |
| 93 | Unlike a queue, a channel can be closed to indicate that no more elements are coming. |
| 94 | On the receiver side it is convenient to use a regular `for` loop to receive elements |
| 95 | from the channel. |
| 96 | |
| 97 | Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel. |
| 98 | The iteration stops as soon as this close token is received, so there is a guarantee |
| 99 | that all elements sent before the close are received: |
| 100 | |
| 101 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 102 | |
| 103 | ```kotlin |
| 104 | fun main(args: Array<String>) = runBlocking { |
| 105 | val channel = Channel<Int>() |
| 106 | launch { |
| 107 | for (x in 1..5) channel.send(x * x) |
| 108 | channel.close() // we're done sending |
| 109 | } |
| 110 | // here we print received values using `for` loop (until the channel is closed) |
| 111 | for (y in channel) println(y) |
| 112 | println("Done!") |
| 113 | } |
| 114 | ``` |
| 115 | |
| 116 | </div> |
| 117 | |
| 118 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-02.kt) |
| 119 | |
| 120 | <!--- TEST |
| 121 | 1 |
| 122 | 4 |
| 123 | 9 |
| 124 | 16 |
| 125 | 25 |
| 126 | Done! |
| 127 | --> |
| 128 | |
| 129 | ### Building channel producers |
| 130 | |
| 131 | The pattern where a coroutine is producing a sequence of elements is quite common. |
| 132 | This is a part of the _producer-consumer_ pattern that is often found in concurrent code. |
| 133 | You could abstract such a producer into a function that takes a channel as its parameter, but this goes contrary |
| 134 | to the common-sense expectation that results are returned from functions. |
| 135 | |
| 136 | There is a convenience coroutine builder named [produce] that makes it easy to do it right on the producer side, |
| 137 | and an extension function [consumeEach] that replaces a `for` loop on the consumer side: |
| 138 | |
| 139 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 140 | |
| 141 | ```kotlin |
| 142 | fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce { |
| 143 | for (x in 1..5) send(x * x) |
| 144 | } |
| 145 | |
| 146 | fun main(args: Array<String>) = runBlocking { |
| 147 | val squares = produceSquares() |
| 148 | squares.consumeEach { println(it) } |
| 149 | println("Done!") |
| 150 | } |
| 151 | ``` |
| 152 | |
| 153 | </div> |
| 154 | |
| 155 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-03.kt) |
| 156 | |
| 157 | <!--- TEST |
| 158 | 1 |
| 159 | 4 |
| 160 | 9 |
| 161 | 16 |
| 162 | 25 |
| 163 | Done! |
| 164 | --> |
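
For contrast, the alternative mentioned at the start of this section, a producer function that takes the channel as its parameter, might look like the sketch below. The names `sendSquares` and `collectSquares` are hypothetical and are not part of the guide's examples. In this variant the caller has to create the channel and the function has to remember to close it, whereas a channel returned by `produce` is closed when the producer coroutine completes.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: the producer writes into a channel supplied by the caller.
suspend fun sendSquares(channel: SendChannel<Int>) {
    for (x in 1..5) channel.send(x * x)
    channel.close() // without this the receiving loop below would never finish
}

// Hypothetical helper that wires everything together and collects the results.
fun collectSquares(): List<Int> = runBlocking {
    val channel = Channel<Int>()     // the caller has to create the channel itself
    launch { sendSquares(channel) }
    val received = mutableListOf<Int>()
    for (y in channel) received += y // iterate until the channel is closed
    received
}

fun main() {
    println(collectSquares()) // [1, 4, 9, 16, 25]
}
```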
| 165 | |
| 166 | ### Pipelines |
| 167 | |
| 168 | A pipeline is a pattern where one coroutine produces a possibly infinite stream of values: |
| 169 | |
| 170 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 171 | |
| 172 | ```kotlin |
| 173 | fun CoroutineScope.produceNumbers() = produce<Int> { |
| 174 | var x = 1 |
| 175 | while (true) send(x++) // infinite stream of integers starting from 1 |
| 176 | } |
| 177 | ``` |
| 178 | |
| 179 | </div> |
| 180 | |
| 181 | Another coroutine (or several coroutines) consumes that stream, does some processing, and produces some other results. |
| 182 | In the example below, the numbers are just squared: |
| 183 | |
| 184 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 185 | |
| 186 | ```kotlin |
| 187 | fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce { |
| 188 | for (x in numbers) send(x * x) |
| 189 | } |
| 190 | ``` |
| 191 | |
| 192 | </div> |
| 193 | |
| 194 | The main code starts and connects the whole pipeline: |
| 195 | |
| 196 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 197 | |
| 198 | ```kotlin |
| 199 | fun main(args: Array<String>) = runBlocking { |
| 200 | val numbers = produceNumbers() // produces integers from 1 and on |
| 201 | val squares = square(numbers) // squares integers |
| 202 | for (i in 1..5) println(squares.receive()) // print first five |
| 203 | println("Done!") // we are done |
| 204 | coroutineContext.cancelChildren() // cancel children coroutines |
| 205 | } |
| 206 | ``` |
| 207 | |
| 208 | </div> |
| 209 | |
| 210 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-04.kt) |
| 211 | |
| 212 | <!--- TEST |
| 213 | 1 |
| 214 | 4 |
| 215 | 9 |
| 216 | 16 |
| 217 | 25 |
| 218 | Done! |
| 219 | --> |
| 220 | |
| 221 | > All functions that create coroutines are defined as extensions on [CoroutineScope], |
| 222 | so that we can rely on [structured concurrency](https://github.com/Kotlin/kotlinx.coroutines/blob/master/docs/composing-suspending-functions.md#structured-concurrency-with-async) to make |
| 223 | sure that we don't have lingering global coroutines in our application. |
| 224 | |
| 225 | ### Prime numbers with pipeline |
| 226 | |
| 227 | Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline |
| 228 | of coroutines. We start with an infinite sequence of numbers. |
| 229 | |
| 230 | <!--- INCLUDE |
| 231 | import kotlin.coroutines.* |
| 232 | --> |
| 233 | |
| 234 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 235 | |
| 236 | ```kotlin |
| 237 | fun CoroutineScope.numbersFrom(start: Int) = produce<Int> { |
| 238 | var x = start |
| 239 | while (true) send(x++) // infinite stream of integers from start |
| 240 | } |
| 241 | ``` |
| 242 | |
| 243 | </div> |
| 244 | |
| 245 | The following pipeline stage filters an incoming stream of numbers, removing all the numbers |
| 246 | that are divisible by the given prime number: |
| 247 | |
| 248 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 249 | |
| 250 | ```kotlin |
| 251 | fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> { |
| 252 | for (x in numbers) if (x % prime != 0) send(x) |
| 253 | } |
| 254 | ``` |
| 255 | |
| 256 | </div> |
| 257 | |
| 258 | Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel, |
| 259 | and launching a new pipeline stage for each prime number found: |
| 260 | |
| 261 | ``` |
| 262 | numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ... |
| 263 | ``` |
| 264 | |
| 265 | The following example prints the first ten prime numbers, |
| 266 | running the whole pipeline in the context of the main thread. Since all the coroutines are launched in |
| 267 | the scope of the main [runBlocking] coroutine, |
| 268 | we don't have to keep an explicit list of all the coroutines we have started. |
| 269 | We use the [cancelChildren][kotlin.coroutines.CoroutineContext.cancelChildren] |
| 270 | extension function to cancel all the children coroutines after we have printed |
| 271 | the first ten prime numbers. |
| 272 | |
| 273 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 274 | |
| 275 | ```kotlin |
| 276 | fun main(args: Array<String>) = runBlocking { |
| 277 | var cur = numbersFrom(2) |
| 278 | for (i in 1..10) { |
| 279 | val prime = cur.receive() |
| 280 | println(prime) |
| 281 | cur = filter(cur, prime) |
| 282 | } |
| 283 | coroutineContext.cancelChildren() // cancel all children to let main finish |
| 284 | } |
| 285 | ``` |
| 286 | |
| 287 | </div> |
| 288 | |
| 289 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-05.kt) |
| 290 | |
| 291 | The output of this code is: |
| 292 | |
| 293 | ```text |
| 294 | 2 |
| 295 | 3 |
| 296 | 5 |
| 297 | 7 |
| 298 | 11 |
| 299 | 13 |
| 300 | 17 |
| 301 | 19 |
| 302 | 23 |
| 303 | 29 |
| 304 | ``` |
| 305 | |
| 306 | <!--- TEST --> |
| 307 | |
| 308 | Note that you can build the same pipeline using the |
| 309 | [`buildIterator`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines/build-iterator.html) |
| 310 | coroutine builder from the standard library. |
| 311 | Replace `produce` with `buildIterator`, `send` with `yield`, `receive` with `next`, |
| 312 | `ReceiveChannel` with `Iterator`, and get rid of the coroutine scope. You will not need `runBlocking` either. |
| 313 | However, the benefit of a pipeline that uses channels as shown above is that it can actually use |
| 314 | multiple CPU cores if you run it in [Dispatchers.Default] context. |
| 315 | |
| 316 | Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some |
| 317 | other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be |
| 318 | built using `buildSequence`/`buildIterator`, because they do not allow arbitrary suspension, unlike |
| 319 | `produce`, which is fully asynchronous. |
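
The replacement recipe described above can be sketched with the standard library alone. This sketch assumes Kotlin 1.3 or later, where the builder that the text calls `buildIterator` is available under the name `iterator`; the helper `firstPrimes` is hypothetical and only exists to collect the results. No coroutine scope and no `runBlocking` are involved, and everything runs synchronously on the calling thread.

```kotlin
// Infinite stream of integers from start, built with the stdlib iterator builder.
fun numbersFrom(start: Int): Iterator<Int> = iterator {
    var x = start
    while (true) yield(x++)
}

// Pipeline stage: drops every number divisible by the given prime.
fun filter(numbers: Iterator<Int>, prime: Int): Iterator<Int> = iterator {
    for (x in numbers) if (x % prime != 0) yield(x)
}

// Hypothetical helper: builds the sieve and collects the first `count` primes.
fun firstPrimes(count: Int): List<Int> {
    val primes = mutableListOf<Int>()
    var cur = numbersFrom(2)
    repeat(count) {
        val prime = cur.next()   // `next` takes the place of `receive`
        primes += prime
        cur = filter(cur, prime) // add one more stage for the prime just found
    }
    return primes
}

fun main() {
    println(firstPrimes(10)) // [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
}
```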
| 320 | |
| 321 | ### Fan-out |
| 322 | |
| 323 | Multiple coroutines may receive from the same channel, distributing work between themselves. |
| 324 | Let us start with a producer coroutine that is periodically producing integers |
| 325 | (ten numbers per second): |
| 326 | |
| 327 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 328 | |
| 329 | ```kotlin |
| 330 | fun CoroutineScope.produceNumbers() = produce<Int> { |
| 331 | var x = 1 // start from 1 |
| 332 | while (true) { |
| 333 | send(x++) // produce next |
| 334 | delay(100) // wait 0.1s |
| 335 | } |
| 336 | } |
| 337 | ``` |
| 338 | |
| 339 | </div> |
| 340 | |
| 341 | Then we can have several processor coroutines. In this example, they just print their id and |
| 342 | received number: |
| 343 | |
| 344 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 345 | |
| 346 | ```kotlin |
| 347 | fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { |
| 348 | for (msg in channel) { |
| 349 | println("Processor #$id received $msg") |
| 350 | } |
| 351 | } |
| 352 | ``` |
| 353 | |
| 354 | </div> |
| 355 | |
| 356 | Now let us launch five processors and let them work for almost a second. See what happens: |
| 357 | |
| 358 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 359 | |
| 360 | ```kotlin |
| 361 | fun main(args: Array<String>) = runBlocking<Unit> { |
| 362 | val producer = produceNumbers() |
| 363 | repeat(5) { launchProcessor(it, producer) } |
| 364 | delay(950) |
| 365 | producer.cancel() // cancel producer coroutine and thus kill them all |
| 366 | } |
| 367 | ``` |
| 368 | |
| 369 | </div> |
| 370 | |
| 371 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-06.kt) |
| 372 | |
| 373 | The output will be similar to the following one, although the processor ids that receive |
| 374 | each specific integer may be different: |
| 375 | |
| 376 | ``` |
| 377 | Processor #2 received 1 |
| 378 | Processor #4 received 2 |
| 379 | Processor #0 received 3 |
| 380 | Processor #1 received 4 |
| 381 | Processor #3 received 5 |
| 382 | Processor #2 received 6 |
| 383 | Processor #4 received 7 |
| 384 | Processor #0 received 8 |
| 385 | Processor #1 received 9 |
| 386 | Processor #3 received 10 |
| 387 | ``` |
| 388 | |
| 389 | <!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } --> |
| 390 | |
| 391 | Note that cancelling a producer coroutine closes its channel, thus eventually terminating the iteration |
| 392 | over the channel that the processor coroutines are doing. |
| 393 | |
| 394 | Also, pay attention to how we explicitly iterate over the channel with a `for` loop to perform fan-out in the `launchProcessor` code. |
| 395 | Unlike `consumeEach`, this `for` loop pattern is perfectly safe to use from multiple coroutines. If one of the processor |
| 396 | coroutines fails, then the others still keep processing the channel, while a processor that is written via `consumeEach` |
| 397 | always consumes (cancels) the underlying channel on its normal or abnormal completion. |
| 398 | |
| 399 | ### Fan-in |
| 400 | |
| 401 | Multiple coroutines may send to the same channel. |
| 402 | For example, let us have a channel of strings, and a suspending function that |
| 403 | repeatedly sends a specified string to this channel with a specified delay: |
| 404 | |
| 405 | <!--- INCLUDE |
| 406 | import kotlin.coroutines.* |
| 407 | --> |
| 408 | |
| 409 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 410 | |
| 411 | ```kotlin |
| 412 | suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) { |
| 413 | while (true) { |
| 414 | delay(time) |
| 415 | channel.send(s) |
| 416 | } |
| 417 | } |
| 418 | ``` |
| 419 | |
| 420 | </div> |
| 421 | |
| 422 | Now, let us see what happens if we launch a couple of coroutines sending strings |
| 423 | (in this example we launch them in the context of the main thread as main coroutine's children): |
| 424 | |
| 425 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 426 | |
| 427 | ```kotlin |
| 428 | fun main(args: Array<String>) = runBlocking { |
| 429 | val channel = Channel<String>() |
| 430 | launch { sendString(channel, "foo", 200L) } |
| 431 | launch { sendString(channel, "BAR!", 500L) } |
| 432 | repeat(6) { // receive first six |
| 433 | println(channel.receive()) |
| 434 | } |
| 435 | coroutineContext.cancelChildren() // cancel all children to let main finish |
| 436 | } |
| 437 | ``` |
| 438 | |
| 439 | </div> |
| 440 | |
| 441 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-07.kt) |
| 442 | |
| 443 | The output is: |
| 444 | |
| 445 | ```text |
| 446 | foo |
| 447 | foo |
| 448 | BAR! |
| 449 | foo |
| 450 | foo |
| 451 | BAR! |
| 452 | ``` |
| 453 | |
| 454 | <!--- TEST --> |
| 455 | |
| 456 | ### Buffered channels |
| 457 | |
| 458 | The channels shown so far had no buffer. Unbuffered channels transfer elements when the sender and the receiver |
| 459 | meet each other (aka rendezvous). If send is invoked first, it is suspended until receive is invoked; |
| 460 | if receive is invoked first, it is suspended until send is invoked. |
| 461 | |
| 462 | Both the [Channel()] factory function and the [produce] builder take an optional `capacity` parameter to |
| 463 | specify the _buffer size_. A buffer allows senders to send multiple elements before suspending, |
| 464 | similar to a `BlockingQueue` with a specified capacity, which blocks when the buffer is full. |
| 465 | |
| 466 | Take a look at the behavior of the following code: |
| 467 | |
| 468 | <!--- INCLUDE |
| 469 | import kotlin.coroutines.* |
| 470 | --> |
| 471 | |
| 472 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 473 | |
| 474 | ```kotlin |
| 475 | fun main(args: Array<String>) = runBlocking<Unit> { |
| 476 | val channel = Channel<Int>(4) // create buffered channel |
| 477 | val sender = launch { // launch sender coroutine |
| 478 | repeat(10) { |
| 479 | println("Sending $it") // print before sending each element |
| 480 | channel.send(it) // will suspend when buffer is full |
| 481 | } |
| 482 | } |
| 483 | // don't receive anything... just wait.... |
| 484 | delay(1000) |
| 485 | sender.cancel() // cancel sender coroutine |
| 486 | } |
| 487 | ``` |
| 488 | |
| 489 | </div> |
| 490 | |
| 491 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-08.kt) |
| 492 | |
| 493 | It prints "sending" _five_ times using a buffered channel with capacity of _four_: |
| 494 | |
| 495 | ```text |
| 496 | Sending 0 |
| 497 | Sending 1 |
| 498 | Sending 2 |
| 499 | Sending 3 |
| 500 | Sending 4 |
| 501 | ``` |
| 502 | |
| 503 | <!--- TEST --> |
| 504 | |
| 505 | The first four elements are added to the buffer and the sender suspends when trying to send the fifth one. |
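
The `capacity` parameter of the [produce] builder can be sketched in the same way. In the hypothetical helper `attemptedSends` below (not part of the guide's examples), the producer owns a buffered channel of capacity four and records each element before trying to send it, while nobody receives from the channel. Under these assumptions the behavior should match the example above: four elements fit into the buffer and the producer suspends on the fifth.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: returns the elements the producer attempted to send
// while nobody was receiving from its buffered channel.
fun attemptedSends(): List<Int> = runBlocking {
    val attempted = mutableListOf<Int>()
    val numbers = produce<Int>(capacity = 4) { // buffered channel owned by the producer
        repeat(10) {
            attempted += it // record before sending each element
            send(it)        // suspends once the buffer is full
        }
    }
    delay(100)       // don't receive anything, just wait
    numbers.cancel() // cancel the producer coroutine
    attempted
}

fun main() {
    println(attemptedSends()) // [0, 1, 2, 3, 4]
}
```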
| 506 | |
| 507 | ### Channels are fair |
| 508 | |
| 509 | Send and receive operations to channels are _fair_ with respect to the order of their invocation from |
| 510 | multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive` |
| 511 | gets the element. In the following example two coroutines "ping" and "pong" are |
| 512 | receiving the "ball" object from the shared "table" channel. |
| 513 | |
| 514 | <!--- INCLUDE |
| 515 | import kotlin.coroutines.* |
| 516 | --> |
| 517 | |
| 518 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 519 | |
| 520 | ```kotlin |
| 521 | data class Ball(var hits: Int) |
| 522 | |
| 523 | fun main(args: Array<String>) = runBlocking { |
| 524 | val table = Channel<Ball>() // a shared table |
| 525 | launch { player("ping", table) } |
| 526 | launch { player("pong", table) } |
| 527 | table.send(Ball(0)) // serve the ball |
| 528 | delay(1000) // delay 1 second |
| 529 | coroutineContext.cancelChildren() // game over, cancel them |
| 530 | } |
| 531 | |
| 532 | suspend fun player(name: String, table: Channel<Ball>) { |
| 533 | for (ball in table) { // receive the ball in a loop |
| 534 | ball.hits++ |
| 535 | println("$name $ball") |
| 536 | delay(300) // wait a bit |
| 537 | table.send(ball) // send the ball back |
| 538 | } |
| 539 | } |
| 540 | ``` |
| 541 | |
| 542 | </div> |
| 543 | |
| 544 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-09.kt) |
| 545 | |
| 546 | The "ping" coroutine is started first, so it is the first one to receive the ball. Even though the "ping" |
| 547 | coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets |
| 548 | received by the "pong" coroutine, because it was already waiting for it: |
| 549 | |
| 550 | ```text |
| 551 | ping Ball(hits=1) |
| 552 | pong Ball(hits=2) |
| 553 | ping Ball(hits=3) |
| 554 | pong Ball(hits=4) |
| 555 | ``` |
| 556 | |
| 557 | <!--- TEST --> |
| 558 | |
| 559 | Note that sometimes channels may produce executions that look unfair due to the nature of the executor |
| 560 | that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details. |
| 561 | |
| 562 | ### Ticker channels |
| 563 | |
| 564 | A ticker channel is a special rendezvous channel that produces `Unit` every time the given delay passes since the last consumption from this channel. |
| 565 | Though it may seem useless on its own, it is a useful building block for creating complex time-based [produce] |
| 566 | pipelines and operators that do windowing and other time-dependent processing. |
| 567 | A ticker channel can be used in [select] to perform an "on tick" action. |
| 568 | |
| 569 | To create such a channel, use the factory method [ticker]. |
| 570 | To indicate that no further elements are needed, use the [ReceiveChannel.cancel] method on it. |
| 571 | |
| 572 | Now let's see how it works in practice: |
| 573 | |
| 574 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 575 | |
| 576 | ```kotlin |
| 577 | fun main(args: Array<String>) = runBlocking<Unit> { |
| 578 | val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel |
| 579 | var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } |
| 580 | println("Initial element is available immediately: $nextElement") // no initial delay |
| 581 | |
| 582 | nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have a 100ms delay |
| 583 | println("Next element is not ready in 50 ms: $nextElement") |
| 584 | |
| 585 | nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } |
| 586 | println("Next element is ready in 100 ms: $nextElement") |
| 587 | |
| 588 | // Emulate large consumption delays |
| 589 | println("Consumer pauses for 150ms") |
| 590 | delay(150) |
| 591 | // Next element is available immediately |
| 592 | nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } |
| 593 | println("Next element is available immediately after large consumer delay: $nextElement") |
| 594 | // Note that the pause between `receive` calls is taken into account and next element arrives faster |
| 595 | nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } |
| 596 | println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement") |
| 597 | |
| 598 | tickerChannel.cancel() // indicate that no more elements are needed |
| 599 | } |
| 600 | ``` |
| 601 | |
| 602 | </div> |
| 603 | |
| 604 | > You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-10.kt) |
| 605 | |
| 606 | It prints the following lines: |
| 607 | |
| 608 | ```text |
| 609 | Initial element is available immediately: kotlin.Unit |
| 610 | Next element is not ready in 50 ms: null |
| 611 | Next element is ready in 100 ms: kotlin.Unit |
| 612 | Consumer pauses for 150ms |
| 613 | Next element is available immediately after large consumer delay: kotlin.Unit |
| 614 | Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit |
| 615 | ``` |
| 616 | |
| 617 | <!--- TEST --> |
| 618 | |
| 619 | Note that [ticker] is aware of possible consumer pauses and, by default, adjusts the delay of the next produced |
| 620 | element if a pause occurs, trying to maintain a fixed rate of produced elements. |
| 621 | |
| 622 | Optionally, a `mode` parameter equal to [TickerMode.FIXED_DELAY] can be specified to maintain a fixed |
| 623 | delay between elements. |
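
A minimal sketch of the fixed-delay mode follows. The helper `tickAfterPause` and the concrete timings (200 ms ticker delay, 300 ms consumer pause, 100 ms timeout) are assumptions chosen for illustration and do not come from the guide. Because the result depends on timing, treat the expectation as typical rather than guaranteed: in this mode the consumer pause is not compensated for, so the element following the pause should not arrive within the 100 ms timeout and the function should normally return `null`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: reports whether a tick arrives within 100 ms of the previous
// receive when the ticker runs in fixed-delay mode with a 200 ms delay.
fun tickAfterPause(): Unit? = runBlocking {
    val tickerChannel = ticker(
        delayMillis = 200,
        initialDelayMillis = 0,
        mode = TickerMode.FIXED_DELAY
    )
    tickerChannel.receive() // first element, no initial delay
    delay(300)              // consumer pause longer than the ticker delay
    tickerChannel.receive() // the element that was waiting during the pause
    // The next element is expected a full 200 ms after the previous receive,
    // so a 100 ms timeout should normally expire first.
    val next = withTimeoutOrNull(100) { tickerChannel.receive() }
    tickerChannel.cancel()  // indicate that no more elements are needed
    next
}

fun main() {
    println("Element within 100 ms after the pause: ${tickAfterPause()}")
}
```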
| 624 | |
| 625 | |
| 626 | <!--- MODULE kotlinx-coroutines-core --> |
| 627 | <!--- INDEX kotlinx.coroutines --> |
| 628 | [CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html |
| 629 | [runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html |
| 630 | [Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html |
| 631 | <!--- INDEX kotlinx.coroutines.channels --> |
| 632 | [Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html |
| 633 | [SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html |
| 634 | [ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html |
| 635 | [SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html |
| 636 | [produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html |
| 637 | [consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html |
| 638 | [Channel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel.html |
| 639 | [ticker]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/ticker.html |
| 640 | [ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html |
| 641 | [TickerMode.FIXED_DELAY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-ticker-mode/-f-i-x-e-d_-d-e-l-a-y.html |
| 642 | <!--- INDEX kotlinx.coroutines.selects --> |
| 643 | [select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html |
| 644 | <!--- END --> |