Introduced "actor" coroutine builder
diff --git a/coroutines-guide.md b/coroutines-guide.md
index 8bf0f94..702a9bd 100644
--- a/coroutines-guide.md
+++ b/coroutines-guide.md
@@ -1700,6 +1700,10 @@
and a channel to communicate with other coroutines. A simple actor can be written as a function,
but an actor with a complex state is better suited for a class.
+There is an [actor] coroutine builder that conveniently combines actor's mailbox channel into its
+scope to receive messages from and combines the send channel into the resulting job object, so that a
+single reference to the actor can be carried around as its handle.
+
```kotlin
// Message types for counterActor
sealed class CounterMsg
@@ -1707,10 +1711,9 @@
class GetCounter(val response: SendChannel<Int>) : CounterMsg() // a request with reply
// This function launches a new counter actor
-fun counterActor(request: ReceiveChannel<CounterMsg>) = launch(CommonPool) {
+fun counterActor() = actor<CounterMsg>(CommonPool) {
var counter = 0 // actor state
- while (isActive) { // main loop of the actor
- val msg = request.receive()
+ for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.send(counter)
@@ -1719,14 +1722,14 @@
}
fun main(args: Array<String>) = runBlocking<Unit> {
- val request = Channel<CounterMsg>()
- counterActor(request)
+ val counter = counterActor() // create the actor
massiveRun(CommonPool) {
- request.send(IncCounter)
+ counter.send(IncCounter)
}
val response = Channel<Int>()
- request.send(GetCounter(response))
+ counter.send(GetCounter(response))
println("Counter = ${response.receive()}")
+ counter.close() // shutdown the actor
}
```
@@ -1737,12 +1740,16 @@
Counter = 1000000
-->
-Notice, that it does not matter (for correctness) what context the actor itself is executed in. An actor is
+It does not matter (for correctness) what context the actor itself is executed in. An actor is
a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine
works as a solution to the problem of shared mutable state.
-Actor is more efficient than locking under load, because in this case it always has work to do and does not
-have to switch at all.
+Actor is more efficient than locking under load, because in this case it always has work to do and it does not
+have to switch to a different context at all.
+
+> Note, that an [actor] coroutine builder is a dual of [produce] coroutine builder. An actor is associated
+ with the channel that it receives messages from, while a producer is associated with the channel that it
+ sends elements to.
## Select expression
@@ -2116,6 +2123,7 @@
[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/close.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
[Channel.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/invoke.html
+[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
<!--- INDEX kotlinx.coroutines.experimental.selects -->
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
[SelectBuilder.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/on-receive.html
diff --git a/kotlinx-coroutines-core/README.md b/kotlinx-coroutines-core/README.md
index fa9d8d1..ba791ea 100644
--- a/kotlinx-coroutines-core/README.md
+++ b/kotlinx-coroutines-core/README.md
@@ -9,6 +9,7 @@
| [launch] | [Job] | [CoroutineScope] | Launches coroutine that does not have any result
| [async] | [Deferred] | [CoroutineScope] | Returns a single value with the future result
| [produce][kotlinx.coroutines.experimental.channels.produce] | [ProducerJob][kotlinx.coroutines.experimental.channels.ProducerJob] | [ProducerScope][kotlinx.coroutines.experimental.channels.ProducerScope] | Produces a stream of elements
+| [actor][kotlinx.coroutines.experimental.channels.actor] | [ActorJob][kotlinx.coroutines.experimental.channels.ActorJob] | [ActorScope][kotlinx.coroutines.experimental.channels.ActorScope] | Processes a stream of messages
| [runBlocking] | `T` | [CoroutineScope] | Blocks the thread while the coroutine runs
Coroutine dispatchers implementing [CoroutineDispatcher]:
@@ -103,6 +104,9 @@
[kotlinx.coroutines.experimental.channels.produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
[kotlinx.coroutines.experimental.channels.ProducerJob]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-producer-job/index.html
[kotlinx.coroutines.experimental.channels.ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-producer-scope/index.html
+[kotlinx.coroutines.experimental.channels.actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
+[kotlinx.coroutines.experimental.channels.ActorJob]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-actor-job/index.html
+[kotlinx.coroutines.experimental.channels.ActorScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-actor-scope/index.html
[kotlinx.coroutines.experimental.channels.Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
[kotlinx.coroutines.experimental.channels.SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/send.html
[kotlinx.coroutines.experimental.channels.ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/receive.html
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt
new file mode 100644
index 0000000..835117c
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.CoroutineDispatcher
+import kotlinx.coroutines.experimental.CoroutineScope
+import kotlinx.coroutines.experimental.Job
+import kotlinx.coroutines.experimental.newCoroutineContext
+import kotlin.coroutines.experimental.CoroutineContext
+import kotlin.coroutines.experimental.startCoroutine
+
+/**
+ * Scope for [actor] coroutine builder.
+ */
+public interface ActorScope<out E> : CoroutineScope, ReceiveChannel<E> {
+ /**
+ * A reference to the mailbox channel that this coroutine [receives][receive] messages from.
+ * It is provided for convenience, so that the code in the coroutine can refer
+ * to the channel as `channel` as apposed to `this`.
+ * All the [ReceiveChannel] functions on this interface delegate to
+ * the channel instance returned by this function.
+ */
+ val channel: ReceiveChannel<E>
+}
+
+/**
+ * Return type for [actor] coroutine builder.
+ */
+public interface ActorJob<in E> : Job, SendChannel<E> {
+ /**
+ * A reference to the mailbox channel that this coroutine is receiving messages from.
+ * All the [SendChannel] functions on this interface delegate to
+ * the channel instance returned by this function.
+ */
+ val channel: SendChannel<E>
+}
+
+/**
+ * Launches new coroutine that is receiving messages from its mailbox channel
+ * and returns a reference to the coroutine as an [ActorJob]. The resulting
+ * object can be used to [send][SendChannel.send] messages to this coroutine.
+ *
+ * The scope of the coroutine contains [ActorScope] interface, which implements
+ * both [CoroutineScope] and [ReceiveChannel], so that coroutine can invoke
+ * [receive][ReceiveChannel.receive] directly. The channel is [closed][SendChannel.close]
+ * when the coroutine completes.
+ * The running coroutine is cancelled when the its job is [cancelled][Job.cancel].
+ *
+ * The [context] for the new coroutine must be explicitly specified.
+ * See [CoroutineDispatcher] for the standard [context] implementations that are provided by `kotlinx.coroutines`.
+ * The [context][CoroutineScope.context] of the parent coroutine from its [scope][CoroutineScope] may be used,
+ * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
+ *
+ * Uncaught exceptions in this coroutine close the channel with this exception as a cause and
+ * the resulting channel becomes _failed_, so that any attempt to send to such a channel throws exception.
+ *
+ * See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
+ *
+ * @param context context of the coroutine
+ * @param capacity capacity of the channel's buffer (no buffer by default)
+ * @param block the coroutine code
+ */
+public fun <E> actor(
+ context: CoroutineContext,
+ capacity: Int = 0,
+ block: suspend ActorScope<E>.() -> Unit
+): ActorJob<E> {
+ val channel = Channel<E>(capacity)
+ return ActorCoroutine(newCoroutineContext(context), channel).apply {
+ initParentJob(context[Job])
+ block.startCoroutine(this, this)
+ }
+}
+
+private class ActorCoroutine<E>(parentContext: CoroutineContext, channel: Channel<E>) :
+ ChannelCoroutine<E>(parentContext, channel), ActorScope<E>, ActorJob<E>
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt
new file mode 100644
index 0000000..484774b
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.AbstractCoroutine
+import kotlinx.coroutines.experimental.JobSupport
+import kotlinx.coroutines.experimental.handleCoroutineException
+import kotlin.coroutines.experimental.CoroutineContext
+
+internal open class ChannelCoroutine<E>(
+ override val parentContext: CoroutineContext,
+ val channel: Channel<E>
+) : AbstractCoroutine<Unit>(active = true), Channel<E> by channel {
+ override fun afterCompletion(state: Any?, mode: Int) {
+ val cause = (state as? JobSupport.CompletedExceptionally)?.cause
+ if (!channel.close(cause) && cause != null)
+ handleCoroutineException(context, cause)
+ }
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
index 24c6855..d4b0834 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
@@ -16,7 +16,10 @@
package kotlinx.coroutines.experimental.channels
-import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.CoroutineDispatcher
+import kotlinx.coroutines.experimental.CoroutineScope
+import kotlinx.coroutines.experimental.Job
+import kotlinx.coroutines.experimental.newCoroutineContext
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.startCoroutine
@@ -60,7 +63,8 @@
/**
* Launches new coroutine to produce a stream of values by sending them to a channel
- * and returns a reference to the coroutine as a [ProducerJob].
+ * and returns a reference to the coroutine as a [ProducerJob]. This resulting
+ * object can be used to [receive][ReceiveChannel.receive] elements produced by this coroutine.
*
* The scope of the coroutine contains [ProducerScope] interface, which implements
* both [CoroutineScope] and [SendChannel], so that coroutine can invoke
@@ -77,6 +81,10 @@
* the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
*
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
+ *
+ * @param context context of the coroutine
+ * @param capacity capacity of the channel's buffer (no buffer by default)
+ * @param block the coroutine code
*/
public fun <E> produce(
context: CoroutineContext,
@@ -101,13 +109,5 @@
): ProducerJob<E> =
produce(context, capacity, block)
-private class ProducerCoroutine<E>(
- override val parentContext: CoroutineContext,
- override val channel: Channel<E>
-) : AbstractCoroutine<Unit>(active = true), ProducerScope<E>, ProducerJob<E>, Channel<E> by channel {
- override fun afterCompletion(state: Any?, mode: Int) {
- val cause = (state as? CompletedExceptionally)?.cause
- if (!channel.close(cause) && cause != null)
- handleCoroutineException(context, cause)
- }
-}
+private class ProducerCoroutine<E>(parentContext: CoroutineContext, channel: Channel<E>) :
+ ChannelCoroutine<E>(parentContext, channel), ProducerScope<E>, ProducerJob<E>
diff --git a/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt b/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt
index b9abd6c..b789617 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt
@@ -40,7 +40,7 @@
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) { // run each coroutine in CommonPool
- run(counterContext) { // but confine each increment to a single-threaded context
+ run(counterContext) { // but confine each increment to the single-threaded context
counter++
}
}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt b/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt
index 22e7f9d..adf8612 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt
@@ -39,7 +39,7 @@
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
- massiveRun(counterContext) { // run each coroutine in single-threaded context
+ massiveRun(counterContext) { // run each coroutine in the single-threaded context
counter++
}
println("Counter = $counter")
diff --git a/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-07.kt b/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-07.kt
index 21b1fd8..1f7129c 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-07.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-07.kt
@@ -42,10 +42,9 @@
class GetCounter(val response: SendChannel<Int>) : CounterMsg() // a request with reply
// This function launches a new counter actor
-fun counterActor(request: ReceiveChannel<CounterMsg>) = launch(CommonPool) {
+fun counterActor() = actor<CounterMsg>(CommonPool) {
var counter = 0 // actor state
- while (isActive) { // main loop of the actor
- val msg = request.receive()
+ for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.send(counter)
@@ -54,12 +53,12 @@
}
fun main(args: Array<String>) = runBlocking<Unit> {
- val request = Channel<CounterMsg>()
- counterActor(request)
+ val counter = counterActor() // create the actor
massiveRun(CommonPool) {
- request.send(IncCounter)
+ counter.send(IncCounter)
}
val response = Channel<Int>()
- request.send(GetCounter(response))
+ counter.send(GetCounter(response))
println("Counter = ${response.receive()}")
+ counter.close() // shutdown the actor
}