blob: 57e6c6de5e7d5edcbf2d6c275e5ad895ac0fba9e [file] [log] [blame] [view]
hadihariri7db55532018-09-15 10:35:08 +02001<!--- INCLUDE .*/example-([a-z]+)-([0-9a-z]+)\.kt
2/*
3 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
4 */
5
6// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
Roman Elizarov0950dfa2018-07-13 10:33:25 +03007package kotlinx.coroutines.guide.$$1$$2
hadihariri7db55532018-09-15 10:35:08 +02008-->
9<!--- KNIT ../core/kotlinx-coroutines-core/test/guide/.*\.kt -->
10<!--- TEST_OUT ../core/kotlinx-coroutines-core/test/guide/test/ChannelsGuideTest.kt
11// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
Roman Elizarov0950dfa2018-07-13 10:33:25 +030012package kotlinx.coroutines.guide.test
hadihariri7db55532018-09-15 10:35:08 +020013
14import org.junit.Test
15
16class ChannelsGuideTest {
17-->
18## Table of contents
19
20<!--- TOC -->
21
Vsevolod Tolstopyatovb590aa32018-09-27 18:34:05 +030022* [Channels (experimental)](#channels-experimental)
hadihariri7db55532018-09-15 10:35:08 +020023 * [Channel basics](#channel-basics)
24 * [Closing and iteration over channels](#closing-and-iteration-over-channels)
25 * [Building channel producers](#building-channel-producers)
26 * [Pipelines](#pipelines)
27 * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
28 * [Fan-out](#fan-out)
29 * [Fan-in](#fan-in)
30 * [Buffered channels](#buffered-channels)
31 * [Channels are fair](#channels-are-fair)
32 * [Ticker channels](#ticker-channels)
33
34<!--- END_TOC -->
35
hadihariri7db55532018-09-15 10:35:08 +020036## Channels (experimental)
37
38Deferred values provide a convenient way to transfer a single value between coroutines.
39Channels provide a way to transfer a stream of values.
40
41> Channels are an experimental feature of `kotlinx.coroutines`. Their API is expected to
42evolve in the upcoming updates of the `kotlinx.coroutines` library with potentially
43breaking changes.
44
45### Channel basics
46
47A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
48instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
49a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].
50
Alexander Prendotacbeef102018-09-27 18:42:04 +030051
Prendota65e6c8c2018-10-17 11:51:08 +030052<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
Alexander Prendotacbeef102018-09-27 18:42:04 +030053
hadihariri7db55532018-09-15 10:35:08 +020054```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +030055import kotlinx.coroutines.*
56import kotlinx.coroutines.channels.*
57
58fun main() = runBlocking {
59//sampleStart
hadihariri7db55532018-09-15 10:35:08 +020060 val channel = Channel<Int>()
61 launch {
62 // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
63 for (x in 1..5) channel.send(x * x)
64 }
65 // here we print five received integers:
66 repeat(5) { println(channel.receive()) }
67 println("Done!")
Prendota65e6c8c2018-10-17 11:51:08 +030068//sampleEnd
hadihariri7db55532018-09-15 10:35:08 +020069}
70```
71
Alexander Prendotacbeef102018-09-27 18:42:04 +030072</div>
73
hadihariri7db55532018-09-15 10:35:08 +020074> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-01.kt)
75
76The output of this code is:
77
78```text
791
804
819
8216
8325
84Done!
85```
86
87<!--- TEST -->
88
89### Closing and iteration over channels
90
91Unlike a queue, a channel can be closed to indicate that no more elements are coming.
92On the receiver side it is convenient to use a regular `for` loop to receive elements
93from the channel.
94
95Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
96The iteration stops as soon as this close token is received, so there is a guarantee
97that all previously sent elements before the close are received:
98
Prendota65e6c8c2018-10-17 11:51:08 +030099<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
Alexander Prendotacbeef102018-09-27 18:42:04 +0300100
hadihariri7db55532018-09-15 10:35:08 +0200101```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +0300102import kotlinx.coroutines.*
103import kotlinx.coroutines.channels.*
104
105fun main() = runBlocking {
106//sampleStart
hadihariri7db55532018-09-15 10:35:08 +0200107 val channel = Channel<Int>()
108 launch {
109 for (x in 1..5) channel.send(x * x)
110 channel.close() // we're done sending
111 }
112 // here we print received values using `for` loop (until the channel is closed)
113 for (y in channel) println(y)
114 println("Done!")
Prendota65e6c8c2018-10-17 11:51:08 +0300115//sampleEnd
hadihariri7db55532018-09-15 10:35:08 +0200116}
117```
118
Alexander Prendotacbeef102018-09-27 18:42:04 +0300119</div>
120
hadihariri7db55532018-09-15 10:35:08 +0200121> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-02.kt)
122
123<!--- TEST
1241
1254
1269
12716
12825
129Done!
130-->
131
132### Building channel producers
133
134The pattern where a coroutine is producing a sequence of elements is quite common.
135This is a part of _producer-consumer_ pattern that is often found in concurrent code.
136You could abstract such a producer into a function that takes channel as its parameter, but this goes contrary
137to common sense that results must be returned from functions.
138
arman simonyan02b33022018-10-10 22:06:04 +0400139There is a convenient coroutine builder named [produce] that makes it easy to do it right on producer side,
hadihariri7db55532018-09-15 10:35:08 +0200140and an extension function [consumeEach], that replaces a `for` loop on the consumer side:
141
Prendota65e6c8c2018-10-17 11:51:08 +0300142<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
Alexander Prendotacbeef102018-09-27 18:42:04 +0300143
hadihariri7db55532018-09-15 10:35:08 +0200144```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +0300145import kotlinx.coroutines.*
146import kotlinx.coroutines.channels.*
147
Vsevolod Tolstopyatova2d80882018-09-24 19:51:49 +0300148fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
hadihariri7db55532018-09-15 10:35:08 +0200149 for (x in 1..5) send(x * x)
150}
151
Prendota65e6c8c2018-10-17 11:51:08 +0300152fun main() = runBlocking {
153//sampleStart
hadihariri7db55532018-09-15 10:35:08 +0200154 val squares = produceSquares()
155 squares.consumeEach { println(it) }
156 println("Done!")
Prendota65e6c8c2018-10-17 11:51:08 +0300157//sampleEnd
hadihariri7db55532018-09-15 10:35:08 +0200158}
159```
160
Alexander Prendotacbeef102018-09-27 18:42:04 +0300161</div>
162
hadihariri7db55532018-09-15 10:35:08 +0200163> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-03.kt)
164
165<!--- TEST
1661
1674
1689
16916
17025
171Done!
172-->
173
174### Pipelines
175
176A pipeline is a pattern where one coroutine is producing, possibly infinite, stream of values:
177
Alexander Prendotacbeef102018-09-27 18:42:04 +0300178<div class="sample" markdown="1" theme="idea" data-highlight-only>
179
hadihariri7db55532018-09-15 10:35:08 +0200180```kotlin
181fun CoroutineScope.produceNumbers() = produce<Int> {
182 var x = 1
183 while (true) send(x++) // infinite stream of integers starting from 1
184}
185```
186
Alexander Prendotacbeef102018-09-27 18:42:04 +0300187</div>
188
hadihariri7db55532018-09-15 10:35:08 +0200189And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results.
armansimonyan1352f10c72018-10-11 02:52:58 +0400190In the example below, the numbers are just squared:
hadihariri7db55532018-09-15 10:35:08 +0200191
Alexander Prendotacbeef102018-09-27 18:42:04 +0300192<div class="sample" markdown="1" theme="idea" data-highlight-only>
193
hadihariri7db55532018-09-15 10:35:08 +0200194```kotlin
Vsevolod Tolstopyatova2d80882018-09-24 19:51:49 +0300195fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
hadihariri7db55532018-09-15 10:35:08 +0200196 for (x in numbers) send(x * x)
197}
198```
199
Alexander Prendotacbeef102018-09-27 18:42:04 +0300200</div>
201
hadihariri7db55532018-09-15 10:35:08 +0200202The main code starts and connects the whole pipeline:
203
Prendota65e6c8c2018-10-17 11:51:08 +0300204<!--- CLEAR -->
205
206<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
Alexander Prendotacbeef102018-09-27 18:42:04 +0300207
hadihariri7db55532018-09-15 10:35:08 +0200208```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +0300209import kotlinx.coroutines.*
210import kotlinx.coroutines.channels.*
211
212fun main() = runBlocking {
213//sampleStart
hadihariri7db55532018-09-15 10:35:08 +0200214 val numbers = produceNumbers() // produces integers from 1 and on
215 val squares = square(numbers) // squares integers
216 for (i in 1..5) println(squares.receive()) // print first five
217 println("Done!") // we are done
218 coroutineContext.cancelChildren() // cancel children coroutines
Prendota65e6c8c2018-10-17 11:51:08 +0300219//sampleEnd
220}
221
222fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
223 for (x in 1..5) send(x * x)
224}
225
226fun CoroutineScope.produceNumbers() = produce<Int> {
227 var x = 1
228 while (true) send(x++) // infinite stream of integers starting from 1
229}
230fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
231 for (x in numbers) send(x * x)
hadihariri7db55532018-09-15 10:35:08 +0200232}
233```
234
Alexander Prendotacbeef102018-09-27 18:42:04 +0300235</div>
236
hadihariri7db55532018-09-15 10:35:08 +0200237> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-04.kt)
238
239<!--- TEST
2401
2414
2429
24316
24425
245Done!
246-->
247
248> All functions that create coroutines are defined as extensions on [CoroutineScope],
armansimonyan1352f10c72018-10-11 02:52:58 +0400249so that we can rely on [structured concurrency](https://kotlinlang.org/docs/reference/coroutines/composing-suspending-functions.html#structured-concurrency-with-async) to make
hadihariri7db55532018-09-15 10:35:08 +0200250sure that we don't have lingering global coroutines in our application.
251
252### Prime numbers with pipeline
253
254Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
255of coroutines. We start with an infinite sequence of numbers.
Alexander Prendotacbeef102018-09-27 18:42:04 +0300256
257<div class="sample" markdown="1" theme="idea" data-highlight-only>
hadihariri7db55532018-09-15 10:35:08 +0200258
259```kotlin
260fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
261 var x = start
262 while (true) send(x++) // infinite stream of integers from start
263}
264```
265
Alexander Prendotacbeef102018-09-27 18:42:04 +0300266</div>
267
hadihariri7db55532018-09-15 10:35:08 +0200268The following pipeline stage filters an incoming stream of numbers, removing all the numbers
269that are divisible by the given prime number:
270
Alexander Prendotacbeef102018-09-27 18:42:04 +0300271<div class="sample" markdown="1" theme="idea" data-highlight-only>
272
hadihariri7db55532018-09-15 10:35:08 +0200273```kotlin
274fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
275 for (x in numbers) if (x % prime != 0) send(x)
276}
277```
278
Alexander Prendotacbeef102018-09-27 18:42:04 +0300279</div>
280
hadihariri7db55532018-09-15 10:35:08 +0200281Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
282and launching new pipeline stage for each prime number found:
283
284```
285numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
286```
287
288The following example prints the first ten prime numbers,
289running the whole pipeline in the context of the main thread. Since all the coroutines are launched in
290the scope of the main [runBlocking] coroutine
291we don't have to keep an explicit list of all the coroutines we have started.
Roman Elizarov0950dfa2018-07-13 10:33:25 +0300292We use [cancelChildren][kotlin.coroutines.CoroutineContext.cancelChildren]
hadihariri7db55532018-09-15 10:35:08 +0200293extension function to cancel all the children coroutines after we have printed
294the first ten prime numbers.
295
Prendota65e6c8c2018-10-17 11:51:08 +0300296<!--- CLEAR -->
297
298<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
Alexander Prendotacbeef102018-09-27 18:42:04 +0300299
hadihariri7db55532018-09-15 10:35:08 +0200300```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +0300301import kotlinx.coroutines.*
302import kotlinx.coroutines.channels.*
303
304fun main() = runBlocking {
305//sampleStart
hadihariri7db55532018-09-15 10:35:08 +0200306 var cur = numbersFrom(2)
307 for (i in 1..10) {
308 val prime = cur.receive()
309 println(prime)
310 cur = filter(cur, prime)
311 }
312 coroutineContext.cancelChildren() // cancel all children to let main finish
Prendota65e6c8c2018-10-17 11:51:08 +0300313//sampleEnd
314}
315
316fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
317 var x = start
318 while (true) send(x++) // infinite stream of integers from start
319}
320
321fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
322 for (x in numbers) if (x % prime != 0) send(x)
hadihariri7db55532018-09-15 10:35:08 +0200323}
324```
325
Alexander Prendotacbeef102018-09-27 18:42:04 +0300326</div>
327
hadihariri7db55532018-09-15 10:35:08 +0200328> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-05.kt)
329
330The output of this code is:
331
332```text
3332
3343
3355
3367
33711
33813
33917
34019
34123
34229
343```
344
345<!--- TEST -->
346
347Note, that you can build the same pipeline using
Roman Elizarov0950dfa2018-07-13 10:33:25 +0300348[`buildIterator`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines/build-iterator.html)
hadihariri7db55532018-09-15 10:35:08 +0200349coroutine builder from the standard library.
350Replace `produce` with `buildIterator`, `send` with `yield`, `receive` with `next`,
351`ReceiveChannel` with `Iterator`, and get rid of the coroutine scope. You will not need `runBlocking` either.
352However, the benefit of a pipeline that uses channels as shown above is that it can actually use
353multiple CPU cores if you run it in [Dispatchers.Default] context.
354
355Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
356other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
357built using `buildSequence`/`buildIterator`, because they do not allow arbitrary suspension, unlike
358`produce`, which is fully asynchronous.
359
360### Fan-out
361
362Multiple coroutines may receive from the same channel, distributing work between themselves.
363Let us start with a producer coroutine that is periodically producing integers
364(ten numbers per second):
365
Alexander Prendotacbeef102018-09-27 18:42:04 +0300366<div class="sample" markdown="1" theme="idea" data-highlight-only>
367
hadihariri7db55532018-09-15 10:35:08 +0200368```kotlin
369fun CoroutineScope.produceNumbers() = produce<Int> {
370 var x = 1 // start from 1
371 while (true) {
372 send(x++) // produce next
373 delay(100) // wait 0.1s
374 }
375}
376```
377
Alexander Prendotacbeef102018-09-27 18:42:04 +0300378</div>
379
hadihariri7db55532018-09-15 10:35:08 +0200380Then we can have several processor coroutines. In this example, they just print their id and
381received number:
382
Alexander Prendotacbeef102018-09-27 18:42:04 +0300383<div class="sample" markdown="1" theme="idea" data-highlight-only>
384
hadihariri7db55532018-09-15 10:35:08 +0200385```kotlin
386fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
387 for (msg in channel) {
388 println("Processor #$id received $msg")
389 }
390}
391```
392
Alexander Prendotacbeef102018-09-27 18:42:04 +0300393</div>
394
hadihariri7db55532018-09-15 10:35:08 +0200395Now let us launch five processors and let them work for almost a second. See what happens:
396
Prendota65e6c8c2018-10-17 11:51:08 +0300397<!--- CLEAR -->
398
399<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
Alexander Prendotacbeef102018-09-27 18:42:04 +0300400
hadihariri7db55532018-09-15 10:35:08 +0200401```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +0300402import kotlinx.coroutines.*
403import kotlinx.coroutines.channels.*
404
405fun main() = runBlocking<Unit> {
406//sampleStart
hadihariri7db55532018-09-15 10:35:08 +0200407 val producer = produceNumbers()
408 repeat(5) { launchProcessor(it, producer) }
409 delay(950)
410 producer.cancel() // cancel producer coroutine and thus kill them all
Prendota65e6c8c2018-10-17 11:51:08 +0300411//sampleEnd
412}
413
414fun CoroutineScope.produceNumbers() = produce<Int> {
415 var x = 1 // start from 1
416 while (true) {
417 send(x++) // produce next
418 delay(100) // wait 0.1s
419 }
420}
421
422fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
423 for (msg in channel) {
424 println("Processor #$id received $msg")
425 }
hadihariri7db55532018-09-15 10:35:08 +0200426}
427```
428
Alexander Prendotacbeef102018-09-27 18:42:04 +0300429</div>
430
hadihariri7db55532018-09-15 10:35:08 +0200431> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-06.kt)
432
433The output will be similar to the the following one, albeit the processor ids that receive
434each specific integer may be different:
435
436```
437Processor #2 received 1
438Processor #4 received 2
439Processor #0 received 3
440Processor #1 received 4
441Processor #3 received 5
442Processor #2 received 6
443Processor #4 received 7
444Processor #0 received 8
445Processor #1 received 9
446Processor #3 received 10
447```
448
449<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->
450
451Note, that cancelling a producer coroutine closes its channel, thus eventually terminating iteration
452over the channel that processor coroutines are doing.
453
454Also, pay attention to how we explicitly iterate over channel with `for` loop to perform fan-out in `launchProcessor` code.
455Unlike `consumeEach`, this `for` loop pattern is perfectly safe to use from multiple coroutines. If one of the processor
456coroutines fails, then others would still be processing the channel, while a processor that is written via `consumeEach`
457always consumes (cancels) the underlying channel on its normal or abnormal completion.
458
459### Fan-in
460
461Multiple coroutines may send to the same channel.
462For example, let us have a channel of strings, and a suspending function that
463repeatedly sends a specified string to this channel with a specified delay:
464
Alexander Prendotacbeef102018-09-27 18:42:04 +0300465<div class="sample" markdown="1" theme="idea" data-highlight-only>
466
hadihariri7db55532018-09-15 10:35:08 +0200467```kotlin
468suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
469 while (true) {
470 delay(time)
471 channel.send(s)
472 }
473}
474```
475
Alexander Prendotacbeef102018-09-27 18:42:04 +0300476</div>
477
hadihariri7db55532018-09-15 10:35:08 +0200478Now, let us see what happens if we launch a couple of coroutines sending strings
479(in this example we launch them in the context of the main thread as main coroutine's children):
480
Prendota65e6c8c2018-10-17 11:51:08 +0300481<!--- CLEAR -->
482
483<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
Alexander Prendotacbeef102018-09-27 18:42:04 +0300484
hadihariri7db55532018-09-15 10:35:08 +0200485```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +0300486import kotlinx.coroutines.*
487import kotlinx.coroutines.channels.*
488
489fun main() = runBlocking {
490//sampleStart
hadihariri7db55532018-09-15 10:35:08 +0200491 val channel = Channel<String>()
492 launch { sendString(channel, "foo", 200L) }
493 launch { sendString(channel, "BAR!", 500L) }
494 repeat(6) { // receive first six
495 println(channel.receive())
496 }
497 coroutineContext.cancelChildren() // cancel all children to let main finish
Prendota65e6c8c2018-10-17 11:51:08 +0300498//sampleEnd
499}
500
501suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
502 while (true) {
503 delay(time)
504 channel.send(s)
505 }
hadihariri7db55532018-09-15 10:35:08 +0200506}
507```
508
Alexander Prendotacbeef102018-09-27 18:42:04 +0300509</div>
510
hadihariri7db55532018-09-15 10:35:08 +0200511> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-07.kt)
512
513The output is:
514
515```text
516foo
517foo
518BAR!
519foo
520foo
521BAR!
522```
523
524<!--- TEST -->
525
526### Buffered channels
527
528The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
529meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked,
530if receive is invoked first, it is suspended until send is invoked.
531
532Both [Channel()] factory function and [produce] builder take an optional `capacity` parameter to
533specify _buffer size_. Buffer allows senders to send multiple elements before suspending,
534similar to the `BlockingQueue` with a specified capacity, which blocks when buffer is full.
535
536Take a look at the behavior of the following code:
537
hadihariri7db55532018-09-15 10:35:08 +0200538
Prendota65e6c8c2018-10-17 11:51:08 +0300539<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
Alexander Prendotacbeef102018-09-27 18:42:04 +0300540
hadihariri7db55532018-09-15 10:35:08 +0200541```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +0300542import kotlinx.coroutines.*
543import kotlinx.coroutines.channels.*
544
545fun main() = runBlocking<Unit> {
546//sampleStart
hadihariri7db55532018-09-15 10:35:08 +0200547 val channel = Channel<Int>(4) // create buffered channel
548 val sender = launch { // launch sender coroutine
549 repeat(10) {
550 println("Sending $it") // print before sending each element
551 channel.send(it) // will suspend when buffer is full
552 }
553 }
554 // don't receive anything... just wait....
555 delay(1000)
556 sender.cancel() // cancel sender coroutine
Prendota65e6c8c2018-10-17 11:51:08 +0300557//sampleEnd
hadihariri7db55532018-09-15 10:35:08 +0200558}
559```
560
Alexander Prendotacbeef102018-09-27 18:42:04 +0300561</div>
562
hadihariri7db55532018-09-15 10:35:08 +0200563> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-08.kt)
564
565It prints "sending" _five_ times using a buffered channel with capacity of _four_:
566
567```text
568Sending 0
569Sending 1
570Sending 2
571Sending 3
572Sending 4
573```
574
575<!--- TEST -->
576
577The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
578
579### Channels are fair
580
581Send and receive operations to channels are _fair_ with respect to the order of their invocation from
582multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
583gets the element. In the following example two coroutines "ping" and "pong" are
584receiving the "ball" object from the shared "table" channel.
585
hadihariri7db55532018-09-15 10:35:08 +0200586
Prendota65e6c8c2018-10-17 11:51:08 +0300587<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
Alexander Prendotacbeef102018-09-27 18:42:04 +0300588
hadihariri7db55532018-09-15 10:35:08 +0200589```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +0300590import kotlinx.coroutines.*
591import kotlinx.coroutines.channels.*
592
593//sampleStart
hadihariri7db55532018-09-15 10:35:08 +0200594data class Ball(var hits: Int)
595
Prendota65e6c8c2018-10-17 11:51:08 +0300596fun main() = runBlocking {
hadihariri7db55532018-09-15 10:35:08 +0200597 val table = Channel<Ball>() // a shared table
598 launch { player("ping", table) }
599 launch { player("pong", table) }
600 table.send(Ball(0)) // serve the ball
601 delay(1000) // delay 1 second
602 coroutineContext.cancelChildren() // game over, cancel them
603}
604
605suspend fun player(name: String, table: Channel<Ball>) {
606 for (ball in table) { // receive the ball in a loop
607 ball.hits++
608 println("$name $ball")
609 delay(300) // wait a bit
610 table.send(ball) // send the ball back
611 }
612}
Prendota65e6c8c2018-10-17 11:51:08 +0300613//sampleEnd
hadihariri7db55532018-09-15 10:35:08 +0200614```
615
Alexander Prendotacbeef102018-09-27 18:42:04 +0300616</div>
617
hadihariri7db55532018-09-15 10:35:08 +0200618> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-09.kt)
619
620The "ping" coroutine is started first, so it is the first one to receive the ball. Even though "ping"
621coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
622received by the "pong" coroutine, because it was already waiting for it:
623
624```text
625ping Ball(hits=1)
626pong Ball(hits=2)
627ping Ball(hits=3)
628pong Ball(hits=4)
629```
630
631<!--- TEST -->
632
633Note, that sometimes channels may produce executions that look unfair due to the nature of the executor
634that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details.
635
636### Ticker channels
637
638Ticker channel is a special rendezvous channel that produces `Unit` every time given delay passes since last consumption from this channel.
639Though it may seem to be useless standalone, it is a useful building block to create complex time-based [produce]
640pipelines and operators that do windowing and other time-dependent processing.
641Ticker channel can be used in [select] to perform "on tick" action.
642
643To create such channel use a factory method [ticker].
644To indicate that no further elements are needed use [ReceiveChannel.cancel] method on it.
645
646Now let's see how it works in practice:
647
Alexander Prendotacbeef102018-09-27 18:42:04 +0300648<div class="sample" markdown="1" theme="idea" data-highlight-only>
649
hadihariri7db55532018-09-15 10:35:08 +0200650```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +0300651import kotlinx.coroutines.*
652import kotlinx.coroutines.channels.*
653
654fun main() = runBlocking<Unit> {
Vsevolod Tolstopyatova2d80882018-09-24 19:51:49 +0300655 val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
hadihariri7db55532018-09-15 10:35:08 +0200656 var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
657 println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet
658
659 nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
660 println("Next element is not ready in 50 ms: $nextElement")
661
662 nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
663 println("Next element is ready in 100 ms: $nextElement")
664
665 // Emulate large consumption delays
666 println("Consumer pauses for 150ms")
667 delay(150)
668 // Next element is available immediately
669 nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
670 println("Next element is available immediately after large consumer delay: $nextElement")
671 // Note that the pause between `receive` calls is taken into account and next element arrives faster
672 nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
673 println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
674
675 tickerChannel.cancel() // indicate that no more elements are needed
676}
677```
678
Alexander Prendotacbeef102018-09-27 18:42:04 +0300679</div>
680
hadihariri7db55532018-09-15 10:35:08 +0200681> You can get full code [here](../core/kotlinx-coroutines-core/test/guide/example-channel-10.kt)
682
683It prints following lines:
684
685```text
686Initial element is available immediately: kotlin.Unit
687Next element is not ready in 50 ms: null
688Next element is ready in 100 ms: kotlin.Unit
689Consumer pauses for 150ms
690Next element is available immediately after large consumer delay: kotlin.Unit
691Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
692```
693
694<!--- TEST -->
695
696Note that [ticker] is aware of possible consumer pauses and, by default, adjusts next produced element
697delay if a pause occurs, trying to maintain a fixed rate of produced elements.
698
699Optionally, a `mode` parameter equal to [TickerMode.FIXED_DELAY] can be specified to maintain a fixed
700delay between elements.
701
702
703<!--- MODULE kotlinx-coroutines-core -->
Roman Elizarov0950dfa2018-07-13 10:33:25 +0300704<!--- INDEX kotlinx.coroutines -->
705[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
706[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
Vsevolod Tolstopyatov706e3932018-10-13 15:40:32 +0300707[kotlin.coroutines.CoroutineContext.cancelChildren]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/kotlin.coroutines.-coroutine-context/cancel-children.html
Roman Elizarov0950dfa2018-07-13 10:33:25 +0300708[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
709<!--- INDEX kotlinx.coroutines.channels -->
710[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html
711[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
712[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
713[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html
714[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
715[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html
716[Channel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel.html
717[ticker]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/ticker.html
718[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html
719[TickerMode.FIXED_DELAY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-ticker-mode/-f-i-x-e-d_-d-e-l-a-y.html
720<!--- INDEX kotlinx.coroutines.selects -->
721[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
hadihariri7db55532018-09-15 10:35:08 +0200722<!--- END -->