<!--- INCLUDE .*/example-([a-z]+)-([0-9a-z]+)\.kt
/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.$$1$$2
-->
<!--- KNIT ../kotlinx-coroutines-core/jvm/test/guide/.*\.kt -->
<!--- TEST_OUT ../kotlinx-coroutines-core/jvm/test/guide/test/ChannelsGuideTest.kt
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.test

import org.junit.Test

class ChannelsGuideTest {
-->
**Table of contents**

<!--- TOC -->

* [Channels](#channels)
  * [Channel basics](#channel-basics)
  * [Closing and iteration over channels](#closing-and-iteration-over-channels)
  * [Building channel producers](#building-channel-producers)
  * [Pipelines](#pipelines)
  * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
  * [Fan-out](#fan-out)
  * [Fan-in](#fan-in)
  * [Buffered channels](#buffered-channels)
  * [Channels are fair](#channels-are-fair)
  * [Ticker channels](#ticker-channels)

<!--- END_TOC -->

## Channels

Deferred values provide a convenient way to transfer a single value between coroutines.
Channels provide a way to transfer a stream of values.

### Channel basics

A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt).

The output of this code is:

```text
1
4
9
16
25
Done!
```

<!--- TEST -->

### Closing and iteration over channels

Unlike a queue, a channel can be closed to indicate that no more elements are coming.
On the receiver side it is convenient to use a regular `for` loop to receive elements
from the channel.

Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
The iteration stops as soon as this close token is received, so all elements sent
before the close are guaranteed to be received:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-02.kt).

<!--- TEST
1
4
9
16
25
Done!
-->

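The close-token guarantee can also be observed when the receiver starts only after the channel has been closed. The following is a minimal sketch, not part of the guide's examples: `drainAfterClose` is a hypothetical helper, and the buffer capacity of 3 is an assumption chosen so that all sends complete before anything is received.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: sends three squares, closes the channel, and only then receives.
suspend fun drainAfterClose(): List<Int> {
    val channel = Channel<Int>(3)          // buffer large enough to hold all three elements
    for (x in 1..3) channel.send(x * x)    // does not suspend while the buffer has room
    channel.close()                        // the close "token" is queued after 1, 4, 9
    val received = mutableListOf<Int>()
    for (y in channel) received += y       // iteration ends once the close token is reached
    return received
}

fun main() = runBlocking {
    println(drainAfterClose()) // prints [1, 4, 9]
}
```

Even though the channel is already closed when iteration begins, the elements sent before the close are still delivered in order.
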
### Building channel producers

The pattern where a coroutine is producing a sequence of elements is quite common.
This is a part of the _producer-consumer_ pattern that is often found in concurrent code.
You could abstract such a producer into a function that takes a channel as its parameter, but this goes against
the common-sense expectation that results are returned from functions.

There is a convenient coroutine builder named [produce] that makes it easy to do it right on the producer side,
and an extension function [consumeEach] that replaces a `for` loop on the consumer side:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
//sampleStart
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-03.kt).

<!--- TEST
1
4
9
16
25
Done!
-->

### Pipelines

A pipeline is a pattern where one coroutine produces a possibly infinite stream of values:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
```

</div>

Another coroutine (or several coroutines) consumes that stream, does some processing, and produces some other results.
In the example below, the numbers are just squared:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
```

</div>

The main code starts and connects the whole pipeline:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    repeat(5) {
        println(squares.receive()) // print first five
    }
    println("Done!") // we are done
    coroutineContext.cancelChildren() // cancel children coroutines
//sampleEnd
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-04.kt).

<!--- TEST
1
4
9
16
25
Done!
-->

> All functions that create coroutines are defined as extensions on [CoroutineScope],
so that we can rely on [structured concurrency](https://kotlinlang.org/docs/reference/coroutines/composing-suspending-functions.html#structured-concurrency-with-async) to make
sure that we don't have lingering global coroutines in our application.

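
To illustrate the note on structured concurrency, here is a minimal sketch under stated assumptions: `naturals` and `firstThree` are hypothetical names, not part of the guide. Because the producer is created as an extension on `CoroutineScope`, it becomes a child of the enclosing `coroutineScope` block, which returns only after all of its children have completed. The infinite producer therefore has to be cancelled before the function can return.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical producer, defined as a CoroutineScope extension like the ones above.
fun CoroutineScope.naturals(): ReceiveChannel<Int> = produce {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

// Hypothetical helper: takes the first three numbers and then stops the producer.
suspend fun firstThree(): List<Int> = coroutineScope {
    val numbers = naturals()               // the producer is a child of this scope
    val result = mutableListOf<Int>()
    repeat(3) { result += numbers.receive() }
    numbers.cancel()                       // cancels the producer coroutine
    result                                 // coroutineScope can now complete
}

fun main() = runBlocking {
    println(firstThree()) // prints [1, 2, 3]
}
```

Without the call to `cancel`, `coroutineScope` would keep waiting for the producer, so the scope makes a forgotten coroutine visible instead of letting it linger.
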
### Prime numbers with pipeline

Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
of coroutines. We start with an infinite sequence of numbers.

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
```

</div>

The following pipeline stage filters an incoming stream of numbers, removing all the numbers
that are divisible by the given prime number:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}
```

</div>

Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
and launching a new pipeline stage for each prime number found:

```
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
```

The following example prints the first ten prime numbers,
running the whole pipeline in the context of the main thread. Since all the coroutines are launched in
the scope of the main [runBlocking] coroutine,
we don't have to keep an explicit list of all the coroutines we have started.
We use the [cancelChildren][kotlin.coroutines.CoroutineContext.cancelChildren]
extension function to cancel all the children coroutines after we have printed
the first ten prime numbers.

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    var cur = numbersFrom(2)
    repeat(10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-05.kt).

The output of this code is:

```text
2
3
5
7
11
13
17
19
23
29
```

<!--- TEST -->

Note that you can build the same pipeline using the
[`iterator`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/iterator.html)
coroutine builder from the standard library.
Replace `produce` with `iterator`, `send` with `yield`, `receive` with `next`,
`ReceiveChannel` with `Iterator`, and get rid of the coroutine scope. You will not need `runBlocking` either.
However, the benefit of a pipeline that uses channels as shown above is that it can actually use
multiple CPU cores if you run it in the [Dispatchers.Default] context.

Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
built using `sequence`/`iterator`, because they do not allow arbitrary suspension, unlike
`produce`, which is fully asynchronous.

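The substitutions listed above (`produce` to `iterator`, `send` to `yield`, `receive` to `next`) can be sketched as follows. This is an illustrative rewrite, not the guide's own code: `numbersFrom` and `filter` mirror the channel-based functions, while `firstPrimes` is a hypothetical helper added so that the result can be returned as a list.

```kotlin
// Infinite stream of integers from start, built with the standard library `iterator` builder.
fun numbersFrom(start: Int): Iterator<Int> = iterator {
    var x = start
    while (true) yield(x++)
}

// Pipeline stage that drops every number divisible by the given prime.
fun filter(numbers: Iterator<Int>, prime: Int): Iterator<Int> = iterator {
    for (x in numbers) if (x % prime != 0) yield(x)
}

// Hypothetical helper: chains one filter stage per prime found, exactly like the channel version.
fun firstPrimes(count: Int): List<Int> {
    var cur = numbersFrom(2)
    val primes = mutableListOf<Int>()
    repeat(count) {
        val prime = cur.next()
        primes += prime
        cur = filter(cur, prime)
    }
    return primes
}

fun main() {
    println(firstPrimes(10)) // prints [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
}
```

No coroutine scope, `runBlocking`, or cancellation is needed here, because the iterators are evaluated lazily on the calling thread. The trade-off is the one stated above: everything runs sequentially, and the stages cannot call arbitrary suspending functions.
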
### Fan-out

Multiple coroutines may receive from the same channel, distributing work between themselves.
Let us start with a producer coroutine that is periodically producing integers
(ten numbers per second):

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
```

</div>

Then we can have several processor coroutines. In this example, they just print their id and
received number:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}
```

</div>

Now let us launch five processors and let them work for almost a second. See what happens:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
//sampleStart
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
//sampleEnd
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-06.kt).

The output will be similar to the following one, although the processor ids that receive
each specific integer may be different:

```
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
```

<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->

Note that cancelling a producer coroutine closes its channel, thus eventually terminating the iteration
over the channel that the processor coroutines are doing.

Also, pay attention to how we explicitly iterate over the channel with a `for` loop to perform fan-out in the `launchProcessor` code.
Unlike `consumeEach`, this `for` loop pattern is perfectly safe to use from multiple coroutines. If one of the processor
coroutines fails, then the others would still be processing the channel, while a processor that is written via `consumeEach`
always consumes (cancels) the underlying channel on its normal or abnormal completion.

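The difference between the two iteration styles can be sketched as follows. This is an illustration under stated assumptions: the helper names are hypothetical, and `receiveCatching` is assumed to be available (kotlinx.coroutines 1.5 or newer). Each function lets a consumer fail on element `1` and then checks whether another consumer could still receive the next element.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: a buffered channel pre-filled with 0..4 and then closed.
suspend fun filledChannel(): Channel<Int> {
    val channel = Channel<Int>(5)
    repeat(5) { channel.send(it) }
    channel.close()
    return channel
}

// The consumer fails inside a plain `for` loop: the channel itself is left untouched.
suspend fun nextAfterForLoopFailure(): Int? {
    val channel = filledChannel()
    try {
        for (x in channel) if (x == 1) error("processor failed")
    } catch (e: IllegalStateException) {
        // only this consumer failed; the remaining elements are still in the channel
    }
    return channel.receiveCatching().getOrNull()
}

// The consumer fails inside `consumeEach`: the channel is cancelled on the way out.
suspend fun nextAfterConsumeEachFailure(): Int? {
    val channel = filledChannel()
    try {
        channel.consumeEach { if (it == 1) error("processor failed") }
    } catch (e: IllegalStateException) {
        // consumeEach has already cancelled the channel at this point
    }
    return channel.receiveCatching().getOrNull()
}

fun main() = runBlocking {
    println(nextAfterForLoopFailure())     // prints 2
    println(nextAfterConsumeEachFailure()) // prints null
}
```

In the `for` loop case the next element is still available to other consumers, while after `consumeEach` the channel has been cancelled and nothing more can be received from it.
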
### Fan-in

Multiple coroutines may send to the same channel.
For example, let us have a channel of strings, and a suspending function that
repeatedly sends a specified string to this channel with a specified delay:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

</div>

Now, let us see what happens if we launch a couple of coroutines sending strings
(in this example we launch them in the context of the main thread as the main coroutine's children):

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-07.kt).

The output is:

```text
foo
foo
BAR!
foo
foo
BAR!
```

<!--- TEST -->

### Buffered channels

The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked;
if receive is invoked first, it is suspended until send is invoked.

Both the [Channel()] factory function and the [produce] builder take an optional `capacity` parameter to
specify the _buffer size_. A buffer allows senders to send multiple elements before suspending,
similar to a `BlockingQueue` with a specified capacity, which blocks when the buffer is full.

Take a look at the behavior of the following code:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
//sampleStart
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
    sender.cancel() // cancel sender coroutine
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-08.kt).

It prints "Sending" _five_ times using a buffered channel with a capacity of _four_:

```text
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
```

<!--- TEST -->

The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.

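The `capacity` parameter works the same way on the [produce] builder. The following is a minimal sketch, assuming a hypothetical helper `completedSendsBeforeReceive` that counts how many `send` calls return while nobody is receiving. The 100 ms wait is an arbitrary assumption that gives the producer time to fill the buffer.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: counts the sends that complete before any receive happens.
suspend fun completedSendsBeforeReceive(): Int = coroutineScope {
    var completed = 0
    val squares = produce<Int>(capacity = 4) { // buffered producer
        for (x in 1..10) {
            send(x * x)                        // suspends once the buffer is full
            completed++                        // counted only after send has returned
        }
    }
    delay(100)                                 // don't receive anything, just wait
    val snapshot = completed
    squares.cancel()                           // stop the producer so the scope can finish
    snapshot
}

fun main() = runBlocking {
    println(completedSendsBeforeReceive()) // prints 4
}
```

The counter is incremented after `send` returns, so it reports four completed sends, whereas the example above prints "Sending" five times because it prints before each `send`.
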
### Channels are fair

Send and receive operations on channels are _fair_ with respect to the order of their invocation from
multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
gets the element. In the following example two coroutines "ping" and "pong" are
receiving the "ball" object from the shared "table" channel.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

//sampleStart
data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}
//sampleEnd
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-09.kt).

The "ping" coroutine is started first, so it is the first one to receive the ball. Even though the "ping"
coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
received by the "pong" coroutine, because it was already waiting for it:

```text
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
```

<!--- TEST -->

Note that sometimes channels may produce executions that look unfair due to the nature of the executor
that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details.

### Ticker channels

A ticker channel is a special rendezvous channel that produces `Unit` every time the given delay passes since the last consumption from this channel.
Though it may seem useless on its own, it is a useful building block for creating complex time-based [produce]
pipelines and operators that do windowing and other time-dependent processing.
A ticker channel can be used in [select] to perform an "on tick" action.

To create such a channel, use the factory method [ticker].
To indicate that no further elements are needed, use the [ReceiveChannel.cancel] method on it.

Now let's see how it works in practice:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // no initial delay

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have a 100ms delay
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 150ms")
    delay(150)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-10.kt).

It prints the following lines:

```text
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
```

<!--- TEST -->

Note that [ticker] is aware of possible consumer pauses and, by default, adjusts the delay of the next produced element
if a pause occurs, trying to maintain a fixed rate of produced elements.

Optionally, a `mode` parameter equal to [TickerMode.FIXED_DELAY] can be specified to maintain a fixed
delay between elements.

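The "on tick" use of a ticker channel in [select] can be sketched as follows. This is an illustration under stated assumptions, not code from the guide: `countTicksUntilMessage`, the `messages` channel, and the 50 ms and 200 ms timings are all hypothetical choices. The loop reacts either to a tick or to an incoming message, whichever becomes available first.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

// Hypothetical helper: counts ticks until a message arrives on another channel.
suspend fun countTicksUntilMessage(): Int = coroutineScope {
    val ticks = ticker(delayMillis = 50, initialDelayMillis = 0) // first tick is available immediately
    val messages = Channel<String>()
    launch {
        delay(200)
        messages.send("stop")                   // arrives after a few ticks have passed
    }
    var tickCount = 0
    var running = true
    while (running) {
        select<Unit> {
            ticks.onReceive { tickCount += 1 }      // "on tick" action
            messages.onReceive { running = false }  // a message ends the loop
        }
    }
    ticks.cancel()                              // indicate that no more ticks are needed
    tickCount
}

fun main() = runBlocking {
    println("Ticks before the message: ${countTicksUntilMessage()}")
}
```

The exact number of ticks depends on timing, so the sketch does not state an expected count. At least one tick is observed, because the first one is available without any initial delay.
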
<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[kotlin.coroutines.CoroutineContext.cancelChildren]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/kotlin.coroutines.-coroutine-context/cancel-children.html
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
<!--- INDEX kotlinx.coroutines.channels -->
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html
[Channel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel.html
[ticker]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/ticker.html
[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html
[TickerMode.FIXED_DELAY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-ticker-mode/-f-i-x-e-d_-d-e-l-a-y.html
<!--- INDEX kotlinx.coroutines.selects -->
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
<!--- END -->