blob: 70a4614ab4512cfcc30c178f24a117bef06b812b [file] [log] [blame] [view]
hadihariri7db55532018-09-15 10:35:08 +02001<!--- INCLUDE .*/example-([a-z]+)-([0-9a-z]+)\.kt
2/*
3 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
4 */
5
6// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
7package kotlinx.coroutines.experimental.guide.$$1$$2
8
9import kotlinx.coroutines.experimental.*
10import kotlinx.coroutines.experimental.channels.*
11-->
12<!--- KNIT ../core/kotlinx-coroutines-core/test/guide/.*\.kt -->
13<!--- TEST_OUT ../core/kotlinx-coroutines-core/test/guide/test/ChannelsGuideTest.kt
14// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
15package kotlinx.coroutines.experimental.guide.test
16
17import org.junit.Test
18
19class ChannelsGuideTest {
20-->
21## Table of contents
22
23<!--- TOC -->
24
25* [Channels (experimental)](#channels-(experimental))
26 * [Channel basics](#channel-basics)
27 * [Closing and iteration over channels](#closing-and-iteration-over-channels)
28 * [Building channel producers](#building-channel-producers)
29 * [Pipelines](#pipelines)
30 * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
31 * [Fan-out](#fan-out)
32 * [Fan-in](#fan-in)
33 * [Buffered channels](#buffered-channels)
34 * [Channels are fair](#channels-are-fair)
35 * [Ticker channels](#ticker-channels)
36
37<!--- END_TOC -->
38
39
40
41
42
43## Channels (experimental)
44
45Deferred values provide a convenient way to transfer a single value between coroutines.
46Channels provide a way to transfer a stream of values.
47
48> Channels are an experimental feature of `kotlinx.coroutines`. Their API is expected to
49evolve in the upcoming updates of the `kotlinx.coroutines` library with potentially
50breaking changes.
51
52### Channel basics
53
54A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
55instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
56a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].
57
58```kotlin
59fun main(args: Array<String>) = runBlocking<Unit> {
60 val channel = Channel<Int>()
61 launch {
62 // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
63 for (x in 1..5) channel.send(x * x)
64 }
65 // here we print five received integers:
66 repeat(5) { println(channel.receive()) }
67 println("Done!")
68}
69```
70
71> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-01.kt)
72
73The output of this code is:
74
75```text
761
774
789
7916
8025
81Done!
82```
83
84<!--- TEST -->
85
86### Closing and iteration over channels
87
88Unlike a queue, a channel can be closed to indicate that no more elements are coming.
89On the receiver side it is convenient to use a regular `for` loop to receive elements
90from the channel.
91
92Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
93The iteration stops as soon as this close token is received, so there is a guarantee
94that all previously sent elements before the close are received:
95
96```kotlin
97fun main(args: Array<String>) = runBlocking<Unit> {
98 val channel = Channel<Int>()
99 launch {
100 for (x in 1..5) channel.send(x * x)
101 channel.close() // we're done sending
102 }
103 // here we print received values using `for` loop (until the channel is closed)
104 for (y in channel) println(y)
105 println("Done!")
106}
107```
108
109> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-02.kt)
110
111<!--- TEST
1121
1134
1149
11516
11625
117Done!
118-->
119
120### Building channel producers
121
122The pattern where a coroutine is producing a sequence of elements is quite common.
123This is a part of _producer-consumer_ pattern that is often found in concurrent code.
124You could abstract such a producer into a function that takes channel as its parameter, but this goes contrary
125to common sense that results must be returned from functions.
126
127There is a convenience coroutine builder named [produce] that makes it easy to do it right on producer side,
128and an extension function [consumeEach], that replaces a `for` loop on the consumer side:
129
130```kotlin
131fun CoroutineScope.produceSquares() = produce<Int> {
132 for (x in 1..5) send(x * x)
133}
134
135fun main(args: Array<String>) = runBlocking<Unit> {
136 val squares = produceSquares()
137 squares.consumeEach { println(it) }
138 println("Done!")
139}
140```
141
142> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-03.kt)
143
144<!--- TEST
1451
1464
1479
14816
14925
150Done!
151-->
152
153### Pipelines
154
155A pipeline is a pattern where one coroutine is producing, possibly infinite, stream of values:
156
157```kotlin
158fun CoroutineScope.produceNumbers() = produce<Int> {
159 var x = 1
160 while (true) send(x++) // infinite stream of integers starting from 1
161}
162```
163
164And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results.
165In the below example the numbers are just squared:
166
167```kotlin
168fun CoroutineScope.square(numbers: ReceiveChannel<Int>) = produce<Int> {
169 for (x in numbers) send(x * x)
170}
171```
172
173The main code starts and connects the whole pipeline:
174
175```kotlin
176fun main(args: Array<String>) = runBlocking<Unit> {
177 val numbers = produceNumbers() // produces integers from 1 and on
178 val squares = square(numbers) // squares integers
179 for (i in 1..5) println(squares.receive()) // print first five
180 println("Done!") // we are done
181 coroutineContext.cancelChildren() // cancel children coroutines
182}
183```
184
185> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-04.kt)
186
187<!--- TEST
1881
1894
1909
19116
19225
193Done!
194-->
195
196> All functions that create coroutines are defined as extensions on [CoroutineScope],
197so that we can rely on [structured concurrency](#structured-concurrency) to make
198sure that we don't have lingering global coroutines in our application.
199
200### Prime numbers with pipeline
201
202Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
203of coroutines. We start with an infinite sequence of numbers.
204
205<!--- INCLUDE
206import kotlin.coroutines.experimental.*
207-->
208
209```kotlin
210fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
211 var x = start
212 while (true) send(x++) // infinite stream of integers from start
213}
214```
215
216The following pipeline stage filters an incoming stream of numbers, removing all the numbers
217that are divisible by the given prime number:
218
219```kotlin
220fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
221 for (x in numbers) if (x % prime != 0) send(x)
222}
223```
224
225Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
226and launching new pipeline stage for each prime number found:
227
228```
229numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
230```
231
232The following example prints the first ten prime numbers,
233running the whole pipeline in the context of the main thread. Since all the coroutines are launched in
234the scope of the main [runBlocking] coroutine
235we don't have to keep an explicit list of all the coroutines we have started.
236We use [cancelChildren][kotlin.coroutines.experimental.CoroutineContext.cancelChildren]
237extension function to cancel all the children coroutines after we have printed
238the first ten prime numbers.
239
240```kotlin
241fun main(args: Array<String>) = runBlocking<Unit> {
242 var cur = numbersFrom(2)
243 for (i in 1..10) {
244 val prime = cur.receive()
245 println(prime)
246 cur = filter(cur, prime)
247 }
248 coroutineContext.cancelChildren() // cancel all children to let main finish
249}
250```
251
252> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-05.kt)
253
254The output of this code is:
255
256```text
2572
2583
2595
2607
26111
26213
26317
26419
26523
26629
267```
268
269<!--- TEST -->
270
271Note, that you can build the same pipeline using
272[`buildIterator`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines.experimental/build-iterator.html)
273coroutine builder from the standard library.
274Replace `produce` with `buildIterator`, `send` with `yield`, `receive` with `next`,
275`ReceiveChannel` with `Iterator`, and get rid of the coroutine scope. You will not need `runBlocking` either.
276However, the benefit of a pipeline that uses channels as shown above is that it can actually use
277multiple CPU cores if you run it in [Dispatchers.Default] context.
278
279Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
280other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
281built using `buildSequence`/`buildIterator`, because they do not allow arbitrary suspension, unlike
282`produce`, which is fully asynchronous.
283
284### Fan-out
285
286Multiple coroutines may receive from the same channel, distributing work between themselves.
287Let us start with a producer coroutine that is periodically producing integers
288(ten numbers per second):
289
290```kotlin
291fun CoroutineScope.produceNumbers() = produce<Int> {
292 var x = 1 // start from 1
293 while (true) {
294 send(x++) // produce next
295 delay(100) // wait 0.1s
296 }
297}
298```
299
300Then we can have several processor coroutines. In this example, they just print their id and
301received number:
302
303```kotlin
304fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
305 for (msg in channel) {
306 println("Processor #$id received $msg")
307 }
308}
309```
310
311Now let us launch five processors and let them work for almost a second. See what happens:
312
313```kotlin
314fun main(args: Array<String>) = runBlocking<Unit> {
315 val producer = produceNumbers()
316 repeat(5) { launchProcessor(it, producer) }
317 delay(950)
318 producer.cancel() // cancel producer coroutine and thus kill them all
319}
320```
321
322> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-06.kt)
323
324The output will be similar to the the following one, albeit the processor ids that receive
325each specific integer may be different:
326
327```
328Processor #2 received 1
329Processor #4 received 2
330Processor #0 received 3
331Processor #1 received 4
332Processor #3 received 5
333Processor #2 received 6
334Processor #4 received 7
335Processor #0 received 8
336Processor #1 received 9
337Processor #3 received 10
338```
339
340<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->
341
342Note, that cancelling a producer coroutine closes its channel, thus eventually terminating iteration
343over the channel that processor coroutines are doing.
344
345Also, pay attention to how we explicitly iterate over channel with `for` loop to perform fan-out in `launchProcessor` code.
346Unlike `consumeEach`, this `for` loop pattern is perfectly safe to use from multiple coroutines. If one of the processor
347coroutines fails, then others would still be processing the channel, while a processor that is written via `consumeEach`
348always consumes (cancels) the underlying channel on its normal or abnormal completion.
349
350### Fan-in
351
352Multiple coroutines may send to the same channel.
353For example, let us have a channel of strings, and a suspending function that
354repeatedly sends a specified string to this channel with a specified delay:
355
356<!--- INCLUDE
357import kotlin.coroutines.experimental.*
358-->
359
360```kotlin
361suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
362 while (true) {
363 delay(time)
364 channel.send(s)
365 }
366}
367```
368
369Now, let us see what happens if we launch a couple of coroutines sending strings
370(in this example we launch them in the context of the main thread as main coroutine's children):
371
372```kotlin
373fun main(args: Array<String>) = runBlocking<Unit> {
374 val channel = Channel<String>()
375 launch { sendString(channel, "foo", 200L) }
376 launch { sendString(channel, "BAR!", 500L) }
377 repeat(6) { // receive first six
378 println(channel.receive())
379 }
380 coroutineContext.cancelChildren() // cancel all children to let main finish
381}
382```
383
384> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-07.kt)
385
386The output is:
387
388```text
389foo
390foo
391BAR!
392foo
393foo
394BAR!
395```
396
397<!--- TEST -->
398
399### Buffered channels
400
401The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
402meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked,
403if receive is invoked first, it is suspended until send is invoked.
404
405Both [Channel()] factory function and [produce] builder take an optional `capacity` parameter to
406specify _buffer size_. Buffer allows senders to send multiple elements before suspending,
407similar to the `BlockingQueue` with a specified capacity, which blocks when buffer is full.
408
409Take a look at the behavior of the following code:
410
411<!--- INCLUDE
412import kotlin.coroutines.experimental.*
413-->
414
415```kotlin
416fun main(args: Array<String>) = runBlocking<Unit> {
417 val channel = Channel<Int>(4) // create buffered channel
418 val sender = launch { // launch sender coroutine
419 repeat(10) {
420 println("Sending $it") // print before sending each element
421 channel.send(it) // will suspend when buffer is full
422 }
423 }
424 // don't receive anything... just wait....
425 delay(1000)
426 sender.cancel() // cancel sender coroutine
427}
428```
429
430> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-08.kt)
431
432It prints "sending" _five_ times using a buffered channel with capacity of _four_:
433
434```text
435Sending 0
436Sending 1
437Sending 2
438Sending 3
439Sending 4
440```
441
442<!--- TEST -->
443
444The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
445
446### Channels are fair
447
448Send and receive operations to channels are _fair_ with respect to the order of their invocation from
449multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
450gets the element. In the following example two coroutines "ping" and "pong" are
451receiving the "ball" object from the shared "table" channel.
452
453<!--- INCLUDE
454import kotlin.coroutines.experimental.*
455-->
456
457```kotlin
458data class Ball(var hits: Int)
459
460fun main(args: Array<String>) = runBlocking<Unit> {
461 val table = Channel<Ball>() // a shared table
462 launch { player("ping", table) }
463 launch { player("pong", table) }
464 table.send(Ball(0)) // serve the ball
465 delay(1000) // delay 1 second
466 coroutineContext.cancelChildren() // game over, cancel them
467}
468
469suspend fun player(name: String, table: Channel<Ball>) {
470 for (ball in table) { // receive the ball in a loop
471 ball.hits++
472 println("$name $ball")
473 delay(300) // wait a bit
474 table.send(ball) // send the ball back
475 }
476}
477```
478
479> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-09.kt)
480
481The "ping" coroutine is started first, so it is the first one to receive the ball. Even though "ping"
482coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
483received by the "pong" coroutine, because it was already waiting for it:
484
485```text
486ping Ball(hits=1)
487pong Ball(hits=2)
488ping Ball(hits=3)
489pong Ball(hits=4)
490```
491
492<!--- TEST -->
493
494Note, that sometimes channels may produce executions that look unfair due to the nature of the executor
495that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details.
496
497### Ticker channels
498
499Ticker channel is a special rendezvous channel that produces `Unit` every time given delay passes since last consumption from this channel.
500Though it may seem to be useless standalone, it is a useful building block to create complex time-based [produce]
501pipelines and operators that do windowing and other time-dependent processing.
502Ticker channel can be used in [select] to perform "on tick" action.
503
504To create such channel use a factory method [ticker].
505To indicate that no further elements are needed use [ReceiveChannel.cancel] method on it.
506
507Now let's see how it works in practice:
508
509```kotlin
510fun main(args: Array<String>) = runBlocking<Unit> {
511 val tickerChannel = ticker(delay = 100, initialDelay = 0) // create ticker channel
512 var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
513 println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet
514
515 nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
516 println("Next element is not ready in 50 ms: $nextElement")
517
518 nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
519 println("Next element is ready in 100 ms: $nextElement")
520
521 // Emulate large consumption delays
522 println("Consumer pauses for 150ms")
523 delay(150)
524 // Next element is available immediately
525 nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
526 println("Next element is available immediately after large consumer delay: $nextElement")
527 // Note that the pause between `receive` calls is taken into account and next element arrives faster
528 nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
529 println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
530
531 tickerChannel.cancel() // indicate that no more elements are needed
532}
533```
534
535> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-10.kt)
536
537It prints following lines:
538
539```text
540Initial element is available immediately: kotlin.Unit
541Next element is not ready in 50 ms: null
542Next element is ready in 100 ms: kotlin.Unit
543Consumer pauses for 150ms
544Next element is available immediately after large consumer delay: kotlin.Unit
545Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
546```
547
548<!--- TEST -->
549
550Note that [ticker] is aware of possible consumer pauses and, by default, adjusts next produced element
551delay if a pause occurs, trying to maintain a fixed rate of produced elements.
552
553Optionally, a `mode` parameter equal to [TickerMode.FIXED_DELAY] can be specified to maintain a fixed
554delay between elements.
555
556
557<!--- MODULE kotlinx-coroutines-core -->
Roman Elizarov99c28aa2018-09-23 18:42:36 +0300558<!--- INDEX kotlinx.coroutines.experimental -->
559[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
560[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run-blocking.html
561[kotlin.coroutines.experimental.CoroutineContext.cancelChildren]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/kotlin.coroutines.experimental.-coroutine-context/cancel-children.html
562[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-dispatchers/-default.html
hadihariri7db55532018-09-15 10:35:08 +0200563<!--- INDEX kotlinx.coroutines.experimental.channels -->
564[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
565[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
566[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html
567[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/close.html
568[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
569[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/consume-each.html
570[Channel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel.html
571[ticker]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/ticker.html
572[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/cancel.html
573[TickerMode.FIXED_DELAY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-ticker-mode/-f-i-x-e-d_-d-e-l-a-y.html
Roman Elizarov99c28aa2018-09-23 18:42:36 +0300574<!--- INDEX kotlinx.coroutines.experimental.selects -->
575[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
hadihariri7db55532018-09-15 10:35:08 +0200576<!--- END -->