docs: make select-expression & shared state and concurrency runnable
diff --git a/core/kotlinx-coroutines-core/test/guide/example-select-01.kt b/core/kotlinx-coroutines-core/test/guide/example-select-01.kt
index f4cdd28..f49cab6 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-select-01.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-select-01.kt
@@ -8,8 +8,6 @@
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
-import kotlinx.coroutines.*
-import kotlin.coroutines.*
fun CoroutineScope.fizz() = produce<String> {
while (true) { // sends "Fizz" every 300 ms
@@ -37,10 +35,12 @@
}
fun main() = runBlocking<Unit> {
+//sampleStart
val fizz = fizz()
val buzz = buzz()
repeat(7) {
selectFizzBuzz(fizz, buzz)
}
- coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
+ coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
+//sampleEnd
}
diff --git a/core/kotlinx-coroutines-core/test/guide/example-select-02.kt b/core/kotlinx-coroutines-core/test/guide/example-select-02.kt
index ff7e5c6..02bd98c 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-select-02.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-select-02.kt
@@ -8,7 +8,6 @@
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
-import kotlin.coroutines.*
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
select<String> {
@@ -25,8 +24,9 @@
"b -> '$value'"
}
}
-
+
fun main() = runBlocking<Unit> {
+//sampleStart
val a = produce<String> {
repeat(4) { send("Hello $it") }
}
@@ -36,5 +36,6 @@
repeat(8) { // print first eight results
println(selectAorB(a, b))
}
- coroutineContext.cancelChildren()
-}
+ coroutineContext.cancelChildren()
+//sampleEnd
+}
diff --git a/core/kotlinx-coroutines-core/test/guide/example-select-03.kt b/core/kotlinx-coroutines-core/test/guide/example-select-03.kt
index 35ea2eb..7b79bbe 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-select-03.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-select-03.kt
@@ -8,7 +8,6 @@
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
-import kotlin.coroutines.*
fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> {
for (num in 1..10) { // produce 10 numbers from 1 to 10
@@ -21,6 +20,7 @@
}
fun main() = runBlocking<Unit> {
+//sampleStart
val side = Channel<Int>() // allocate side channel
launch { // this is a very fast consumer for the side channel
side.consumeEach { println("Side channel has $it") }
@@ -30,5 +30,6 @@
delay(250) // let us digest the consumed number properly, do not hurry
}
println("Done consuming")
- coroutineContext.cancelChildren()
+ coroutineContext.cancelChildren()
+//sampleEnd
}
diff --git a/core/kotlinx-coroutines-core/test/guide/example-select-04.kt b/core/kotlinx-coroutines-core/test/guide/example-select-04.kt
index 3662741..3da88f1 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-select-04.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-select-04.kt
@@ -6,10 +6,9 @@
package kotlinx.coroutines.guide.select04
import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import java.util.*
-
+
fun CoroutineScope.asyncString(time: Int) = async {
delay(time.toLong())
"Waited for $time ms"
@@ -21,6 +20,7 @@
}
fun main() = runBlocking<Unit> {
+//sampleStart
val list = asyncStringsList()
val result = select<String> {
list.withIndex().forEach { (index, deferred) ->
@@ -32,4 +32,5 @@
println(result)
val countActive = list.count { it.isActive }
println("$countActive coroutines are still active")
+//sampleEnd
}
diff --git a/core/kotlinx-coroutines-core/test/guide/example-select-05.kt b/core/kotlinx-coroutines-core/test/guide/example-select-05.kt
index f7a4b9f..fbbad49 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-select-05.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-select-05.kt
@@ -8,8 +8,7 @@
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
-import kotlin.coroutines.*
-
+
fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
var current = input.receive() // start with first received deferred value
while (isActive) { // loop while not cancelled/closed
@@ -37,6 +36,7 @@
}
fun main() = runBlocking<Unit> {
+//sampleStart
val chan = Channel<Deferred<String>>() // the channel for test
launch { // launch printing coroutine
for (s in switchMapDeferreds(chan))
@@ -52,4 +52,5 @@
delay(1000) // give it time to process
chan.close() // close the channel ...
delay(500) // and wait some time to let it finish
+//sampleEnd
}
diff --git a/core/kotlinx-coroutines-core/test/guide/example-sync-01.kt b/core/kotlinx-coroutines-core/test/guide/example-sync-01.kt
index e380554..b232087 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-sync-01.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-sync-01.kt
@@ -6,8 +6,7 @@
package kotlinx.coroutines.guide.sync01
import kotlinx.coroutines.*
-import kotlin.system.*
-import kotlin.coroutines.*
+import kotlin.system.*
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
@@ -26,8 +25,10 @@
var counter = 0
fun main() = runBlocking<Unit> {
+//sampleStart
GlobalScope.massiveRun {
counter++
}
println("Counter = $counter")
+//sampleEnd
}
diff --git a/core/kotlinx-coroutines-core/test/guide/example-sync-01b.kt b/core/kotlinx-coroutines-core/test/guide/example-sync-01b.kt
index b242f30..041ef01 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-sync-01b.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-sync-01b.kt
@@ -7,7 +7,6 @@
import kotlinx.coroutines.*
import kotlin.system.*
-import kotlin.coroutines.*
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
@@ -27,8 +26,10 @@
var counter = 0
fun main() = runBlocking<Unit> {
+//sampleStart
CoroutineScope(mtContext).massiveRun { // use it instead of Dispatchers.Default in this sample and below
counter++
}
println("Counter = $counter")
+//sampleEnd
}
diff --git a/core/kotlinx-coroutines-core/test/guide/example-sync-02.kt b/core/kotlinx-coroutines-core/test/guide/example-sync-02.kt
index b1c10d8..45d1be5 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-sync-02.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-sync-02.kt
@@ -7,7 +7,6 @@
import kotlinx.coroutines.*
import kotlin.system.*
-import kotlin.coroutines.*
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
diff --git a/core/kotlinx-coroutines-core/test/guide/example-sync-03.kt b/core/kotlinx-coroutines-core/test/guide/example-sync-03.kt
index 1eabc1b..e1f2388 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-sync-03.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-sync-03.kt
@@ -8,7 +8,6 @@
import kotlinx.coroutines.*
import java.util.concurrent.atomic.*
import kotlin.system.*
-import kotlin.coroutines.*
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
@@ -27,8 +26,10 @@
var counter = AtomicInteger()
fun main() = runBlocking<Unit> {
+//sampleStart
GlobalScope.massiveRun {
counter.incrementAndGet()
}
println("Counter = ${counter.get()}")
+//sampleEnd
}
diff --git a/core/kotlinx-coroutines-core/test/guide/example-sync-04.kt b/core/kotlinx-coroutines-core/test/guide/example-sync-04.kt
index 5d1fd88..77b3b64 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-sync-04.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-sync-04.kt
@@ -7,7 +7,6 @@
import kotlinx.coroutines.*
import kotlin.system.*
-import kotlin.coroutines.*
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
@@ -27,10 +26,12 @@
var counter = 0
fun main() = runBlocking<Unit> {
+//sampleStart
GlobalScope.massiveRun { // run each coroutine with DefaultDispathcer
withContext(counterContext) { // but confine each increment to the single-threaded context
counter++
}
}
println("Counter = $counter")
+//sampleEnd
}
diff --git a/core/kotlinx-coroutines-core/test/guide/example-sync-05.kt b/core/kotlinx-coroutines-core/test/guide/example-sync-05.kt
index 752c0e8..b5ad9da 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-sync-05.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-sync-05.kt
@@ -7,7 +7,6 @@
import kotlinx.coroutines.*
import kotlin.system.*
-import kotlin.coroutines.*
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
@@ -27,8 +26,10 @@
var counter = 0
fun main() = runBlocking<Unit> {
+//sampleStart
CoroutineScope(counterContext).massiveRun { // run each coroutine in the single-threaded context
counter++
}
println("Counter = $counter")
+//sampleEnd
}
diff --git a/core/kotlinx-coroutines-core/test/guide/example-sync-06.kt b/core/kotlinx-coroutines-core/test/guide/example-sync-06.kt
index 891533d..029a7da 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-sync-06.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-sync-06.kt
@@ -8,7 +8,6 @@
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.*
-import kotlin.coroutines.*
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
@@ -28,10 +27,12 @@
var counter = 0
fun main() = runBlocking<Unit> {
+//sampleStart
GlobalScope.massiveRun {
mutex.withLock {
counter++
}
}
println("Counter = $counter")
+//sampleEnd
}
diff --git a/core/kotlinx-coroutines-core/test/guide/example-sync-07.kt b/core/kotlinx-coroutines-core/test/guide/example-sync-07.kt
index ccac26d..79bba72 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-sync-07.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-sync-07.kt
@@ -8,7 +8,6 @@
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.*
-import kotlin.coroutines.*
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
@@ -41,6 +40,7 @@
}
fun main() = runBlocking<Unit> {
+//sampleStart
val counter = counterActor() // create the actor
GlobalScope.massiveRun {
counter.send(IncCounter)
@@ -50,4 +50,5 @@
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // shutdown the actor
+//sampleEnd
}
diff --git a/docs/select-expression.md b/docs/select-expression.md
index 842ab3a..17f3dbb 100644
--- a/docs/select-expression.md
+++ b/docs/select-expression.md
@@ -5,10 +5,6 @@
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.$$1$$2
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.*
-import kotlinx.coroutines.selects.*
-->
<!--- KNIT ../core/kotlinx-coroutines-core/test/guide/.*\.kt -->
<!--- TEST_OUT ../core/kotlinx-coroutines-core/test/guide/test/SelectGuideTest.kt
@@ -49,11 +45,6 @@
Let us have two producers of strings: `fizz` and `buzz`. The `fizz` produces "Fizz" string every 300 ms:
-<!--- INCLUDE
-import kotlinx.coroutines.*
-import kotlin.coroutines.*
--->
-
<div class="sample" markdown="1" theme="idea" data-highlight-only>
```kotlin
@@ -105,16 +96,49 @@
Let us run it all seven times:
-<div class="sample" markdown="1" theme="idea" data-highlight-only>
+<!--- CLEAR -->
+
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.selects.*
+
+fun CoroutineScope.fizz() = produce<String> {
+ while (true) { // sends "Fizz" every 300 ms
+ delay(300)
+ send("Fizz")
+ }
+}
+
+fun CoroutineScope.buzz() = produce<String> {
+ while (true) { // sends "Buzz!" every 500 ms
+ delay(500)
+ send("Buzz!")
+ }
+}
+
+suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
+ select<Unit> { // <Unit> means that this select expression does not produce any result
+ fizz.onReceive { value -> // this is the first select clause
+ println("fizz -> '$value'")
+ }
+ buzz.onReceive { value -> // this is the second select clause
+ println("buzz -> '$value'")
+ }
+ }
+}
+
fun main() = runBlocking<Unit> {
+//sampleStart
val fizz = fizz()
val buzz = buzz()
repeat(7) {
selectFizzBuzz(fizz, buzz)
}
- coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
+ coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
+//sampleEnd
}
```
@@ -143,10 +167,6 @@
specific action when the channel is closed. The following example also shows that `select` is an expression that returns
the result of its selected clause:
-<!--- INCLUDE
-import kotlin.coroutines.*
--->
-
<div class="sample" markdown="1" theme="idea" data-highlight-only>
```kotlin
@@ -172,10 +192,33 @@
Let's use it with channel `a` that produces "Hello" string four times and
channel `b` that produces "World" four times:
-<div class="sample" markdown="1" theme="idea" data-highlight-only>
+<!--- CLEAR -->
+
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.selects.*
+
+suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
+ select<String> {
+ a.onReceiveOrNull { value ->
+ if (value == null)
+ "Channel 'a' is closed"
+ else
+ "a -> '$value'"
+ }
+ b.onReceiveOrNull { value ->
+ if (value == null)
+ "Channel 'b' is closed"
+ else
+ "b -> '$value'"
+ }
+ }
+
fun main() = runBlocking<Unit> {
+//sampleStart
val a = produce<String> {
repeat(4) { send("Hello $it") }
}
@@ -185,8 +228,9 @@
repeat(8) { // print first eight results
println(selectAorB(a, b))
}
- coroutineContext.cancelChildren()
-}
+ coroutineContext.cancelChildren()
+//sampleEnd
+}
```
</div>
@@ -226,10 +270,6 @@
Let us write an example of producer of integers that sends its values to a `side` channel when
the consumers on its primary channel cannot keep up with it:
-<!--- INCLUDE
-import kotlin.coroutines.*
--->
-
<div class="sample" markdown="1" theme="idea" data-highlight-only>
```kotlin
@@ -248,10 +288,27 @@
Consumer is going to be quite slow, taking 250 ms to process each number:
-<div class="sample" markdown="1" theme="idea" data-highlight-only>
+<!--- CLEAR -->
+
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.selects.*
+
+fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> {
+ for (num in 1..10) { // produce 10 numbers from 1 to 10
+ delay(100) // every 100 ms
+ select<Unit> {
+ onSend(num) {} // Send to the primary channel
+ side.onSend(num) {} // or to the side channel
+ }
+ }
+}
+
fun main() = runBlocking<Unit> {
+//sampleStart
val side = Channel<Int>() // allocate side channel
launch { // this is a very fast consumer for the side channel
side.consumeEach { println("Side channel has $it") }
@@ -261,7 +318,8 @@
delay(250) // let us digest the consumed number properly, do not hurry
}
println("Done consuming")
- coroutineContext.cancelChildren()
+ coroutineContext.cancelChildren()
+//sampleEnd
}
```
@@ -293,10 +351,6 @@
Let us start with an async function that returns a deferred string value after
a random delay:
-<!--- INCLUDE .*/example-select-04.kt
-import java.util.*
--->
-
<div class="sample" markdown="1" theme="idea" data-highlight-only>
```kotlin
@@ -326,10 +380,27 @@
so we can provide clauses for it using an arbitrary code. In this case we iterate over a list
of deferred values to provide `onAwait` clause for each deferred value.
-<div class="sample" markdown="1" theme="idea" data-highlight-only>
+<!--- CLEAR -->
+
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.selects.*
+import java.util.*
+
+fun CoroutineScope.asyncString(time: Int) = async {
+ delay(time.toLong())
+ "Waited for $time ms"
+}
+
+fun CoroutineScope.asyncStringsList(): List<Deferred<String>> {
+ val random = Random(3)
+ return List(12) { asyncString(random.nextInt(1000)) }
+}
+
fun main() = runBlocking<Unit> {
+//sampleStart
val list = asyncStringsList()
val result = select<String> {
list.withIndex().forEach { (index, deferred) ->
@@ -341,6 +412,7 @@
println(result)
val countActive = list.count { it.isActive }
println("$countActive coroutines are still active")
+//sampleEnd
}
```
@@ -363,10 +435,6 @@
deferred value, but only until the next deferred value comes over or the channel is closed. This example puts together
[onReceiveOrNull][ReceiveChannel.onReceiveOrNull] and [onAwait][Deferred.onAwait] clauses in the same `select`:
-<!--- INCLUDE
-import kotlin.coroutines.*
--->
-
<div class="sample" markdown="1" theme="idea" data-highlight-only>
```kotlin
@@ -411,10 +479,43 @@
The main function just launches a coroutine to print results of `switchMapDeferreds` and sends some test
data to it:
-<div class="sample" markdown="1" theme="idea" data-highlight-only>
+<!--- CLEAR -->
+
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.selects.*
+
+fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
+ var current = input.receive() // start with first received deferred value
+ while (isActive) { // loop while not cancelled/closed
+ val next = select<Deferred<String>?> { // return next deferred value from this select or null
+ input.onReceiveOrNull { update ->
+ update // replaces next value to wait
+ }
+ current.onAwait { value ->
+ send(value) // send value that current deferred has produced
+ input.receiveOrNull() // and use the next deferred from the input channel
+ }
+ }
+ if (next == null) {
+ println("Channel was closed")
+ break // out of loop
+ } else {
+ current = next
+ }
+ }
+}
+
+fun CoroutineScope.asyncString(str: String, time: Long) = async {
+ delay(time)
+ str
+}
+
fun main() = runBlocking<Unit> {
+//sampleStart
val chan = Channel<Deferred<String>>() // the channel for test
launch { // launch printing coroutine
for (s in switchMapDeferreds(chan))
@@ -430,6 +531,7 @@
delay(1000) // give it time to process
chan.close() // close the channel ...
delay(500) // and wait some time to let it finish
+//sampleEnd
}
```
diff --git a/docs/shared-mutable-state-and-concurrency.md b/docs/shared-mutable-state-and-concurrency.md
index 0ddbdab..4a44d3c 100644
--- a/docs/shared-mutable-state-and-concurrency.md
+++ b/docs/shared-mutable-state-and-concurrency.md
@@ -5,8 +5,6 @@
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.$$1$$2
-
-import kotlinx.coroutines.*
-->
<!--- KNIT ../core/kotlinx-coroutines-core/test/guide/.*\.kt -->
<!--- TEST_OUT ../core/kotlinx-coroutines-core/test/guide/test/SharedStateGuideTest.kt
@@ -44,24 +42,7 @@
Let us launch a hundred coroutines all doing the same action thousand times.
We'll also measure their completion time for further comparisons:
-<!--- INCLUDE .*/example-sync-03.kt
-import java.util.concurrent.atomic.*
--->
-
-<!--- INCLUDE .*/example-sync-06.kt
-import kotlinx.coroutines.sync.*
--->
-
-<!--- INCLUDE .*/example-sync-07.kt
-import kotlinx.coroutines.channels.*
--->
-
-<!--- INCLUDE .*/example-sync-([0-9a-z]+).kt
-import kotlin.system.*
-import kotlin.coroutines.*
--->
-
-<div class="sample" markdown="1" theme="idea" data-highlight-only>
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
@@ -81,21 +62,40 @@
</div>
-<!--- INCLUDE .*/example-sync-([0-9a-z]+).kt -->
-
We start with a very simple action that increments a shared mutable variable using
multi-threaded [Dispatchers.Default] that is used in [GlobalScope].
-<div class="sample" markdown="1" theme="idea" data-highlight-only>
+<!--- CLEAR -->
+
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
+import kotlinx.coroutines.*
+import kotlin.system.*
+
+suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
+ val n = 100 // number of coroutines to launch
+ val k = 1000 // times an action is repeated by each coroutine
+ val time = measureTimeMillis {
+ val jobs = List(n) {
+ launch {
+ repeat(k) { action() }
+ }
+ }
+ jobs.forEach { it.join() }
+ }
+ println("Completed ${n * k} actions in $time ms")
+}
+
var counter = 0
fun main() = runBlocking<Unit> {
+//sampleStart
GlobalScope.massiveRun {
counter++
}
println("Counter = $counter")
+//sampleEnd
}
```
@@ -115,17 +115,38 @@
the thread pool is running in only one thread in this case. To reproduce the problem you'll need to make the
following change:
-<div class="sample" markdown="1" theme="idea" data-highlight-only>
+<!--- CLEAR -->
+
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
+import kotlinx.coroutines.*
+import kotlin.system.*
+
+suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
+ val n = 100 // number of coroutines to launch
+ val k = 1000 // times an action is repeated by each coroutine
+ val time = measureTimeMillis {
+ val jobs = List(n) {
+ launch {
+ repeat(k) { action() }
+ }
+ }
+ jobs.forEach { it.join() }
+ }
+ println("Completed ${n * k} actions in $time ms")
+}
+
val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads
var counter = 0
fun main() = runBlocking<Unit> {
+//sampleStart
CoroutineScope(mtContext).massiveRun { // use it instead of Dispatchers.Default in this sample and below
counter++
}
println("Counter = $counter")
+//sampleEnd
}
```
@@ -142,9 +163,28 @@
There is common misconception that making a variable `volatile` solves concurrency problem. Let us try it:
-<div class="sample" markdown="1" theme="idea" data-highlight-only>
+<!--- CLEAR -->
+
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
+import kotlinx.coroutines.*
+import kotlin.system.*
+
+suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
+ val n = 100 // number of coroutines to launch
+ val k = 1000 // times an action is repeated by each coroutine
+ val time = measureTimeMillis {
+ val jobs = List(n) {
+ launch {
+ repeat(k) { action() }
+ }
+ }
+ jobs.forEach { it.join() }
+ }
+ println("Completed ${n * k} actions in $time ms")
+}
+
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0
@@ -176,16 +216,38 @@
operations that needs to be performed on a shared state.
In the case of a simple counter we can use `AtomicInteger` class which has atomic `incrementAndGet` operations:
-<div class="sample" markdown="1" theme="idea" data-highlight-only>
+<!--- CLEAR -->
+
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
+import kotlinx.coroutines.*
+import java.util.concurrent.atomic.*
+import kotlin.system.*
+
+suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
+ val n = 100 // number of coroutines to launch
+ val k = 1000 // times an action is repeated by each coroutine
+ val time = measureTimeMillis {
+ val jobs = List(n) {
+ launch {
+ repeat(k) { action() }
+ }
+ }
+ jobs.forEach { it.join() }
+ }
+ println("Completed ${n * k} actions in $time ms")
+}
+
var counter = AtomicInteger()
fun main() = runBlocking<Unit> {
+//sampleStart
GlobalScope.massiveRun {
counter.incrementAndGet()
}
println("Counter = ${counter.get()}")
+//sampleEnd
}
```
@@ -209,19 +271,40 @@
the single event-dispatch/application thread. It is easy to apply with coroutines by using a
single-threaded context.
-<div class="sample" markdown="1" theme="idea" data-highlight-only>
+<!--- CLEAR -->
+
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
+import kotlinx.coroutines.*
+import kotlin.system.*
+
+suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
+ val n = 100 // number of coroutines to launch
+ val k = 1000 // times an action is repeated by each coroutine
+ val time = measureTimeMillis {
+ val jobs = List(n) {
+ launch {
+ repeat(k) { action() }
+ }
+ }
+ jobs.forEach { it.join() }
+ }
+ println("Completed ${n * k} actions in $time ms")
+}
+
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking<Unit> {
+//sampleStart
GlobalScope.massiveRun { // run each coroutine with DefaultDispathcer
withContext(counterContext) { // but confine each increment to the single-threaded context
counter++
}
}
println("Counter = $counter")
+//sampleEnd
}
```
@@ -244,17 +327,38 @@
the single-threaded context to start with.
Here we use [CoroutineScope()] function to convert coroutine context reference to [CoroutineScope]:
-<div class="sample" markdown="1" theme="idea" data-highlight-only>
+<!--- CLEAR -->
+
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
+import kotlinx.coroutines.*
+import kotlin.system.*
+
+suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
+ val n = 100 // number of coroutines to launch
+ val k = 1000 // times an action is repeated by each coroutine
+ val time = measureTimeMillis {
+ val jobs = List(n) {
+ launch {
+ repeat(k) { action() }
+ }
+ }
+ jobs.forEach { it.join() }
+ }
+ println("Completed ${n * k} actions in $time ms")
+}
+
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking<Unit> {
+//sampleStart
CoroutineScope(counterContext).massiveRun { // run each coroutine in the single-threaded context
counter++
}
println("Counter = $counter")
+//sampleEnd
}
```
@@ -279,19 +383,41 @@
There is also [withLock] extension function that conveniently represents
`mutex.lock(); try { ... } finally { mutex.unlock() }` pattern:
-<div class="sample" markdown="1" theme="idea" data-highlight-only>
+<!--- CLEAR -->
+
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.sync.*
+import kotlin.system.*
+
+suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
+ val n = 100 // number of coroutines to launch
+ val k = 1000 // times an action is repeated by each coroutine
+ val time = measureTimeMillis {
+ val jobs = List(n) {
+ launch {
+ repeat(k) { action() }
+ }
+ }
+ jobs.forEach { it.join() }
+ }
+ println("Completed ${n * k} actions in $time ms")
+}
+
val mutex = Mutex()
var counter = 0
fun main() = runBlocking<Unit> {
+//sampleStart
GlobalScope.massiveRun {
mutex.withLock {
counter++
}
}
println("Counter = $counter")
+//sampleEnd
}
```
@@ -357,10 +483,47 @@
The main code is straightforward:
-<div class="sample" markdown="1" theme="idea" data-highlight-only>
+<!--- CLEAR -->
+
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlin.system.*
+
+suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
+ val n = 100 // number of coroutines to launch
+ val k = 1000 // times an action is repeated by each coroutine
+ val time = measureTimeMillis {
+ val jobs = List(n) {
+ launch {
+ repeat(k) { action() }
+ }
+ }
+ jobs.forEach { it.join() }
+ }
+ println("Completed ${n * k} actions in $time ms")
+}
+
+// Message types for counterActor
+sealed class CounterMsg
+object IncCounter : CounterMsg() // one-way message to increment counter
+class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
+
+// This function launches a new counter actor
+fun CoroutineScope.counterActor() = actor<CounterMsg> {
+ var counter = 0 // actor state
+ for (msg in channel) { // iterate over incoming messages
+ when (msg) {
+ is IncCounter -> counter++
+ is GetCounter -> msg.response.complete(counter)
+ }
+ }
+}
+
fun main() = runBlocking<Unit> {
+//sampleStart
val counter = counterActor() // create the actor
GlobalScope.massiveRun {
counter.send(IncCounter)
@@ -370,6 +533,7 @@
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // shutdown the actor
+//sampleEnd
}
```