blob: 9f087684153b558c69f764ef0a5167d75e55b409 [file] [log] [blame] [view]
Roman Elizarov660c2d72020-02-14 13:18:37 +03001<!--- TEST_NAME ChannelsGuideTest -->
hadihariri7db55532018-09-15 10:35:08 +02002
Prendotab8a559d2018-11-30 16:24:23 +03003**Table of contents**
hadihariri7db55532018-09-15 10:35:08 +02004
5<!--- TOC -->
6
Denys Ma97fd432019-06-05 03:10:34 +03007* [Channels](#channels)
hadihariri7db55532018-09-15 10:35:08 +02008 * [Channel basics](#channel-basics)
9 * [Closing and iteration over channels](#closing-and-iteration-over-channels)
10 * [Building channel producers](#building-channel-producers)
11 * [Pipelines](#pipelines)
12 * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
13 * [Fan-out](#fan-out)
14 * [Fan-in](#fan-in)
15 * [Buffered channels](#buffered-channels)
16 * [Channels are fair](#channels-are-fair)
17 * [Ticker channels](#ticker-channels)
18
Roman Elizarov660c2d72020-02-14 13:18:37 +030019<!--- END -->
hadihariri7db55532018-09-15 10:35:08 +020020
Denys Ma97fd432019-06-05 03:10:34 +030021## Channels
hadihariri7db55532018-09-15 10:35:08 +020022
23Deferred values provide a convenient way to transfer a single value between coroutines.
24Channels provide a way to transfer a stream of values.
25
hadihariri7db55532018-09-15 10:35:08 +020026### Channel basics
27
28A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
29instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
30a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].
31
Alexander Prendotacbeef102018-09-27 18:42:04 +030032
Prendota65e6c8c2018-10-17 11:51:08 +030033<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
Alexander Prendotacbeef102018-09-27 18:42:04 +030034
hadihariri7db55532018-09-15 10:35:08 +020035```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +030036import kotlinx.coroutines.*
37import kotlinx.coroutines.channels.*
38
39fun main() = runBlocking {
40//sampleStart
hadihariri7db55532018-09-15 10:35:08 +020041 val channel = Channel<Int>()
42 launch {
43 // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
44 for (x in 1..5) channel.send(x * x)
45 }
46 // here we print five received integers:
47 repeat(5) { println(channel.receive()) }
48 println("Done!")
Prendota65e6c8c2018-10-17 11:51:08 +030049//sampleEnd
hadihariri7db55532018-09-15 10:35:08 +020050}
51```
52
Alexander Prendotacbeef102018-09-27 18:42:04 +030053</div>
54
Inego69c26df2019-04-21 14:51:25 +070055> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt).
hadihariri7db55532018-09-15 10:35:08 +020056
57The output of this code is:
58
59```text
601
614
629
6316
6425
65Done!
66```
67
68<!--- TEST -->
69
70### Closing and iteration over channels
71
72Unlike a queue, a channel can be closed to indicate that no more elements are coming.
73On the receiver side it is convenient to use a regular `for` loop to receive elements
74from the channel.
75
76Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
77The iteration stops as soon as this close token is received, so there is a guarantee
78that all previously sent elements before the close are received:
79
Prendota65e6c8c2018-10-17 11:51:08 +030080<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
Alexander Prendotacbeef102018-09-27 18:42:04 +030081
hadihariri7db55532018-09-15 10:35:08 +020082```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +030083import kotlinx.coroutines.*
84import kotlinx.coroutines.channels.*
85
86fun main() = runBlocking {
87//sampleStart
hadihariri7db55532018-09-15 10:35:08 +020088 val channel = Channel<Int>()
89 launch {
90 for (x in 1..5) channel.send(x * x)
91 channel.close() // we're done sending
92 }
93 // here we print received values using `for` loop (until the channel is closed)
94 for (y in channel) println(y)
95 println("Done!")
Prendota65e6c8c2018-10-17 11:51:08 +030096//sampleEnd
hadihariri7db55532018-09-15 10:35:08 +020097}
98```
99
Alexander Prendotacbeef102018-09-27 18:42:04 +0300100</div>
101
Inego69c26df2019-04-21 14:51:25 +0700102> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-02.kt).
hadihariri7db55532018-09-15 10:35:08 +0200103
104<!--- TEST
1051
1064
1079
10816
10925
110Done!
111-->
112
113### Building channel producers
114
115The pattern where a coroutine is producing a sequence of elements is quite common.
116This is a part of _producer-consumer_ pattern that is often found in concurrent code.
117You could abstract such a producer into a function that takes channel as its parameter, but this goes contrary
118to common sense that results must be returned from functions.
119
arman simonyan02b33022018-10-10 22:06:04 +0400120There is a convenient coroutine builder named [produce] that makes it easy to do it right on producer side,
hadihariri7db55532018-09-15 10:35:08 +0200121and an extension function [consumeEach], that replaces a `for` loop on the consumer side:
122
Prendota65e6c8c2018-10-17 11:51:08 +0300123<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
Alexander Prendotacbeef102018-09-27 18:42:04 +0300124
hadihariri7db55532018-09-15 10:35:08 +0200125```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +0300126import kotlinx.coroutines.*
127import kotlinx.coroutines.channels.*
128
Vsevolod Tolstopyatova2d80882018-09-24 19:51:49 +0300129fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
hadihariri7db55532018-09-15 10:35:08 +0200130 for (x in 1..5) send(x * x)
131}
132
Prendota65e6c8c2018-10-17 11:51:08 +0300133fun main() = runBlocking {
134//sampleStart
hadihariri7db55532018-09-15 10:35:08 +0200135 val squares = produceSquares()
136 squares.consumeEach { println(it) }
137 println("Done!")
Prendota65e6c8c2018-10-17 11:51:08 +0300138//sampleEnd
hadihariri7db55532018-09-15 10:35:08 +0200139}
140```
141
Alexander Prendotacbeef102018-09-27 18:42:04 +0300142</div>
143
Inego69c26df2019-04-21 14:51:25 +0700144> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-03.kt).
hadihariri7db55532018-09-15 10:35:08 +0200145
146<!--- TEST
1471
1484
1499
15016
15125
152Done!
153-->
154
155### Pipelines
156
157A pipeline is a pattern where one coroutine is producing, possibly infinite, stream of values:
158
Alexander Prendotacbeef102018-09-27 18:42:04 +0300159<div class="sample" markdown="1" theme="idea" data-highlight-only>
160
hadihariri7db55532018-09-15 10:35:08 +0200161```kotlin
162fun CoroutineScope.produceNumbers() = produce<Int> {
163 var x = 1
164 while (true) send(x++) // infinite stream of integers starting from 1
165}
166```
167
Alexander Prendotacbeef102018-09-27 18:42:04 +0300168</div>
169
hadihariri7db55532018-09-15 10:35:08 +0200170And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results.
armansimonyan1352f10c72018-10-11 02:52:58 +0400171In the example below, the numbers are just squared:
hadihariri7db55532018-09-15 10:35:08 +0200172
Alexander Prendotacbeef102018-09-27 18:42:04 +0300173<div class="sample" markdown="1" theme="idea" data-highlight-only>
174
hadihariri7db55532018-09-15 10:35:08 +0200175```kotlin
Vsevolod Tolstopyatova2d80882018-09-24 19:51:49 +0300176fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
hadihariri7db55532018-09-15 10:35:08 +0200177 for (x in numbers) send(x * x)
178}
179```
180
Alexander Prendotacbeef102018-09-27 18:42:04 +0300181</div>
182
hadihariri7db55532018-09-15 10:35:08 +0200183The main code starts and connects the whole pipeline:
184
Prendota65e6c8c2018-10-17 11:51:08 +0300185<!--- CLEAR -->
186
187<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
Alexander Prendotacbeef102018-09-27 18:42:04 +0300188
hadihariri7db55532018-09-15 10:35:08 +0200189```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +0300190import kotlinx.coroutines.*
191import kotlinx.coroutines.channels.*
192
193fun main() = runBlocking {
194//sampleStart
hadihariri7db55532018-09-15 10:35:08 +0200195 val numbers = produceNumbers() // produces integers from 1 and on
196 val squares = square(numbers) // squares integers
Tatsuya Fujisaki9be85a42020-01-12 10:39:33 +0900197 repeat(5) {
198 println(squares.receive()) // print first five
199 }
hadihariri7db55532018-09-15 10:35:08 +0200200 println("Done!") // we are done
201 coroutineContext.cancelChildren() // cancel children coroutines
Prendota65e6c8c2018-10-17 11:51:08 +0300202//sampleEnd
203}
204
Prendota65e6c8c2018-10-17 11:51:08 +0300205fun CoroutineScope.produceNumbers() = produce<Int> {
206 var x = 1
207 while (true) send(x++) // infinite stream of integers starting from 1
208}
Joffrey Bion3feef052018-12-12 23:57:53 +0100209
Prendota65e6c8c2018-10-17 11:51:08 +0300210fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
211 for (x in numbers) send(x * x)
hadihariri7db55532018-09-15 10:35:08 +0200212}
213```
214
Alexander Prendotacbeef102018-09-27 18:42:04 +0300215</div>
216
Inego69c26df2019-04-21 14:51:25 +0700217> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-04.kt).
hadihariri7db55532018-09-15 10:35:08 +0200218
219<!--- TEST
2201
2214
2229
22316
22425
225Done!
226-->
227
228> All functions that create coroutines are defined as extensions on [CoroutineScope],
armansimonyan1352f10c72018-10-11 02:52:58 +0400229so that we can rely on [structured concurrency](https://kotlinlang.org/docs/reference/coroutines/composing-suspending-functions.html#structured-concurrency-with-async) to make
hadihariri7db55532018-09-15 10:35:08 +0200230sure that we don't have lingering global coroutines in our application.
231
232### Prime numbers with pipeline
233
234Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
235of coroutines. We start with an infinite sequence of numbers.
Alexander Prendotacbeef102018-09-27 18:42:04 +0300236
237<div class="sample" markdown="1" theme="idea" data-highlight-only>
hadihariri7db55532018-09-15 10:35:08 +0200238
239```kotlin
240fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
241 var x = start
242 while (true) send(x++) // infinite stream of integers from start
243}
244```
245
Alexander Prendotacbeef102018-09-27 18:42:04 +0300246</div>
247
hadihariri7db55532018-09-15 10:35:08 +0200248The following pipeline stage filters an incoming stream of numbers, removing all the numbers
249that are divisible by the given prime number:
250
Alexander Prendotacbeef102018-09-27 18:42:04 +0300251<div class="sample" markdown="1" theme="idea" data-highlight-only>
252
hadihariri7db55532018-09-15 10:35:08 +0200253```kotlin
254fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
255 for (x in numbers) if (x % prime != 0) send(x)
256}
257```
258
Alexander Prendotacbeef102018-09-27 18:42:04 +0300259</div>
260
hadihariri7db55532018-09-15 10:35:08 +0200261Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
262and launching new pipeline stage for each prime number found:
263
264```
265numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
266```
267
268The following example prints the first ten prime numbers,
269running the whole pipeline in the context of the main thread. Since all the coroutines are launched in
270the scope of the main [runBlocking] coroutine
271we don't have to keep an explicit list of all the coroutines we have started.
Roman Elizarov0950dfa2018-07-13 10:33:25 +0300272We use [cancelChildren][kotlin.coroutines.CoroutineContext.cancelChildren]
hadihariri7db55532018-09-15 10:35:08 +0200273extension function to cancel all the children coroutines after we have printed
274the first ten prime numbers.
275
Prendota65e6c8c2018-10-17 11:51:08 +0300276<!--- CLEAR -->
277
278<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
Alexander Prendotacbeef102018-09-27 18:42:04 +0300279
hadihariri7db55532018-09-15 10:35:08 +0200280```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +0300281import kotlinx.coroutines.*
282import kotlinx.coroutines.channels.*
283
284fun main() = runBlocking {
285//sampleStart
hadihariri7db55532018-09-15 10:35:08 +0200286 var cur = numbersFrom(2)
Tatsuya Fujisaki9be85a42020-01-12 10:39:33 +0900287 repeat(10) {
hadihariri7db55532018-09-15 10:35:08 +0200288 val prime = cur.receive()
289 println(prime)
290 cur = filter(cur, prime)
291 }
292 coroutineContext.cancelChildren() // cancel all children to let main finish
Prendota65e6c8c2018-10-17 11:51:08 +0300293//sampleEnd
294}
295
296fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
297 var x = start
298 while (true) send(x++) // infinite stream of integers from start
299}
300
301fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
302 for (x in numbers) if (x % prime != 0) send(x)
hadihariri7db55532018-09-15 10:35:08 +0200303}
304```
305
Alexander Prendotacbeef102018-09-27 18:42:04 +0300306</div>
307
Inego69c26df2019-04-21 14:51:25 +0700308> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-05.kt).
hadihariri7db55532018-09-15 10:35:08 +0200309
310The output of this code is:
311
312```text
3132
3143
3155
3167
31711
31813
31917
32019
32123
32229
323```
324
325<!--- TEST -->
326
Inegoebe519a2019-04-21 13:22:27 +0700327Note that you can build the same pipeline using
Ahmad El-Melegy153e21f2019-03-23 23:27:36 +0200328[`iterator`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/iterator.html)
hadihariri7db55532018-09-15 10:35:08 +0200329coroutine builder from the standard library.
Ahmad El-Melegy153e21f2019-03-23 23:27:36 +0200330Replace `produce` with `iterator`, `send` with `yield`, `receive` with `next`,
hadihariri7db55532018-09-15 10:35:08 +0200331`ReceiveChannel` with `Iterator`, and get rid of the coroutine scope. You will not need `runBlocking` either.
332However, the benefit of a pipeline that uses channels as shown above is that it can actually use
333multiple CPU cores if you run it in [Dispatchers.Default] context.
334
335Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
336other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
Ahmad El-Melegy153e21f2019-03-23 23:27:36 +0200337built using `sequence`/`iterator`, because they do not allow arbitrary suspension, unlike
hadihariri7db55532018-09-15 10:35:08 +0200338`produce`, which is fully asynchronous.
339
340### Fan-out
341
342Multiple coroutines may receive from the same channel, distributing work between themselves.
343Let us start with a producer coroutine that is periodically producing integers
344(ten numbers per second):
345
Alexander Prendotacbeef102018-09-27 18:42:04 +0300346<div class="sample" markdown="1" theme="idea" data-highlight-only>
347
hadihariri7db55532018-09-15 10:35:08 +0200348```kotlin
349fun CoroutineScope.produceNumbers() = produce<Int> {
350 var x = 1 // start from 1
351 while (true) {
352 send(x++) // produce next
353 delay(100) // wait 0.1s
354 }
355}
356```
357
Alexander Prendotacbeef102018-09-27 18:42:04 +0300358</div>
359
hadihariri7db55532018-09-15 10:35:08 +0200360Then we can have several processor coroutines. In this example, they just print their id and
361received number:
362
Alexander Prendotacbeef102018-09-27 18:42:04 +0300363<div class="sample" markdown="1" theme="idea" data-highlight-only>
364
hadihariri7db55532018-09-15 10:35:08 +0200365```kotlin
366fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
367 for (msg in channel) {
368 println("Processor #$id received $msg")
369 }
370}
371```
372
Alexander Prendotacbeef102018-09-27 18:42:04 +0300373</div>
374
hadihariri7db55532018-09-15 10:35:08 +0200375Now let us launch five processors and let them work for almost a second. See what happens:
376
Prendota65e6c8c2018-10-17 11:51:08 +0300377<!--- CLEAR -->
378
379<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
Alexander Prendotacbeef102018-09-27 18:42:04 +0300380
hadihariri7db55532018-09-15 10:35:08 +0200381```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +0300382import kotlinx.coroutines.*
383import kotlinx.coroutines.channels.*
384
385fun main() = runBlocking<Unit> {
386//sampleStart
hadihariri7db55532018-09-15 10:35:08 +0200387 val producer = produceNumbers()
388 repeat(5) { launchProcessor(it, producer) }
389 delay(950)
390 producer.cancel() // cancel producer coroutine and thus kill them all
Prendota65e6c8c2018-10-17 11:51:08 +0300391//sampleEnd
392}
393
394fun CoroutineScope.produceNumbers() = produce<Int> {
395 var x = 1 // start from 1
396 while (true) {
397 send(x++) // produce next
398 delay(100) // wait 0.1s
399 }
400}
401
402fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
403 for (msg in channel) {
404 println("Processor #$id received $msg")
405 }
hadihariri7db55532018-09-15 10:35:08 +0200406}
407```
408
Alexander Prendotacbeef102018-09-27 18:42:04 +0300409</div>
410
Inego69c26df2019-04-21 14:51:25 +0700411> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-06.kt).
hadihariri7db55532018-09-15 10:35:08 +0200412
413The output will be similar to the the following one, albeit the processor ids that receive
414each specific integer may be different:
415
416```
417Processor #2 received 1
418Processor #4 received 2
419Processor #0 received 3
420Processor #1 received 4
421Processor #3 received 5
422Processor #2 received 6
423Processor #4 received 7
424Processor #0 received 8
425Processor #1 received 9
426Processor #3 received 10
427```
428
429<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->
430
Inegoebe519a2019-04-21 13:22:27 +0700431Note that cancelling a producer coroutine closes its channel, thus eventually terminating iteration
hadihariri7db55532018-09-15 10:35:08 +0200432over the channel that processor coroutines are doing.
433
434Also, pay attention to how we explicitly iterate over channel with `for` loop to perform fan-out in `launchProcessor` code.
435Unlike `consumeEach`, this `for` loop pattern is perfectly safe to use from multiple coroutines. If one of the processor
436coroutines fails, then others would still be processing the channel, while a processor that is written via `consumeEach`
437always consumes (cancels) the underlying channel on its normal or abnormal completion.
438
439### Fan-in
440
441Multiple coroutines may send to the same channel.
442For example, let us have a channel of strings, and a suspending function that
443repeatedly sends a specified string to this channel with a specified delay:
444
Alexander Prendotacbeef102018-09-27 18:42:04 +0300445<div class="sample" markdown="1" theme="idea" data-highlight-only>
446
hadihariri7db55532018-09-15 10:35:08 +0200447```kotlin
448suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
449 while (true) {
450 delay(time)
451 channel.send(s)
452 }
453}
454```
455
Alexander Prendotacbeef102018-09-27 18:42:04 +0300456</div>
457
hadihariri7db55532018-09-15 10:35:08 +0200458Now, let us see what happens if we launch a couple of coroutines sending strings
459(in this example we launch them in the context of the main thread as main coroutine's children):
460
Prendota65e6c8c2018-10-17 11:51:08 +0300461<!--- CLEAR -->
462
463<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
Alexander Prendotacbeef102018-09-27 18:42:04 +0300464
hadihariri7db55532018-09-15 10:35:08 +0200465```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +0300466import kotlinx.coroutines.*
467import kotlinx.coroutines.channels.*
468
469fun main() = runBlocking {
470//sampleStart
hadihariri7db55532018-09-15 10:35:08 +0200471 val channel = Channel<String>()
472 launch { sendString(channel, "foo", 200L) }
473 launch { sendString(channel, "BAR!", 500L) }
474 repeat(6) { // receive first six
475 println(channel.receive())
476 }
477 coroutineContext.cancelChildren() // cancel all children to let main finish
Prendota65e6c8c2018-10-17 11:51:08 +0300478//sampleEnd
479}
480
481suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
482 while (true) {
483 delay(time)
484 channel.send(s)
485 }
hadihariri7db55532018-09-15 10:35:08 +0200486}
487```
488
Alexander Prendotacbeef102018-09-27 18:42:04 +0300489</div>
490
Inego69c26df2019-04-21 14:51:25 +0700491> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-07.kt).
hadihariri7db55532018-09-15 10:35:08 +0200492
493The output is:
494
495```text
496foo
497foo
498BAR!
499foo
500foo
501BAR!
502```
503
504<!--- TEST -->
505
506### Buffered channels
507
508The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
509meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked,
510if receive is invoked first, it is suspended until send is invoked.
511
512Both [Channel()] factory function and [produce] builder take an optional `capacity` parameter to
513specify _buffer size_. Buffer allows senders to send multiple elements before suspending,
514similar to the `BlockingQueue` with a specified capacity, which blocks when buffer is full.
515
516Take a look at the behavior of the following code:
517
hadihariri7db55532018-09-15 10:35:08 +0200518
Prendota65e6c8c2018-10-17 11:51:08 +0300519<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
Alexander Prendotacbeef102018-09-27 18:42:04 +0300520
hadihariri7db55532018-09-15 10:35:08 +0200521```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +0300522import kotlinx.coroutines.*
523import kotlinx.coroutines.channels.*
524
525fun main() = runBlocking<Unit> {
526//sampleStart
hadihariri7db55532018-09-15 10:35:08 +0200527 val channel = Channel<Int>(4) // create buffered channel
528 val sender = launch { // launch sender coroutine
529 repeat(10) {
530 println("Sending $it") // print before sending each element
531 channel.send(it) // will suspend when buffer is full
532 }
533 }
534 // don't receive anything... just wait....
535 delay(1000)
536 sender.cancel() // cancel sender coroutine
Prendota65e6c8c2018-10-17 11:51:08 +0300537//sampleEnd
hadihariri7db55532018-09-15 10:35:08 +0200538}
539```
540
Alexander Prendotacbeef102018-09-27 18:42:04 +0300541</div>
542
Inego69c26df2019-04-21 14:51:25 +0700543> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-08.kt).
hadihariri7db55532018-09-15 10:35:08 +0200544
545It prints "sending" _five_ times using a buffered channel with capacity of _four_:
546
547```text
548Sending 0
549Sending 1
550Sending 2
551Sending 3
552Sending 4
553```
554
555<!--- TEST -->
556
557The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
558
559### Channels are fair
560
561Send and receive operations to channels are _fair_ with respect to the order of their invocation from
562multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
563gets the element. In the following example two coroutines "ping" and "pong" are
564receiving the "ball" object from the shared "table" channel.
565
hadihariri7db55532018-09-15 10:35:08 +0200566
Prendota65e6c8c2018-10-17 11:51:08 +0300567<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
Alexander Prendotacbeef102018-09-27 18:42:04 +0300568
hadihariri7db55532018-09-15 10:35:08 +0200569```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +0300570import kotlinx.coroutines.*
571import kotlinx.coroutines.channels.*
572
573//sampleStart
hadihariri7db55532018-09-15 10:35:08 +0200574data class Ball(var hits: Int)
575
Prendota65e6c8c2018-10-17 11:51:08 +0300576fun main() = runBlocking {
hadihariri7db55532018-09-15 10:35:08 +0200577 val table = Channel<Ball>() // a shared table
578 launch { player("ping", table) }
579 launch { player("pong", table) }
580 table.send(Ball(0)) // serve the ball
581 delay(1000) // delay 1 second
582 coroutineContext.cancelChildren() // game over, cancel them
583}
584
585suspend fun player(name: String, table: Channel<Ball>) {
586 for (ball in table) { // receive the ball in a loop
587 ball.hits++
588 println("$name $ball")
589 delay(300) // wait a bit
590 table.send(ball) // send the ball back
591 }
592}
Prendota65e6c8c2018-10-17 11:51:08 +0300593//sampleEnd
hadihariri7db55532018-09-15 10:35:08 +0200594```
595
Alexander Prendotacbeef102018-09-27 18:42:04 +0300596</div>
597
Inego69c26df2019-04-21 14:51:25 +0700598> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-09.kt).
hadihariri7db55532018-09-15 10:35:08 +0200599
600The "ping" coroutine is started first, so it is the first one to receive the ball. Even though "ping"
601coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
602received by the "pong" coroutine, because it was already waiting for it:
603
604```text
605ping Ball(hits=1)
606pong Ball(hits=2)
607ping Ball(hits=3)
608pong Ball(hits=4)
609```
610
611<!--- TEST -->
612
Inegoebe519a2019-04-21 13:22:27 +0700613Note that sometimes channels may produce executions that look unfair due to the nature of the executor
hadihariri7db55532018-09-15 10:35:08 +0200614that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details.
615
616### Ticker channels
617
618Ticker channel is a special rendezvous channel that produces `Unit` every time given delay passes since last consumption from this channel.
619Though it may seem to be useless standalone, it is a useful building block to create complex time-based [produce]
620pipelines and operators that do windowing and other time-dependent processing.
621Ticker channel can be used in [select] to perform "on tick" action.
622
623To create such channel use a factory method [ticker].
624To indicate that no further elements are needed use [ReceiveChannel.cancel] method on it.
625
626Now let's see how it works in practice:
627
Alexander Prendotacbeef102018-09-27 18:42:04 +0300628<div class="sample" markdown="1" theme="idea" data-highlight-only>
629
hadihariri7db55532018-09-15 10:35:08 +0200630```kotlin
Prendota65e6c8c2018-10-17 11:51:08 +0300631import kotlinx.coroutines.*
632import kotlinx.coroutines.channels.*
633
634fun main() = runBlocking<Unit> {
Vsevolod Tolstopyatova2d80882018-09-24 19:51:49 +0300635 val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
hadihariri7db55532018-09-15 10:35:08 +0200636 var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
Vadim Semenov171fcc12020-05-03 18:20:09 +0100637 println("Initial element is available immediately: $nextElement") // no initial delay
hadihariri7db55532018-09-15 10:35:08 +0200638
Vadim Semenov171fcc12020-05-03 18:20:09 +0100639 nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay
hadihariri7db55532018-09-15 10:35:08 +0200640 println("Next element is not ready in 50 ms: $nextElement")
641
642 nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
643 println("Next element is ready in 100 ms: $nextElement")
644
645 // Emulate large consumption delays
646 println("Consumer pauses for 150ms")
647 delay(150)
648 // Next element is available immediately
649 nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
650 println("Next element is available immediately after large consumer delay: $nextElement")
651 // Note that the pause between `receive` calls is taken into account and next element arrives faster
652 nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
653 println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
654
655 tickerChannel.cancel() // indicate that no more elements are needed
656}
657```
658
Alexander Prendotacbeef102018-09-27 18:42:04 +0300659</div>
660
Inego69c26df2019-04-21 14:51:25 +0700661> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-10.kt).
hadihariri7db55532018-09-15 10:35:08 +0200662
663It prints following lines:
664
665```text
666Initial element is available immediately: kotlin.Unit
667Next element is not ready in 50 ms: null
668Next element is ready in 100 ms: kotlin.Unit
669Consumer pauses for 150ms
670Next element is available immediately after large consumer delay: kotlin.Unit
671Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
672```
673
674<!--- TEST -->
675
676Note that [ticker] is aware of possible consumer pauses and, by default, adjusts next produced element
677delay if a pause occurs, trying to maintain a fixed rate of produced elements.
678
679Optionally, a `mode` parameter equal to [TickerMode.FIXED_DELAY] can be specified to maintain a fixed
680delay between elements.
681
682
683<!--- MODULE kotlinx-coroutines-core -->
Roman Elizarov0950dfa2018-07-13 10:33:25 +0300684<!--- INDEX kotlinx.coroutines -->
685[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
686[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
Vsevolod Tolstopyatov706e3932018-10-13 15:40:32 +0300687[kotlin.coroutines.CoroutineContext.cancelChildren]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/kotlin.coroutines.-coroutine-context/cancel-children.html
Roman Elizarov0950dfa2018-07-13 10:33:25 +0300688[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
689<!--- INDEX kotlinx.coroutines.channels -->
690[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html
691[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
692[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
693[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html
694[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
695[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html
696[Channel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel.html
697[ticker]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/ticker.html
698[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html
699[TickerMode.FIXED_DELAY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-ticker-mode/-f-i-x-e-d_-d-e-l-a-y.html
700<!--- INDEX kotlinx.coroutines.selects -->
701[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
hadihariri7db55532018-09-15 10:35:08 +0200702<!--- END -->