blob: 10a45a929fdb820208fdaf3f5739faa71b8d8904 [file] [log] [blame]
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kotlinx.coroutines.experimental.channels
import kotlinx.coroutines.experimental.*
import kotlin.coroutines.experimental.*
/**
* Scope for [buildChannel].
*/
public interface ChannelBuilder<in E> : CoroutineScope, SendChannel<E>
/**
* Return type for [buildChannel].
*/
public interface ChannelJob<out E> : Job, ReceiveChannel<E>
/**
* Launches new coroutine without blocking current thread to send data over channel
* and returns a reference to the coroutine as a [ChannelJob], which implements
* both [Job] and [ReceiveChannel].
* The scope of the coroutine contains [ChannelBuilder] interface, which implements
* both [CoroutineScope] and [SendChannel], so that coroutine can invoke
* [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
* when the coroutine completes.
* The running coroutine is cancelled when the its job is [cancelled][Job.cancel].
*
* The [context] for the new coroutine must be explicitly specified.
* See [CoroutineDispatcher] for the standard [context] implementations that are provided by `kotlinx.coroutines`.
* The [context][CoroutineScope.context] of the parent coroutine from its [scope][CoroutineScope] may be used,
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
*
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
* the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
*
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
*/
public fun <E> buildChannel(
context: CoroutineContext,
capacity: Int = 0,
block: suspend ChannelBuilder<E>.() -> Unit
): ChannelJob<E> {
val channel = Channel<E>(capacity)
return ChannelCoroutine(newCoroutineContext(context), channel).apply {
initParentJob(context[Job])
block.startCoroutine(this, this)
}
}
private class ChannelCoroutine<E>(
context: CoroutineContext,
val channel: Channel<E>
) : AbstractCoroutine<Unit>(context), ChannelBuilder<E>, ChannelJob<E>, Channel<E> by channel {
override fun afterCompletion(state: Any?) {
val cause = (state as? CompletedExceptionally)?.exception
if (!channel.close(cause) && cause != null)
handleCoroutineException(context, cause)
}
}