Guide for channel basics
diff --git a/coroutines-guide.md b/coroutines-guide.md
index 7d0304d..a9abae6 100644
--- a/coroutines-guide.md
+++ b/coroutines-guide.md
@@ -33,6 +33,15 @@
* [Children of a coroutine](#children-of-a-coroutine)
* [Combining contexts](#combining-contexts)
* [Naming coroutines for debugging](#naming-coroutines-for-debugging)
+* [Channels](#channels)
+ * [Channel basics](#channel-basics)
+ * [Closing and iteration over channels](#closing-and-iteration-over-channels)
+ * [Building channel producers](#building-channel-producers)
+ * [Pipelines](#pipelines)
+ * [Prime numbers with pipeline](#prime-numbers-with-pipeline)
+ * [Fan-out](#fan-out)
+ * [Fan-in](#fan-in)
+ * [Buffered channels](#buffered-channels)
<!--- KNIT kotlinx-coroutines-core/src/test/kotlin/guide/.*\.kt -->
@@ -441,8 +450,8 @@
### Sequential by default
Assume that we have two suspending functions defined elsewhere that do something useful like some kind of
-remote service call or computation. We'll just pretend they are useful, but each one will just actaully
-delay for a second for the purpose of this example:
+remote service call or computation. We just pretend they are useful, but actually each one just
+delays for a second for the purpose of this example:
<!--- INCLUDE .*/example-compose-([0-9]+).kt
import kotlin.system.measureTimeMillis
@@ -672,7 +681,7 @@
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-context-03.kt)
-There are three coroutines. The main couroutine (#1) -- `runBlocking` one,
+There are three coroutines. The main coroutine (#1) -- `runBlocking` one,
and two coroutines computing deferred values `a` (#2) and `b` (#3).
They are all executing in the context of `runBlocking` and are confined to the main thread.
The output of this code is:
@@ -687,7 +696,7 @@
thread, but the identifier of the currently executing coroutine is appended to it. This identifier
is consecutively assigned to all created coroutines when debugging mode is turned on.
-You can read more about debugging facilities in documentation for `newCoroutineContext` function.
+You can read more about debugging facilities in the documentation for `newCoroutineContext` function.
### Jumping between threads
@@ -822,7 +831,7 @@
### Naming coroutines for debugging
-Automatically assignmed ids are good when coroutines log often and you just need to correlate log records
+Automatically assigned ids are good when coroutines log often and you just need to correlate log records
coming from the same coroutine. However, when coroutine is tied to the processing of a specific request
or doing some specific background task, it is better to name it explicitly for debugging purposes.
Coroutine name serves the same function as a thread name. It'll get displayed in the thread name that
@@ -861,4 +870,319 @@
[main @main#1] The answer for v1 / v2 = 42
```
+## Channels
+Deferred values provide a convenient way to transfer a single value between coroutines.
+Channels provide a way to transfer a stream of values.
+
+<!--- INCLUDE .*/example-channel-([0-9]+).kt
+import kotlinx.coroutines.experimental.channels.*
+-->
+
+### Channel basics
+
+A `Channel` is conceptually very similar to `BlockingQueue`. One key difference is that
+instead of a blocking `put` operation it has a suspending `send`, and instead of
+a blocking `take` operation it has a suspending `receive`.
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val channel = Channel<Int>()
+ launch(CommonPool) {
+ // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
+ for (x in 1..5) channel.send(x * x)
+ }
+ // here we print five received integers:
+ repeat(5) { println(channel.receive()) }
+ println("Done!")
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-01.kt)
+
+### Closing and iteration over channels
+
+Unlike a queue, a channel can be closed to indicate that no more elements are coming.
+On the receiver side it is convenient to use a regular `for` loop to receive elements
+from the channel.
+
+Conceptually, a `close` is like sending a special close token to the channel.
+The iteration stops as soon as this close token is received, so there is a guarantee
+that all previously sent elements before the close are received:
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val channel = Channel<Int>()
+ launch(CommonPool) {
+ for (x in 1..5) channel.send(x * x)
+ channel.close() // we're done sending
+ }
+ // here we print received values using `for` loop (until the channel is closed)
+ for (y in channel) println(y)
+ println("Done!")
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-02.kt)
+
+### Building channel producers
+
+The pattern where a coroutine is producing a sequence of elements into a channel is quite common.
+You could abstract such a producer into a function that takes channel as its parameter, but this goes contrary
+to common sense that results must be returned from functions. Here is a convenience
+coroutine builder named `buildChannel` that makes it easy to do it right:
+
+```kotlin
+fun produceSquares() = buildChannel<Int>(CommonPool) {
+ for (x in 1..5) send(x * x)
+}
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val squares = produceSquares()
+ for (y in squares) println(y)
+ println("Done!")
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt)
+
+### Pipelines
+
+Pipeline is a pattern where one coroutine is producing, possibly infinite, stream of values:
+
+```kotlin
+fun produceNumbers() = buildChannel<Int>(CommonPool) {
+ var x = 1
+ while (true) send(x++) // infinite stream of integers starting from 1
+}
+```
+
+And another coroutine or coroutines are receiving that stream, doing some processing, and sending the result.
+In the below example the numbers are just squared:
+
+```kotlin
+fun square(numbers: ReceiveChannel<Int>) = buildChannel<Int>(CommonPool) {
+ for (x in numbers) send(x * x)
+}
+```
+
+The main code starts and connects pipeline:
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val numbers = produceNumbers() // produces integers from 1 and on
+ val squares = square(numbers) // squares integers
+ for (i in 1..5) println(squares.receive()) // print first five
+ println("Done!") // we are done
+ squares.cancel() // need to cancel these coroutines in a larger app
+ numbers.cancel()
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-04.kt)
+
+We don't have to cancel these coroutines in this example app, because
+[coroutines are like daemon threads](#coroutines-are-like-daemon-threads),
+but in a larger app we'll need to stop our pipeline if we don't need it anymore.
+Alternatively, we could have run pipeline coroutines as
+[children of a coroutine](#children-of-a-coroutine).
+
+### Prime numbers with pipeline
+
+Let's take pipelines to the extreme, with an example that generates prime numbers using a pipeline
+of coroutines. We start with an infinite sequence of numbers. This time we introduce an
+explicit context parameter, so that caller can control where our coroutines run:
+
+<!--- INCLUDE kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
+import kotlin.coroutines.experimental.CoroutineContext
+-->
+
+```kotlin
+fun numbersFrom(context: CoroutineContext, start: Int) = buildChannel<Int>(context) {
+ var x = start
+ while (true) send(x++) // infinite stream of integers from start
+}
+```
+
+The following pipeline stage filters an incoming stream of numbers, removing all the numbers
+that are divisible by the given prime number:
+
+```kotlin
+fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = buildChannel<Int>(context) {
+ for (x in numbers) if (x % prime != 0) send(x)
+}
+```
+
+Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
+and launching new pipeline stage for each prime number found. The following example prints first ten prime numbers,
+running the whole pipeline in the context of the main thread:
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ var cur = numbersFrom(context, 2)
+ for (i in 1..10) {
+ val prime = cur.receive()
+ println(prime)
+ cur = filter(context, cur, prime)
+ }
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt)
+
+The output of this code is:
+
+```
+2
+3
+5
+7
+11
+13
+17
+19
+23
+29
+```
+
+### Fan-out
+
+Multiple coroutines may receive from the same channel, distributing work between themselves.
+Let us start with a producer coroutine that is periodically producing integers
+(ten numbers per second):
+
+```kotlin
+fun produceNumbers() = buildChannel<Int>(CommonPool) {
+ var x = 1 // start from 1
+ while (true) {
+ send(x++) // produce next
+ delay(100) // wait 0.1s
+ }
+}
+```
+
+Then we can have several processor coroutines. In this example, they just print their id and
+received number:
+
+```kotlin
+fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
+ while (true) {
+ val x = channel.receive()
+ println("Processor #$id received $x")
+ }
+}
+```
+
+Now let us launch five processors and let them work for a second. See what happens:
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val producer = produceNumbers()
+ repeat(5) { launchProcessor(it, producer) }
+ delay(1000)
+ producer.cancel() // cancel producer coroutine and thus kill them all
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt)
+
+The output will be similar to the the following one, albeit the processor ids that receive
+each specific integer may be different:
+
+```
+Processor #2 received 1
+Processor #4 received 2
+Processor #0 received 3
+Processor #1 received 4
+Processor #3 received 5
+Processor #2 received 6
+Processor #4 received 7
+Processor #0 received 8
+Processor #1 received 9
+Processor #3 received 10
+```
+
+Note, that cancelling a producer coroutine closes its channel, thus eventually terminating iteration
+over the channel that processor coroutines are doing.
+
+### Fan-in
+
+Multiple coroutines may send to the same channel.
+For example, let us have a channel of strings, and a suspending function that
+repeatedly sends a specified string to this channel with a specified delay:
+
+```kotlin
+suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
+ while (true) {
+ delay(time)
+ channel.send(s)
+ }
+}
+```
+
+Now, let us see what happen if we launch a couple of coroutines sending strings
+(in this example we launch them in the context of the main thread):
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val channel = Channel<String>()
+ launch(context) { sendString(channel, "foo", 200L) }
+ launch(context) { sendString(channel, "BAR!", 500L) }
+ repeat(6) { // receive first six
+ println(channel.receive())
+ }
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt)
+
+The output is:
+
+```
+foo
+foo
+BAR!
+foo
+foo
+BAR!
+```
+
+### Buffered channels
+
+The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
+meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked,
+if receive is invoked first, it is suspended until send is invoked.
+
+Both `Channel()` factory and `buildChanner{}` builder take an optional `capacity` parameter to
+specify _buffer size_. Buffer allows senders to send multiple elements before suspending,
+similar to the `BlockingQueue` with a specified capacity, which blocks when buffer is full.
+
+Take a look at the behavior of the following code:
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val channel = Channel<Int>(4) // create buffered channel
+ launch(context) { // launch sender coroutine
+ repeat(10) {
+ println("Sending $it") // print before sending each element
+ channel.send(it) // will suspend when buffer is full
+ }
+ }
+ // don't receive anything... just wait....
+ delay(1000)
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt)
+
+It prints "sending" _five_ times using a buffered channel with capacity of _four_:
+
+```
+Sending 0
+Sending 1
+Sending 2
+Sending 3
+Sending 4
+```
+
+The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
diff --git a/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-01.kt b/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-01.kt
new file mode 100644
index 0000000..b978e60
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-01.kt
@@ -0,0 +1,16 @@
+// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
+package guide.channel.example01
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val channel = Channel<Int>()
+ launch(CommonPool) {
+ // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
+ for (x in 1..5) channel.send(x * x)
+ }
+ // here we print five received integers:
+ repeat(5) { println(channel.receive()) }
+ println("Done!")
+}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-02.kt b/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-02.kt
new file mode 100644
index 0000000..5b083cc
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-02.kt
@@ -0,0 +1,16 @@
+// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
+package guide.channel.example02
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val channel = Channel<Int>()
+ launch(CommonPool) {
+ for (x in 1..5) channel.send(x * x)
+ channel.close() // we're done sending
+ }
+ // here we print received values using `for` loop (until the channel is closed)
+ for (y in channel) println(y)
+ println("Done!")
+}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt b/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt
new file mode 100644
index 0000000..f280e85
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-03.kt
@@ -0,0 +1,15 @@
+// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
+package guide.channel.example03
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+
+fun produceSquares() = buildChannel<Int>(CommonPool) {
+ for (x in 1..5) send(x * x)
+}
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val squares = produceSquares()
+ for (y in squares) println(y)
+ println("Done!")
+}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-04.kt b/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-04.kt
new file mode 100644
index 0000000..adc8c78
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-04.kt
@@ -0,0 +1,23 @@
+// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
+package guide.channel.example04
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+
+fun produceNumbers() = buildChannel<Int>(CommonPool) {
+ var x = 1
+ while (true) send(x++) // infinite stream of integers starting from 1
+}
+
+fun square(numbers: ReceiveChannel<Int>) = buildChannel<Int>(CommonPool) {
+ for (x in numbers) send(x * x)
+}
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val numbers = produceNumbers() // produces integers from 1 and on
+ val squares = square(numbers) // squares integers
+ for (i in 1..5) println(squares.receive()) // print first five
+ println("Done!") // we are done
+ squares.cancel() // need to cancel these coroutines in a larger app
+ numbers.cancel()
+}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt b/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
new file mode 100644
index 0000000..e72d0dd
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
@@ -0,0 +1,24 @@
+// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
+package guide.channel.example05
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+import kotlin.coroutines.experimental.CoroutineContext
+
+fun numbersFrom(context: CoroutineContext, start: Int) = buildChannel<Int>(context) {
+ var x = start
+ while (true) send(x++) // infinite stream of integers from start
+}
+
+fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = buildChannel<Int>(context) {
+ for (x in numbers) if (x % prime != 0) send(x)
+}
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ var cur = numbersFrom(context, 2)
+ for (i in 1..10) {
+ val prime = cur.receive()
+ println(prime)
+ cur = filter(context, cur, prime)
+ }
+}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt b/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt
new file mode 100644
index 0000000..6e06305
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-06.kt
@@ -0,0 +1,27 @@
+// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
+package guide.channel.example06
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+
+fun produceNumbers() = buildChannel<Int>(CommonPool) {
+ var x = 1 // start from 1
+ while (true) {
+ send(x++) // produce next
+ delay(100) // wait 0.1s
+ }
+}
+
+fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
+ while (true) {
+ val x = channel.receive()
+ println("Processor #$id received $x")
+ }
+}
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val producer = produceNumbers()
+ repeat(5) { launchProcessor(it, producer) }
+ delay(1000)
+ producer.cancel() // cancel producer coroutine and thus kill them all
+}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt b/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt
new file mode 100644
index 0000000..c60442b
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt
@@ -0,0 +1,21 @@
+// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
+package guide.channel.example07
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+
+suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
+ while (true) {
+ delay(time)
+ channel.send(s)
+ }
+}
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val channel = Channel<String>()
+ launch(context) { sendString(channel, "foo", 200L) }
+ launch(context) { sendString(channel, "BAR!", 500L) }
+ repeat(6) { // receive first six
+ println(channel.receive())
+ }
+}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt b/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt
new file mode 100644
index 0000000..2232a4f
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt
@@ -0,0 +1,17 @@
+// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
+package guide.channel.example08
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val channel = Channel<Int>(4) // create buffered channel
+ launch(context) { // launch sender coroutine
+ repeat(10) {
+ println("Sending $it") // print before sending each element
+ channel.send(it) // will suspend when buffer is full
+ }
+ }
+ // don't receive anything... just wait....
+ delay(1000)
+}