<!--- INCLUDE .*/example-([a-z]+)-([0-9a-z]+)\.kt
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.$$1$$2
-->
<!--- KNIT ../kotlinx-coroutines-core/jvm/test/guide/.*\.kt -->
<!--- TEST_OUT ../kotlinx-coroutines-core/jvm/test/guide/test/ChannelsGuideTest.kt
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.test

import org.junit.Test

class ChannelsGuideTest {
-->
**Table of contents**

<!--- TOC -->

* [Channels (experimental)](#channels-experimental)
  * [Channel basics](#channel-basics)
  * [Closing and iteration over channels](#closing-and-iteration-over-channels)
  * [Building channel producers](#building-channel-producers)
  * [Pipelines](#pipelines)
  * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
  * [Fan-out](#fan-out)
  * [Fan-in](#fan-in)
  * [Buffered channels](#buffered-channels)
  * [Channels are fair](#channels-are-fair)
  * [Ticker channels](#ticker-channels)

<!--- END_TOC -->

## Channels (experimental)

Deferred values provide a convenient way to transfer a single value between coroutines.
Channels provide a way to transfer a stream of values.

> Channels are an experimental feature of `kotlinx.coroutines`. Their API is expected to
evolve in the upcoming updates of the `kotlinx.coroutines` library with potentially
breaking changes.

### Channel basics

A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<Int>()
    launch {
        // this might be a heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt)

The output of this code is:

```text
1
4
9
16
25
Done!
```

<!--- TEST -->

### Closing and iteration over channels

Unlike a queue, a channel can be closed to indicate that no more elements are coming.
On the receiver side it is convenient to use a regular `for` loop to receive elements
from the channel.

Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
The iteration stops as soon as this close token is received, so there is a guarantee
that all elements sent before the close are received:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-02.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

### Building channel producers

The pattern where a coroutine produces a sequence of elements is quite common.
It is one half of the _producer-consumer_ pattern that is often found in concurrent code.
You could abstract such a producer into a function that takes a channel as its parameter, but this goes against
the common-sense expectation that results are returned from functions.
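For comparison, here is a minimal sketch of that parameter-passing style. The names `sendSquares` and `collectSquares` are hypothetical and exist only for this illustration. Note that the caller has to create the channel itself and has to trust the producer to close it:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: the producer is handed the channel it must fill.
suspend fun sendSquares(channel: SendChannel<Int>) {
    for (x in 1..5) channel.send(x * x)
    channel.close() // the consumer's `for` loop only ends if the producer remembers this
}

// Hypothetical caller: creates the channel, starts the producer, drains the channel.
fun collectSquares(): List<Int> = runBlocking {
    val channel = Channel<Int>()
    launch { sendSquares(channel) }
    val result = mutableListOf<Int>()
    for (y in channel) result += y
    result
}

fun main() {
    println(collectSquares())
}
```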

There is a convenient coroutine builder named [produce] that makes it easy to do this right on the producer side,
and an extension function [consumeEach] that replaces a `for` loop on the consumer side:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
//sampleStart
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-03.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

### Pipelines

A pipeline is a pattern where one coroutine produces a possibly infinite stream of values:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
```

</div>

Another coroutine (or several) consumes that stream, does some processing, and produces some other results.
In the example below, the numbers are just squared:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
```

</div>

The main code starts and connects the whole pipeline:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    coroutineContext.cancelChildren() // cancel children coroutines
//sampleEnd
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-04.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

> All functions that create coroutines are defined as extensions on [CoroutineScope],
so that we can rely on [structured concurrency](https://kotlinlang.org/docs/reference/coroutines/composing-suspending-functions.html#structured-concurrency-with-async) to make
sure that we don't have lingering global coroutines in our application.

### Prime numbers with pipeline

Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
of coroutines. We start with an infinite sequence of numbers.

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
```

</div>

The following pipeline stage filters an incoming stream of numbers, removing all the numbers
that are divisible by the given prime number:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}
```

</div>

Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
and launching a new pipeline stage for each prime number found:

```
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
```

The following example prints the first ten prime numbers,
running the whole pipeline in the context of the main thread. Since all the coroutines are launched in
the scope of the main [runBlocking] coroutine,
we don't have to keep an explicit list of all the coroutines we have started.
We use the [cancelChildren][kotlin.coroutines.CoroutineContext.cancelChildren]
extension function to cancel all the children coroutines after we have printed
the first ten prime numbers.

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    var cur = numbersFrom(2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-05.kt)

The output of this code is:

```text
2
3
5
7
11
13
17
19
23
29
```

<!--- TEST -->

Note that you can build the same pipeline using the
[`buildIterator`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines/build-iterator.html)
coroutine builder from the standard library.
Replace `produce` with `buildIterator`, `send` with `yield`, `receive` with `next`,
`ReceiveChannel` with `Iterator`, and get rid of the coroutine scope. You will not need `runBlocking` either.
However, the benefit of a pipeline that uses channels as shown above is that it can actually use
multiple CPU cores if you run it in the [Dispatchers.Default] context.
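The substitution described above can be sketched as follows. This sketch uses `iterator`, the name under which `buildIterator` is available in the standard library since Kotlin 1.3. The helper `firstPrimes` is a hypothetical name introduced only for this illustration:

```kotlin
// Standard-library-only variant of the sieve: no coroutine scope, no runBlocking.
fun numbersFrom(start: Int): Iterator<Int> = iterator {
    var x = start
    while (true) yield(x++) // infinite stream of integers from start
}

fun filter(numbers: Iterator<Int>, prime: Int): Iterator<Int> = iterator {
    for (x in numbers) if (x % prime != 0) yield(x)
}

// Hypothetical helper that collects the first `count` primes instead of printing them.
fun firstPrimes(count: Int): List<Int> {
    var cur = numbersFrom(2)
    val primes = mutableListOf<Int>()
    repeat(count) {
        val prime = cur.next()
        primes += prime
        cur = filter(cur, prime) // each prime adds one more filtering stage
    }
    return primes
}

fun main() {
    println(firstPrimes(10)) // [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
}
```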

Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
built using `buildSequence`/`buildIterator`, because they do not allow arbitrary suspension, unlike
`produce`, which is fully asynchronous.
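As a rough sketch of such a stage, the hypothetical `fetchTitles` function below uses `delay` as a stand-in for an asynchronous call to a remote service; no real service is assumed. The suspension between `receive` and `send` is exactly what the iterator builders do not permit:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical stage: `delay` stands in for an asynchronous remote call.
fun CoroutineScope.fetchTitles(ids: ReceiveChannel<Int>): ReceiveChannel<String> = produce {
    for (id in ids) {
        delay(10) // suspends the coroutine without blocking the thread
        send("title-$id")
    }
}

// Hypothetical driver: wires an infinite id stream into the stage and takes `count` results.
fun firstTitles(count: Int): List<String> = runBlocking {
    val ids = produce<Int> {
        var x = 1
        while (true) send(x++)
    }
    val titles = fetchTitles(ids)
    val result = mutableListOf<String>()
    repeat(count) { result += titles.receive() }
    coroutineContext.cancelChildren() // stop both producers
    result
}

fun main() {
    println(firstTitles(3))
}
```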

### Fan-out

Multiple coroutines may receive from the same channel, distributing work between themselves.
Let us start with a producer coroutine that is periodically producing integers
(ten numbers per second):

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
```

</div>

Then we can have several processor coroutines. In this example, they just print their id and
received number:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}
```

</div>

Now let us launch five processors and let them work for almost a second. See what happens:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
//sampleStart
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
//sampleEnd
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-06.kt)

The output will be similar to the following, although the processor ids that receive
each specific integer may differ:

```
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
```

<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->

Note that cancelling a producer coroutine closes its channel, thus eventually terminating the iteration
over the channel that the processor coroutines are doing.

Also, pay attention to how we explicitly iterate over the channel with a `for` loop to perform fan-out in the `launchProcessor` code.
Unlike `consumeEach`, this `for` loop pattern is perfectly safe to use from multiple coroutines. If one of the processor
coroutines fails, then the others would still be processing the channel, while a processor that is written via `consumeEach`
always consumes (cancels) the underlying channel on its normal or abnormal completion.
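The difference can be observed directly. The sketch below is illustrative only: `numbers`, `firstWithFor`, `firstWithConsumeEach`, and `compare` are hypothetical names, and `isClosedForReceive` is an experimental property that is used here purely to inspect the channel state:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.numbers() = produce<Int> { for (x in 1..5) send(x) }

// Leaving a `for` loop early does not touch the channel: others can keep receiving.
suspend fun firstWithFor(channel: ReceiveChannel<Int>): Int {
    for (x in channel) return x
    error("channel was empty")
}

// consumeEach cancels the channel whenever it exits, including on an early return.
suspend fun firstWithConsumeEach(channel: ReceiveChannel<Int>): Int {
    channel.consumeEach { return it }
    error("channel was empty")
}

fun compare(): Pair<Int, Boolean> = runBlocking {
    val a = numbers()
    firstWithFor(a)
    val nextFromA = a.receive() // still usable: receives the second element
    a.cancel()                  // clean up the first producer ourselves

    val b = numbers()
    firstWithConsumeEach(b)
    val bClosed = b.isClosedForReceive // the channel was cancelled by consumeEach
    nextFromA to bClosed
}

fun main() {
    println(compare())
}
```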

### Fan-in

Multiple coroutines may send to the same channel.
For example, let us have a channel of strings, and a suspending function that
repeatedly sends a specified string to this channel with a specified delay:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

</div>

Now, let us see what happens if we launch a couple of coroutines sending strings
(in this example we launch them in the context of the main thread as the main coroutine's children):

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-07.kt)

The output is:

```text
foo
foo
BAR!
foo
foo
BAR!
```

<!--- TEST -->

### Buffered channels

The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked;
if receive is invoked first, it is suspended until send is invoked.
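A small sketch can make the rendezvous visible. The function `rendezvousLog` and the log messages are hypothetical, chosen only for this illustration. The sender is started first, so its `send` stays suspended until the main coroutine finally calls `receive`:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper that records the order of events around a rendezvous.
fun rendezvousLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val channel = Channel<Int>() // no capacity argument: an unbuffered channel
    launch {
        log += "send invoked"
        channel.send(42) // suspends here until a receiver arrives
        log += "send completed"
    }
    delay(100) // by now the sender is suspended inside send
    log += "receive invoked"
    channel.receive()
    log // runBlocking waits for the sender to finish before returning
}

fun main() {
    rendezvousLog().forEach(::println)
}
```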

Both the [Channel()] factory function and the [produce] builder take an optional `capacity` parameter to
specify the _buffer size_. A buffer allows senders to send multiple elements before suspending,
similar to a `BlockingQueue` with a specified capacity, which blocks when the buffer is full.

Take a look at the behavior of the following code:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
//sampleStart
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
    sender.cancel() // cancel sender coroutine
//sampleEnd
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-08.kt)

It prints "Sending" _five_ times using a buffered channel with a capacity of _four_:

```text
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
```

<!--- TEST -->

The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.

### Channels are fair

Send and receive operations on channels are _fair_ with respect to the order of their invocation from
multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
gets the element. In the following example, two coroutines "ping" and "pong" are
receiving the "ball" object from the shared "table" channel.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

//sampleStart
data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}
//sampleEnd
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-09.kt)

The "ping" coroutine is started first, so it is the first one to receive the ball. Even though the "ping"
coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
received by the "pong" coroutine, because it was already waiting for it:

```text
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
```

<!--- TEST -->

Note that sometimes channels may produce executions that look unfair due to the nature of the executor
that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details.

### Ticker channels

A ticker channel is a special rendezvous channel that produces `Unit` every time a given delay has passed since the last consumption from this channel.
Though it may seem useless on its own, it is a useful building block for creating complex time-based [produce]
pipelines and operators that do windowing and other time-dependent processing.
A ticker channel can be used in [select] to perform an "on tick" action.

To create such a channel, use the factory method [ticker].
To indicate that no further elements are needed, use the [ReceiveChannel.cancel] method on it.

Now let's see how it works in practice:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have a 100ms delay
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 150ms")
    delay(150)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-channel-10.kt)

It prints the following lines:

```text
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
```

<!--- TEST -->

Note that [ticker] is aware of possible consumer pauses and, by default, adjusts the delay of the next produced
element if a pause occurs, trying to maintain a fixed rate of produced elements.

Optionally, a `mode` parameter equal to [TickerMode.FIXED_DELAY] can be specified to maintain a fixed
delay between elements.
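Returning to the remark that a ticker channel can be used in [select]: the sketch below is one possible shape of an "on tick" action, not a definitive pattern. The function `ticksUntilMessage`, the `messages` channel, and the timings are assumptions made for this illustration. The exact number of ticks depends on timing, so no particular value should be relied on:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

// Hypothetical helper: counts ticks until a message arrives on another channel.
fun ticksUntilMessage(): Int = runBlocking {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0)
    val messages = produce<String> {
        delay(250) // stands in for some other event source
        send("stop")
    }
    var ticks = 0
    var stopped = false
    while (!stopped) {
        select<Unit> {
            tickerChannel.onReceive { ticks += 1 } // the "on tick" action
            messages.onReceive { stopped = true }  // any other event ends the loop
        }
    }
    tickerChannel.cancel() // indicate that no more elements are needed
    ticks
}

fun main() {
    println("Ticks before the message arrived: ${ticksUntilMessage()}")
}
```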

<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[kotlin.coroutines.CoroutineContext.cancelChildren]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/kotlin.coroutines.-coroutine-context/cancel-children.html
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
<!--- INDEX kotlinx.coroutines.channels -->
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html
[Channel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel.html
[ticker]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/ticker.html
[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html
[TickerMode.FIXED_DELAY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-ticker-mode/-f-i-x-e-d_-d-e-l-a-y.html
<!--- INDEX kotlinx.coroutines.selects -->
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
<!--- END -->