blob: 9d8c89d3a6be4a64ebd8c916c826f46c74514bc4 [file] [log] [blame]
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kotlinx.coroutines.experimental.channels
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.selects.SelectInstance
import kotlin.coroutines.experimental.CoroutineContext
/**
* Scope for [actor] coroutine builder.
*/
public interface ActorScope<E> : CoroutineScope, ReceiveChannel<E> {
/**
* A reference to the mailbox channel that this coroutine [receives][receive] messages from.
* It is provided for convenience, so that the code in the coroutine can refer
* to the channel as `channel` as apposed to `this`.
* All the [ReceiveChannel] functions on this interface delegate to
* the channel instance returned by this function.
*/
val channel: Channel<E>
}
/**
* Return type for [actor] coroutine builder.
*/
public interface ActorJob<in E> : Job, SendChannel<E> {
/**
* A reference to the mailbox channel that this coroutine is receiving messages from.
* All the [SendChannel] functions on this interface delegate to
* the channel instance returned by this function.
*/
val channel: SendChannel<E>
}
/**
* Launches new coroutine that is receiving messages from its mailbox channel
* and returns a reference to the coroutine as an [ActorJob]. The resulting
* object can be used to [send][SendChannel.send] messages to this coroutine.
*
* The scope of the coroutine contains [ActorScope] interface, which implements
* both [CoroutineScope] and [ReceiveChannel], so that coroutine can invoke
* [receive][ReceiveChannel.receive] directly. The channel is [closed][SendChannel.close]
* when the coroutine completes.
* The running coroutine is cancelled when the its job is [cancelled][Job.cancel].
*
* The [context] for the new coroutine must be explicitly specified.
* See [CoroutineDispatcher] for the standard [context] implementations that are provided by `kotlinx.coroutines`.
* The [context][CoroutineScope.context] of the parent coroutine from its [scope][CoroutineScope] may be used,
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
*
* By default, the coroutine is immediately scheduled for execution.
* Other options can be specified via `start` parameter. See [CoroutineStart] for details.
* An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,
* the coroutine [Job] is created in _new_ state. It can be explicitly started with [start][Job.start] function
* and will be started implicitly on the first invocation of [join][Job.join] or on a first message
* [sent][SendChannel.send] to this coroutine's mailbox channel.
*
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
* the resulting channel becomes _failed_, so that any attempt to send to such a channel throws exception.
*
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
*
* @param context context of the coroutine
* @param capacity capacity of the channel's buffer (no buffer by default)
* @param start coroutine start option
* @param block the coroutine code
*/
public fun <E> actor(
context: CoroutineContext,
capacity: Int = 0,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend ActorScope<E>.() -> Unit
): ActorJob<E> {
val newContext = newCoroutineContext(context)
val channel = Channel<E>(capacity)
val coroutine = if (start.isLazy)
LazyActorCoroutine(newContext, channel, block) else
ActorCoroutine(newContext, channel, active = true)
coroutine.initParentJob(context[Job])
start(block, coroutine, coroutine)
return coroutine
}
private open class ActorCoroutine<E>(
parentContext: CoroutineContext,
channel: Channel<E>,
active: Boolean
) : ChannelCoroutine<E>(parentContext, channel, active), ActorScope<E>, ActorJob<E>
private class LazyActorCoroutine<E>(
parentContext: CoroutineContext,
channel: Channel<E>,
private val block: suspend ActorScope<E>.() -> Unit
) : ActorCoroutine<E>(parentContext, channel, active = false) {
override val channel: Channel<E> get() = this
override fun onStart() {
block.startCoroutineCancellable(this, this)
}
suspend override fun send(element: E) {
start()
return super.send(element)
}
override fun offer(element: E): Boolean {
start()
return super.offer(element)
}
override fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend () -> R) {
start()
return super.registerSelectSend(select, element, block)
}
}