<!--- INCLUDE .*/example-([a-z]+)-([0-9a-z]+)\.kt
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.$$1$$2
-->
<!--- KNIT ../kotlinx-coroutines-core/jvm/test/guide/.*\.kt -->
<!--- TEST_OUT ../kotlinx-coroutines-core/jvm/test/guide/test/ChannelsGuideTest.kt
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.test

import org.junit.Test

class ChannelsGuideTest {
-->
**Table of contents**

<!--- TOC -->

* [Channels](#channels)
  * [Channel basics](#channel-basics)
  * [Closing and iteration over channels](#closing-and-iteration-over-channels)
  * [Building channel producers](#building-channel-producers)
  * [Pipelines](#pipelines)
  * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
  * [Fan-out](#fan-out)
  * [Fan-in](#fan-in)
  * [Buffered channels](#buffered-channels)
  * [Channels are fair](#channels-are-fair)
  * [Ticker channels](#ticker-channels)

<!--- END_TOC -->

## Channels

Deferred values provide a convenient way to transfer a single value between coroutines.
Channels provide a way to transfer a stream of values.
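
To make the contrast concrete, here is a minimal sketch that puts the two side by side. The helper `drain` is a hypothetical name introduced only for this illustration; it is not part of the library.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: receives every element until the channel is closed.
suspend fun drain(channel: ReceiveChannel<Int>): List<Int> {
    val received = mutableListOf<Int>()
    for (x in channel) received += x
    return received
}

fun main() = runBlocking {
    // A Deferred carries exactly one result.
    val single: Deferred<Int> = async { 6 * 7 }
    println(single.await()) // 42

    // A channel carries many values, one after another.
    val stream = Channel<Int>()
    launch {
        for (x in 1..3) stream.send(x)
        stream.close() // lets the receiving loop terminate
    }
    println(drain(stream)) // [1, 2, 3]
}
```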

### Channel basics

A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
//sampleEnd
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt).

The output of this code is:

```text
1
4
9
16
25
Done!
```

<!--- TEST -->

### Closing and iteration over channels

Unlike a queue, a channel can be closed to indicate that no more elements are coming.
On the receiver side it is convenient to use a regular `for` loop to receive elements
from the channel.

Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
The iteration stops as soon as this close token is received, so there is a guarantee
that all elements sent before the close are received:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
//sampleEnd
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-02.kt).

<!--- TEST
1
4
9
16
25
Done!
-->
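
The delivery guarantee can also be observed when the channel is closed before anyone starts receiving. The sketch below is an illustration only: it assumes a channel with a buffer of three elements (buffering is covered in the Buffered channels section) so that the sends complete without a receiver, and `sendThenClose` is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: sends three squares, closes the channel, and only then receives.
suspend fun sendThenClose(): List<Int> {
    val channel = Channel<Int>(3) // assumed capacity, large enough to hold every element
    for (x in 1..3) channel.send(x * x)
    channel.close() // the "close token" is queued after the three elements
    val received = mutableListOf<Int>()
    for (y in channel) received += y // stops once the close token is reached
    return received
}

fun main() = runBlocking {
    println(sendThenClose()) // [1, 4, 9]
}
```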

### Building channel producers

The pattern where a coroutine is producing a sequence of elements is quite common.
This is a part of the _producer-consumer_ pattern that is often found in concurrent code.
You could abstract such a producer into a function that takes a channel as its parameter, but this goes contrary
to the common-sense expectation that results must be returned from functions.

There is a convenient coroutine builder named [produce] that makes it easy to do it right on the producer side,
and an extension function [consumeEach] that replaces a `for` loop on the consumer side:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
//sampleStart
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
//sampleEnd
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-03.kt).

<!--- TEST
1
4
9
16
25
Done!
-->

### Pipelines

A pipeline is a pattern where one coroutine is producing a possibly infinite stream of values:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
```

</div>

And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results.
In the example below, the numbers are just squared:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
```

</div>

The main code starts and connects the whole pipeline:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    coroutineContext.cancelChildren() // cancel children coroutines
//sampleEnd
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-04.kt).

<!--- TEST
1
4
9
16
25
Done!
-->

> All functions that create coroutines are defined as extensions on [CoroutineScope],
so that we can rely on [structured concurrency](https://kotlinlang.org/docs/reference/coroutines/composing-suspending-functions.html#structured-concurrency-with-async) to make
sure that we don't have lingering global coroutines in our application.

### Prime numbers with pipeline

Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
of coroutines. We start with an infinite sequence of numbers.

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
```

</div>

The following pipeline stage filters an incoming stream of numbers, removing all the numbers
that are divisible by the given prime number:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}
```

</div>

Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
and launching a new pipeline stage for each prime number found:

```
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
```

The following example prints the first ten prime numbers,
running the whole pipeline in the context of the main thread. Since all the coroutines are launched in
the scope of the main [runBlocking] coroutine,
we don't have to keep an explicit list of all the coroutines we have started.
We use the [cancelChildren][kotlin.coroutines.CoroutineContext.cancelChildren]
extension function to cancel all the child coroutines after we have printed
the first ten prime numbers.

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    var cur = numbersFrom(2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-05.kt).

The output of this code is:

```text
2
3
5
7
11
13
17
19
23
29
```

<!--- TEST -->

Note that you can build the same pipeline using the
[`iterator`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/iterator.html)
coroutine builder from the standard library.
Replace `produce` with `iterator`, `send` with `yield`, `receive` with `next`,
`ReceiveChannel` with `Iterator`, and get rid of the coroutine scope. You will not need `runBlocking` either.
However, the benefit of a pipeline that uses channels as shown above is that it can actually use
multiple CPU cores if you run it in the [Dispatchers.Default] context.
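
Applying those replacements literally gives roughly the following sketch. It uses only the standard library; the function `firstPrimes` is a hypothetical wrapper added here so that the result can be returned as a list.

```kotlin
// An infinite iterator of integers from start.
fun numbersFrom(start: Int) = iterator<Int> {
    var x = start
    while (true) yield(x++)
}

// Drops every number that is divisible by the given prime.
fun filter(numbers: Iterator<Int>, prime: Int) = iterator<Int> {
    for (x in numbers) if (x % prime != 0) yield(x)
}

// Hypothetical wrapper: chains one filter stage per prime found.
fun firstPrimes(count: Int): List<Int> {
    var cur = numbersFrom(2)
    val primes = mutableListOf<Int>()
    repeat(count) {
        val prime = cur.next()
        primes += prime
        cur = filter(cur, prime)
    }
    return primes
}

fun main() {
    println(firstPrimes(10)) // [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
}
```

Every stage here runs lazily on the calling thread, so there is nothing to cancel at the end, but also no opportunity for the stages to run in parallel.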

Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
built using `sequence`/`iterator`, because they do not allow arbitrary suspension, unlike
`produce`, which is fully asynchronous.
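
As an illustration of such a stage, the sketch below calls a suspending function for every element. `lookupLength` is a hypothetical stand-in for a remote call (it only delays and returns the word length), and `produceWords` and `lengths` are names made up for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical stand-in for an asynchronous call to a remote service.
suspend fun lookupLength(word: String): Int {
    delay(10) // suspends without blocking the thread
    return word.length
}

fun CoroutineScope.produceWords() = produce<String> {
    for (w in listOf("channel", "pipeline", "coroutine")) send(w)
}

// A pipeline stage that suspends for every element it processes.
fun CoroutineScope.lengths(words: ReceiveChannel<String>) = produce<Int> {
    for (w in words) send(lookupLength(w))
}

fun main() = runBlocking {
    for (n in lengths(produceWords())) println(n) // prints 7, 8 and 9
}
```

The same body would not compile inside `sequence` or `iterator`, because their restricted scope only permits calls to `yield` and `yieldAll` as suspension points.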

### Fan-out

Multiple coroutines may receive from the same channel, distributing work between themselves.
Let us start with a producer coroutine that is periodically producing integers
(ten numbers per second):

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
```

</div>

Then we can have several processor coroutines. In this example, they just print their id and
received number:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}
```

</div>

Now let us launch five processors and let them work for almost a second. See what happens:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
//sampleStart
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
//sampleEnd
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-06.kt).

The output will be similar to the following one, although the processor ids that receive
each specific integer may be different:

```
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
```

<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->

Note that cancelling a producer coroutine closes its channel, thus eventually terminating iteration
over the channel that processor coroutines are doing.

Also, pay attention to how we explicitly iterate over the channel with a `for` loop to perform fan-out in the `launchProcessor` code.
Unlike `consumeEach`, this `for` loop pattern is perfectly safe to use from multiple coroutines. If one of the processor
coroutines fails, then others would still be processing the channel, while a processor that is written via `consumeEach`
always consumes (cancels) the underlying channel on its normal or abnormal completion.
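
The difference can be observed in isolation with the following sketch. It is an illustration under stated assumptions: the channel is pre-filled through a buffer of three elements, the processor failure is simulated with `error(...)`, and `filledChannel` and `survivesFailure` are hypothetical helper names.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: a buffered channel that already holds 1, 2, 3.
suspend fun filledChannel(): Channel<Int> {
    val channel = Channel<Int>(3)
    for (x in 1..3) channel.send(x)
    return channel
}

// Returns true if another receiver can still take an element
// after a consumer has failed while processing element 2.
suspend fun survivesFailure(useConsumeEach: Boolean): Boolean {
    val channel = filledChannel()
    try {
        if (useConsumeEach) {
            channel.consumeEach { if (it == 2) error("processor failed") }
        } else {
            for (x in channel) if (x == 2) error("processor failed")
        }
    } catch (e: IllegalStateException) {
        // the simulated processor failure; ignored on purpose
    }
    return runCatching { channel.receive() }.isSuccess
}

fun main() = runBlocking {
    println("for loop: ${survivesFailure(useConsumeEach = false)}")    // true
    println("consumeEach: ${survivesFailure(useConsumeEach = true)}")  // false
}
```

After the failed `for` loop the remaining element is still in the channel, whereas `consumeEach` has cancelled the channel, so the later `receive` fails.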

### Fan-in

Multiple coroutines may send to the same channel.
For example, let us have a channel of strings, and a suspending function that
repeatedly sends a specified string to this channel with a specified delay:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

</div>

Now, let us see what happens if we launch a couple of coroutines sending strings
(in this example we launch them in the context of the main thread as main coroutine's children):

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-07.kt).

The output is:

```text
foo
foo
BAR!
foo
foo
BAR!
```

<!--- TEST -->

### Buffered channels

The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked;
if receive is invoked first, it is suspended until send is invoked.

Both the [Channel()] factory function and the [produce] builder take an optional `capacity` parameter to
specify the _buffer size_. A buffer allows senders to send multiple elements before suspending,
similar to a `BlockingQueue` with a specified capacity, which blocks when the buffer is full.

Take a look at the behavior of the following code:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
//sampleStart
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
    sender.cancel() // cancel sender coroutine
//sampleEnd
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-08.kt).

It prints "sending" _five_ times using a buffered channel with capacity of _four_:

```text
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
```

<!--- TEST -->

The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
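
The `capacity` parameter of the [produce] builder behaves the same way. The sketch below is a variation of the example above, not part of the guide's tested samples: `bufferedNumbers` is a hypothetical name, and the `attempts` list is added only to record which sends were started.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Records each element in `attempts` just before trying to send it.
fun CoroutineScope.bufferedNumbers(attempts: MutableList<Int>) = produce<Int>(capacity = 4) {
    repeat(10) {
        attempts += it
        send(it) // suspends once the four-element buffer is full
    }
}

fun main() = runBlocking<Unit> {
    val attempts = mutableListOf<Int>()
    val numbers = bufferedNumbers(attempts)
    delay(100) // receive nothing, just give the producer time to run
    println(attempts) // [0, 1, 2, 3, 4]
    numbers.cancel() // cancel the producer so that main can finish
}
```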

### Channels are fair

Send and receive operations to channels are _fair_ with respect to the order of their invocation from
multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
gets the element. In the following example two coroutines "ping" and "pong" are
receiving the "ball" object from the shared "table" channel.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

//sampleStart
data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-09.kt).

The "ping" coroutine is started first, so it is the first one to receive the ball. Even though the "ping"
coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
received by the "pong" coroutine, because it was already waiting for it:

```text
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
```

<!--- TEST -->

Note that sometimes channels may produce executions that look unfair due to the nature of the executor
that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details.

### Ticker channels

A ticker channel is a special rendezvous channel that produces `Unit` every time the given delay passes since the last consumption from this channel.
Though it may seem useless on its own, it is a useful building block for complex time-based [produce]
pipelines and operators that do windowing and other time-dependent processing.
A ticker channel can be used in [select] to perform an "on tick" action.

To create such a channel, use the factory method [ticker].
To indicate that no further elements are needed, use the [ReceiveChannel.cancel] method on it.
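
A minimal sketch of the "on tick" idea with [select] might look as follows. It is an illustration only: the `messages` channel and the `collectEvents` helper are hypothetical, and the exact interleaving of ticks and messages depends on timing.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

// Hypothetical helper: waits for `count` events, each being either a message or a tick.
suspend fun collectEvents(
    messages: ReceiveChannel<String>,
    ticks: ReceiveChannel<Unit>,
    count: Int
): List<String> {
    val events = mutableListOf<String>()
    repeat(count) {
        events += select<String> {
            messages.onReceive { msg -> "message: $msg" } // regular data
            ticks.onReceive { "tick" }                    // the "on tick" action
        }
    }
    return events
}

fun main() = runBlocking<Unit> {
    val ticks = ticker(delayMillis = 100, initialDelayMillis = 0)
    val messages = Channel<String>() // hypothetical data channel
    launch {
        delay(50)
        messages.send("hello")
    }
    collectEvents(messages, ticks, count = 3).forEach { println(it) }
    ticks.cancel() // indicate that no more ticks are needed
    coroutineContext.cancelChildren() // make sure the sender cannot keep main alive
}
```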

Now let's see how it works in practice:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 150ms")
    delay(150)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-10.kt).

It prints the following lines:

```text
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
```

<!--- TEST -->

Note that [ticker] is aware of possible consumer pauses and, by default, adjusts the delay of the next produced
element if a pause occurs, trying to maintain a fixed rate of produced elements.

Optionally, a `mode` parameter equal to [TickerMode.FIXED_DELAY] can be specified to maintain a fixed
delay between elements.
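
For comparison, here is a sketch of the last step of the example above with the fixed-delay mode. The helper name `nextAfterPause` is hypothetical, and the result depends on timing, so treat it as an illustration rather than a guaranteed outcome.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: pauses for 150 ms, takes the pending element, and then
// waits at most 60 ms for the following one. Returns null on timeout.
suspend fun nextAfterPause(mode: TickerMode): Unit? {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0, mode = mode)
    try {
        tickerChannel.receive() // initial element
        delay(150)              // emulate a slow consumer
        tickerChannel.receive() // the element that became ready during the pause
        return withTimeoutOrNull(60) { tickerChannel.receive() }
    } finally {
        tickerChannel.cancel() // indicate that no more elements are needed
    }
}

fun main() = runBlocking<Unit> {
    println("FIXED_DELAY: ${nextAfterPause(TickerMode.FIXED_DELAY)}")
    println("FIXED_PERIOD: ${nextAfterPause(TickerMode.FIXED_PERIOD)}")
}
```

With `FIXED_DELAY` the ticker waits the full 100 ms after each element is received, so the final `receive` is expected to time out, while the default `FIXED_PERIOD` mode compensates for the pause and is expected to deliver the element within the 60 ms window, as in the example above.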

<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[kotlin.coroutines.CoroutineContext.cancelChildren]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/kotlin.coroutines.-coroutine-context/cancel-children.html
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
<!--- INDEX kotlinx.coroutines.channels -->
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html
[Channel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel.html
[ticker]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/ticker.html
[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html
[TickerMode.FIXED_DELAY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-ticker-mode/-f-i-x-e-d_-d-e-l-a-y.html
<!--- INDEX kotlinx.coroutines.selects -->
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
<!--- END -->