Roman Elizarov | 660c2d7 | 2020-02-14 13:18:37 +0300 | [diff] [blame] | 1 | <!--- TEST_NAME SelectGuideTest --> |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 2 | |
Prendota | b8a559d | 2018-11-30 16:24:23 +0300 | [diff] [blame] | 3 | **Table of contents** |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 4 | |
| 5 | <!--- TOC --> |
| 6 | |
Roman Elizarov | 3258e1f | 2019-08-22 20:08:48 +0300 | [diff] [blame] | 7 | * [Select Expression (experimental)](#select-expression-experimental) |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 8 | * [Selecting from channels](#selecting-from-channels) |
| 9 | * [Selecting on close](#selecting-on-close) |
| 10 | * [Selecting to send](#selecting-to-send) |
| 11 | * [Selecting deferred values](#selecting-deferred-values) |
| 12 | * [Switch over a channel of deferred values](#switch-over-a-channel-of-deferred-values) |
| 13 | |
Roman Elizarov | 660c2d7 | 2020-02-14 13:18:37 +0300 | [diff] [blame] | 14 | <!--- END --> |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 15 | |
Roman Elizarov | 3258e1f | 2019-08-22 20:08:48 +0300 | [diff] [blame] | 16 | ## Select Expression (experimental) |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 17 | |
| 18 | Select expression makes it possible to await multiple suspending functions simultaneously and _select_ |
| 19 | the first one that becomes available. |
| 20 | |
| 21 | > Select expressions are an experimental feature of `kotlinx.coroutines`. Their API is expected to |
| 22 | evolve in the upcoming updates of the `kotlinx.coroutines` library with potentially |
| 23 | breaking changes. |
| 24 | |
| 25 | ### Selecting from channels |
| 26 | |
| 27 | Let us have two producers of strings: `fizz` and `buzz`. The `fizz` produces "Fizz" string every 300 ms: |
| 28 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 29 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 30 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 31 | ```kotlin |
| 32 | fun CoroutineScope.fizz() = produce<String> { |
| 33 | while (true) { // sends "Fizz" every 300 ms |
| 34 | delay(300) |
| 35 | send("Fizz") |
| 36 | } |
| 37 | } |
| 38 | ``` |
| 39 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 40 | </div> |
| 41 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 42 | And the `buzz` produces "Buzz!" string every 500 ms: |
| 43 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 44 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 45 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 46 | ```kotlin |
| 47 | fun CoroutineScope.buzz() = produce<String> { |
| 48 | while (true) { // sends "Buzz!" every 500 ms |
| 49 | delay(500) |
| 50 | send("Buzz!") |
| 51 | } |
| 52 | } |
| 53 | ``` |
| 54 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 55 | </div> |
| 56 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 57 | Using [receive][ReceiveChannel.receive] suspending function we can receive _either_ from one channel or the |
| 58 | other. But [select] expression allows us to receive from _both_ simultaneously using its |
| 59 | [onReceive][ReceiveChannel.onReceive] clauses: |
| 60 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 61 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 62 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 63 | ```kotlin |
| 64 | suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) { |
| 65 | select<Unit> { // <Unit> means that this select expression does not produce any result |
| 66 | fizz.onReceive { value -> // this is the first select clause |
| 67 | println("fizz -> '$value'") |
| 68 | } |
| 69 | buzz.onReceive { value -> // this is the second select clause |
| 70 | println("buzz -> '$value'") |
| 71 | } |
| 72 | } |
| 73 | } |
| 74 | ``` |
| 75 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 76 | </div> |
| 77 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 78 | Let us run it all seven times: |
| 79 | |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 80 | <!--- CLEAR --> |
| 81 | |
| 82 | <div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 83 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 84 | ```kotlin |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 85 | import kotlinx.coroutines.* |
| 86 | import kotlinx.coroutines.channels.* |
| 87 | import kotlinx.coroutines.selects.* |
| 88 | |
| 89 | fun CoroutineScope.fizz() = produce<String> { |
| 90 | while (true) { // sends "Fizz" every 300 ms |
| 91 | delay(300) |
| 92 | send("Fizz") |
| 93 | } |
| 94 | } |
| 95 | |
| 96 | fun CoroutineScope.buzz() = produce<String> { |
| 97 | while (true) { // sends "Buzz!" every 500 ms |
| 98 | delay(500) |
| 99 | send("Buzz!") |
| 100 | } |
| 101 | } |
| 102 | |
| 103 | suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) { |
| 104 | select<Unit> { // <Unit> means that this select expression does not produce any result |
| 105 | fizz.onReceive { value -> // this is the first select clause |
| 106 | println("fizz -> '$value'") |
| 107 | } |
| 108 | buzz.onReceive { value -> // this is the second select clause |
| 109 | println("buzz -> '$value'") |
| 110 | } |
| 111 | } |
| 112 | } |
| 113 | |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 114 | fun main() = runBlocking<Unit> { |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 115 | //sampleStart |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 116 | val fizz = fizz() |
| 117 | val buzz = buzz() |
| 118 | repeat(7) { |
| 119 | selectFizzBuzz(fizz, buzz) |
| 120 | } |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 121 | coroutineContext.cancelChildren() // cancel fizz & buzz coroutines |
| 122 | //sampleEnd |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 123 | } |
| 124 | ``` |
| 125 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 126 | </div> |
| 127 | |
Adam Howard | f13549a | 2020-06-02 11:17:46 +0100 | [diff] [blame^] | 128 | > You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-select-01.kt). |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 129 | |
| 130 | The result of this code is: |
| 131 | |
| 132 | ```text |
| 133 | fizz -> 'Fizz' |
| 134 | buzz -> 'Buzz!' |
| 135 | fizz -> 'Fizz' |
| 136 | fizz -> 'Fizz' |
| 137 | buzz -> 'Buzz!' |
| 138 | fizz -> 'Fizz' |
| 139 | buzz -> 'Buzz!' |
| 140 | ``` |
| 141 | |
| 142 | <!--- TEST --> |
| 143 | |
| 144 | ### Selecting on close |
| 145 | |
| 146 | The [onReceive][ReceiveChannel.onReceive] clause in `select` fails when the channel is closed causing the corresponding |
Vsevolod Tolstopyatov | a8904e2 | 2019-07-17 17:22:05 -0700 | [diff] [blame] | 147 | `select` to throw an exception. We can use [onReceiveOrNull][onReceiveOrNull] clause to perform a |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 148 | specific action when the channel is closed. The following example also shows that `select` is an expression that returns |
| 149 | the result of its selected clause: |
| 150 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 151 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 152 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 153 | ```kotlin |
| 154 | suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String = |
| 155 | select<String> { |
| 156 | a.onReceiveOrNull { value -> |
| 157 | if (value == null) |
| 158 | "Channel 'a' is closed" |
| 159 | else |
| 160 | "a -> '$value'" |
| 161 | } |
| 162 | b.onReceiveOrNull { value -> |
| 163 | if (value == null) |
| 164 | "Channel 'b' is closed" |
| 165 | else |
| 166 | "b -> '$value'" |
| 167 | } |
| 168 | } |
| 169 | ``` |
| 170 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 171 | </div> |
| 172 | |
Vsevolod Tolstopyatov | a8904e2 | 2019-07-17 17:22:05 -0700 | [diff] [blame] | 173 | Note that [onReceiveOrNull][onReceiveOrNull] is an extension function defined only |
| 174 | for channels with non-nullable elements so that there is no accidental confusion between a closed channel |
| 175 | and a null value. |
| 176 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 177 | Let's use it with channel `a` that produces "Hello" string four times and |
| 178 | channel `b` that produces "World" four times: |
| 179 | |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 180 | <!--- CLEAR --> |
| 181 | |
| 182 | <div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 183 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 184 | ```kotlin |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 185 | import kotlinx.coroutines.* |
| 186 | import kotlinx.coroutines.channels.* |
| 187 | import kotlinx.coroutines.selects.* |
| 188 | |
| 189 | suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String = |
| 190 | select<String> { |
| 191 | a.onReceiveOrNull { value -> |
| 192 | if (value == null) |
| 193 | "Channel 'a' is closed" |
| 194 | else |
| 195 | "a -> '$value'" |
| 196 | } |
| 197 | b.onReceiveOrNull { value -> |
| 198 | if (value == null) |
| 199 | "Channel 'b' is closed" |
| 200 | else |
| 201 | "b -> '$value'" |
| 202 | } |
| 203 | } |
| 204 | |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 205 | fun main() = runBlocking<Unit> { |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 206 | //sampleStart |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 207 | val a = produce<String> { |
| 208 | repeat(4) { send("Hello $it") } |
| 209 | } |
| 210 | val b = produce<String> { |
| 211 | repeat(4) { send("World $it") } |
| 212 | } |
| 213 | repeat(8) { // print first eight results |
| 214 | println(selectAorB(a, b)) |
| 215 | } |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 216 | coroutineContext.cancelChildren() |
| 217 | //sampleEnd |
| 218 | } |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 219 | ``` |
| 220 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 221 | </div> |
| 222 | |
Adam Howard | f13549a | 2020-06-02 11:17:46 +0100 | [diff] [blame^] | 223 | > You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt). |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 224 | |
| 225 | The result of this code is quite interesting, so we'll analyze it in mode detail: |
| 226 | |
| 227 | ```text |
| 228 | a -> 'Hello 0' |
| 229 | a -> 'Hello 1' |
| 230 | b -> 'World 0' |
| 231 | a -> 'Hello 2' |
| 232 | a -> 'Hello 3' |
| 233 | b -> 'World 1' |
| 234 | Channel 'a' is closed |
| 235 | Channel 'a' is closed |
| 236 | ``` |
| 237 | |
| 238 | <!--- TEST --> |
| 239 | |
| 240 | There are couple of observations to make out of it. |
| 241 | |
| 242 | First of all, `select` is _biased_ to the first clause. When several clauses are selectable at the same time, |
| 243 | the first one among them gets selected. Here, both channels are constantly producing strings, so `a` channel, |
| 244 | being the first clause in select, wins. However, because we are using unbuffered channel, the `a` gets suspended from |
| 245 | time to time on its [send][SendChannel.send] invocation and gives a chance for `b` to send, too. |
| 246 | |
Vsevolod Tolstopyatov | a8904e2 | 2019-07-17 17:22:05 -0700 | [diff] [blame] | 247 | The second observation, is that [onReceiveOrNull][onReceiveOrNull] gets immediately selected when the |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 248 | channel is already closed. |
| 249 | |
| 250 | ### Selecting to send |
| 251 | |
| 252 | Select expression has [onSend][SendChannel.onSend] clause that can be used for a great good in combination |
| 253 | with a biased nature of selection. |
| 254 | |
| 255 | Let us write an example of producer of integers that sends its values to a `side` channel when |
| 256 | the consumers on its primary channel cannot keep up with it: |
| 257 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 258 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 259 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 260 | ```kotlin |
| 261 | fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> { |
| 262 | for (num in 1..10) { // produce 10 numbers from 1 to 10 |
| 263 | delay(100) // every 100 ms |
| 264 | select<Unit> { |
| 265 | onSend(num) {} // Send to the primary channel |
| 266 | side.onSend(num) {} // or to the side channel |
| 267 | } |
| 268 | } |
| 269 | } |
| 270 | ``` |
| 271 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 272 | </div> |
| 273 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 274 | Consumer is going to be quite slow, taking 250 ms to process each number: |
| 275 | |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 276 | <!--- CLEAR --> |
| 277 | |
| 278 | <div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 279 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 280 | ```kotlin |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 281 | import kotlinx.coroutines.* |
| 282 | import kotlinx.coroutines.channels.* |
| 283 | import kotlinx.coroutines.selects.* |
| 284 | |
| 285 | fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> { |
| 286 | for (num in 1..10) { // produce 10 numbers from 1 to 10 |
| 287 | delay(100) // every 100 ms |
| 288 | select<Unit> { |
| 289 | onSend(num) {} // Send to the primary channel |
| 290 | side.onSend(num) {} // or to the side channel |
| 291 | } |
| 292 | } |
| 293 | } |
| 294 | |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 295 | fun main() = runBlocking<Unit> { |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 296 | //sampleStart |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 297 | val side = Channel<Int>() // allocate side channel |
| 298 | launch { // this is a very fast consumer for the side channel |
| 299 | side.consumeEach { println("Side channel has $it") } |
| 300 | } |
| 301 | produceNumbers(side).consumeEach { |
| 302 | println("Consuming $it") |
| 303 | delay(250) // let us digest the consumed number properly, do not hurry |
| 304 | } |
| 305 | println("Done consuming") |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 306 | coroutineContext.cancelChildren() |
| 307 | //sampleEnd |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 308 | } |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 309 | ``` |
| 310 | |
| 311 | </div> |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 312 | |
Adam Howard | f13549a | 2020-06-02 11:17:46 +0100 | [diff] [blame^] | 313 | > You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-select-03.kt). |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 314 | |
| 315 | So let us see what happens: |
| 316 | |
| 317 | ```text |
| 318 | Consuming 1 |
| 319 | Side channel has 2 |
| 320 | Side channel has 3 |
| 321 | Consuming 4 |
| 322 | Side channel has 5 |
| 323 | Side channel has 6 |
| 324 | Consuming 7 |
| 325 | Side channel has 8 |
| 326 | Side channel has 9 |
| 327 | Consuming 10 |
| 328 | Done consuming |
| 329 | ``` |
| 330 | |
| 331 | <!--- TEST --> |
| 332 | |
| 333 | ### Selecting deferred values |
| 334 | |
| 335 | Deferred values can be selected using [onAwait][Deferred.onAwait] clause. |
| 336 | Let us start with an async function that returns a deferred string value after |
| 337 | a random delay: |
| 338 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 339 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 340 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 341 | ```kotlin |
| 342 | fun CoroutineScope.asyncString(time: Int) = async { |
| 343 | delay(time.toLong()) |
| 344 | "Waited for $time ms" |
| 345 | } |
| 346 | ``` |
| 347 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 348 | </div> |
| 349 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 350 | Let us start a dozen of them with a random delay. |
| 351 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 352 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 353 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 354 | ```kotlin |
| 355 | fun CoroutineScope.asyncStringsList(): List<Deferred<String>> { |
| 356 | val random = Random(3) |
| 357 | return List(12) { asyncString(random.nextInt(1000)) } |
| 358 | } |
| 359 | ``` |
| 360 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 361 | </div> |
| 362 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 363 | Now the main function awaits for the first of them to complete and counts the number of deferred values |
Inego | ebe519a | 2019-04-21 13:22:27 +0700 | [diff] [blame] | 364 | that are still active. Note that we've used here the fact that `select` expression is a Kotlin DSL, |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 365 | so we can provide clauses for it using an arbitrary code. In this case we iterate over a list |
| 366 | of deferred values to provide `onAwait` clause for each deferred value. |
| 367 | |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 368 | <!--- CLEAR --> |
| 369 | |
| 370 | <div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 371 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 372 | ```kotlin |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 373 | import kotlinx.coroutines.* |
| 374 | import kotlinx.coroutines.selects.* |
| 375 | import java.util.* |
| 376 | |
| 377 | fun CoroutineScope.asyncString(time: Int) = async { |
| 378 | delay(time.toLong()) |
| 379 | "Waited for $time ms" |
| 380 | } |
| 381 | |
| 382 | fun CoroutineScope.asyncStringsList(): List<Deferred<String>> { |
| 383 | val random = Random(3) |
| 384 | return List(12) { asyncString(random.nextInt(1000)) } |
| 385 | } |
| 386 | |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 387 | fun main() = runBlocking<Unit> { |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 388 | //sampleStart |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 389 | val list = asyncStringsList() |
| 390 | val result = select<String> { |
| 391 | list.withIndex().forEach { (index, deferred) -> |
| 392 | deferred.onAwait { answer -> |
| 393 | "Deferred $index produced answer '$answer'" |
| 394 | } |
| 395 | } |
| 396 | } |
| 397 | println(result) |
| 398 | val countActive = list.count { it.isActive } |
| 399 | println("$countActive coroutines are still active") |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 400 | //sampleEnd |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 401 | } |
| 402 | ``` |
| 403 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 404 | </div> |
| 405 | |
Adam Howard | f13549a | 2020-06-02 11:17:46 +0100 | [diff] [blame^] | 406 | > You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-select-04.kt). |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 407 | |
| 408 | The output is: |
| 409 | |
| 410 | ```text |
| 411 | Deferred 4 produced answer 'Waited for 128 ms' |
| 412 | 11 coroutines are still active |
| 413 | ``` |
| 414 | |
| 415 | <!--- TEST --> |
| 416 | |
| 417 | ### Switch over a channel of deferred values |
| 418 | |
| 419 | Let us write a channel producer function that consumes a channel of deferred string values, waits for each received |
| 420 | deferred value, but only until the next deferred value comes over or the channel is closed. This example puts together |
Vsevolod Tolstopyatov | a8904e2 | 2019-07-17 17:22:05 -0700 | [diff] [blame] | 421 | [onReceiveOrNull][onReceiveOrNull] and [onAwait][Deferred.onAwait] clauses in the same `select`: |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 422 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 423 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 424 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 425 | ```kotlin |
| 426 | fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> { |
| 427 | var current = input.receive() // start with first received deferred value |
| 428 | while (isActive) { // loop while not cancelled/closed |
| 429 | val next = select<Deferred<String>?> { // return next deferred value from this select or null |
| 430 | input.onReceiveOrNull { update -> |
| 431 | update // replaces next value to wait |
| 432 | } |
| 433 | current.onAwait { value -> |
| 434 | send(value) // send value that current deferred has produced |
| 435 | input.receiveOrNull() // and use the next deferred from the input channel |
| 436 | } |
| 437 | } |
| 438 | if (next == null) { |
| 439 | println("Channel was closed") |
| 440 | break // out of loop |
| 441 | } else { |
| 442 | current = next |
| 443 | } |
| 444 | } |
| 445 | } |
| 446 | ``` |
| 447 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 448 | </div> |
| 449 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 450 | To test it, we'll use a simple async function that resolves to a specified string after a specified time: |
| 451 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 452 | |
| 453 | <div class="sample" markdown="1" theme="idea" data-highlight-only> |
| 454 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 455 | ```kotlin |
| 456 | fun CoroutineScope.asyncString(str: String, time: Long) = async { |
| 457 | delay(time) |
| 458 | str |
| 459 | } |
| 460 | ``` |
| 461 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 462 | </div> |
| 463 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 464 | The main function just launches a coroutine to print results of `switchMapDeferreds` and sends some test |
| 465 | data to it: |
| 466 | |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 467 | <!--- CLEAR --> |
| 468 | |
| 469 | <div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3"> |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 470 | |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 471 | ```kotlin |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 472 | import kotlinx.coroutines.* |
| 473 | import kotlinx.coroutines.channels.* |
| 474 | import kotlinx.coroutines.selects.* |
| 475 | |
| 476 | fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> { |
| 477 | var current = input.receive() // start with first received deferred value |
| 478 | while (isActive) { // loop while not cancelled/closed |
| 479 | val next = select<Deferred<String>?> { // return next deferred value from this select or null |
| 480 | input.onReceiveOrNull { update -> |
| 481 | update // replaces next value to wait |
| 482 | } |
| 483 | current.onAwait { value -> |
| 484 | send(value) // send value that current deferred has produced |
| 485 | input.receiveOrNull() // and use the next deferred from the input channel |
| 486 | } |
| 487 | } |
| 488 | if (next == null) { |
| 489 | println("Channel was closed") |
| 490 | break // out of loop |
| 491 | } else { |
| 492 | current = next |
| 493 | } |
| 494 | } |
| 495 | } |
| 496 | |
| 497 | fun CoroutineScope.asyncString(str: String, time: Long) = async { |
| 498 | delay(time) |
| 499 | str |
| 500 | } |
| 501 | |
Prendota | 65e6c8c | 2018-10-17 11:51:08 +0300 | [diff] [blame] | 502 | fun main() = runBlocking<Unit> { |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 503 | //sampleStart |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 504 | val chan = Channel<Deferred<String>>() // the channel for test |
| 505 | launch { // launch printing coroutine |
| 506 | for (s in switchMapDeferreds(chan)) |
| 507 | println(s) // print each received string |
| 508 | } |
| 509 | chan.send(asyncString("BEGIN", 100)) |
| 510 | delay(200) // enough time for "BEGIN" to be produced |
| 511 | chan.send(asyncString("Slow", 500)) |
| 512 | delay(100) // not enough time to produce slow |
| 513 | chan.send(asyncString("Replace", 100)) |
| 514 | delay(500) // give it time before the last one |
| 515 | chan.send(asyncString("END", 500)) |
| 516 | delay(1000) // give it time to process |
| 517 | chan.close() // close the channel ... |
| 518 | delay(500) // and wait some time to let it finish |
Prendota | 0eee3c3 | 2018-10-22 12:52:56 +0300 | [diff] [blame] | 519 | //sampleEnd |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 520 | } |
| 521 | ``` |
| 522 | |
Alexander Prendota | cbeef10 | 2018-09-27 18:42:04 +0300 | [diff] [blame] | 523 | </div> |
| 524 | |
Adam Howard | f13549a | 2020-06-02 11:17:46 +0100 | [diff] [blame^] | 525 | > You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt). |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 526 | |
| 527 | The result of this code: |
| 528 | |
| 529 | ```text |
| 530 | BEGIN |
| 531 | Replace |
| 532 | END |
| 533 | Channel was closed |
| 534 | ``` |
| 535 | |
| 536 | <!--- TEST --> |
| 537 | |
| 538 | <!--- MODULE kotlinx-coroutines-core --> |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 539 | <!--- INDEX kotlinx.coroutines --> |
| 540 | [Deferred.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-deferred/on-await.html |
| 541 | <!--- INDEX kotlinx.coroutines.channels --> |
| 542 | [ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html |
| 543 | [ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html |
Vsevolod Tolstopyatov | a8904e2 | 2019-07-17 17:22:05 -0700 | [diff] [blame] | 544 | [onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 545 | [SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html |
| 546 | [SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/on-send.html |
| 547 | <!--- INDEX kotlinx.coroutines.selects --> |
| 548 | [select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html |
hadihariri | 7db5553 | 2018-09-15 10:35:08 +0200 | [diff] [blame] | 549 | <!--- END --> |