blob: dfb059ccbefb337b73ec709b28fa13805187cec5 [file] [log] [blame]
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kotlinx.coroutines.experimental.selects
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.ClosedReceiveChannelException
import kotlinx.coroutines.experimental.channels.ClosedSendChannelException
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.SendChannel
import kotlinx.coroutines.experimental.internal.AtomicDesc
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
/**
* Scope for [select] invocation.
*
* Every member of this interface declares one _clause_ of the select expression. The `block` that is passed
* to a clause produces a value of type [R], which is the result type of the enclosing [select] invocation.
*/
public interface SelectBuilder<in R> : CoroutineScope {
/**
* Clause for [Deferred.await] suspending function that selects the given [block] with the deferred value when
* it is resolved. The [select] invocation fails if the deferred value completes exceptionally (either fails or
* is cancelled).
*/
public fun <T> Deferred<T>.onAwait(block: suspend (T) -> R)
/**
* Clause for [SendChannel.send] suspending function that selects the given [block] when the [element] is sent to
* the channel. The [select] invocation fails with [ClosedSendChannelException] if the channel
* [isClosedForSend][SendChannel.isClosedForSend] _normally_ or with the original
* [close][SendChannel.close] cause exception if the channel has _failed_.
*/
public fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R)
/**
* Clause for [ReceiveChannel.receive] suspending function that selects the given [block] with the element that
* is received from the channel. The [select] invocation fails with [ClosedReceiveChannelException] if the channel
* [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_ or with the original
* [close][SendChannel.close] cause exception if the channel has _failed_.
*/
public fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R)
/**
* Clause for [ReceiveChannel.receiveOrNull] suspending function that selects the given [block] with the element that
* is received from the channel or selects the given [block] with `null` if the channel
* [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_. The [select] invocation fails with
* the original [close][SendChannel.close] cause exception if the channel has _failed_.
*/
public fun <E> ReceiveChannel<E>.onReceiveOrNull(block: suspend (E?) -> R)
}
/**
* Internal representation of select instance. This instance is called _selected_ when
* the clause to execute is already picked.
*
* @suppress **This is unstable API and it is subject to change.**
*/
public interface SelectInstance<in R> {
/**
* Returns `true` when this [select] statement had already picked a clause to execute.
*/
public val isSelected: Boolean
/**
* Tries to select this instance and returns `true` when the attempt succeeds.
*
* NOTE(review): the exact meaning of the [idempotent] token is defined by the implementation and is not
* visible in this file; callers in this file always pass `null`.
*/
public fun trySelect(idempotent: Any?): Boolean
/**
* Performs action atomically with [trySelect].
*
* NOTE(review): the meaning of the returned value is not visible in this file -- confirm against [AtomicDesc].
*/
public fun performAtomicTrySelect(desc: AtomicDesc): Any?
/**
* Performs action atomically when [isSelected] is `false`.
*
* NOTE(review): the meaning of the returned value is not visible in this file -- confirm against [AtomicDesc].
*/
public fun performAtomicIfNotSelected(desc: AtomicDesc): Any?
/**
* Continuation through which the result of type [R] is delivered for this select.
* The implementation in this file (`SelectBuilderImpl`) fails with [IllegalStateException]
* when this property is read before the instance is selected.
*/
public val completion: Continuation<R>
/**
* Resumes this select with the given [exception].
* The implementation in this file (`SelectBuilderImpl`) fails with [IllegalStateException]
* when it is invoked before the instance is selected.
*/
public fun resumeSelectWithException(exception: Throwable)
/**
* Registers the [handler] with this select instance and returns the corresponding registration.
*
* NOTE(review): presumably follows the contract of [Job.invokeOnCompletion] -- confirm in the implementation.
*/
public fun invokeOnCompletion(handler: CompletionHandler): Job.Registration
/**
* Ties the given [registration] to the completion of this select instance.
* The implementation in this file (`SelectBuilderImpl`) does so by installing an `UnregisterOnCompletion`
* handler via [invokeOnCompletion]; presumably that handler unregisters the [registration] once this
* select completes -- confirm against `UnregisterOnCompletion`.
*/
public fun unregisterOnCompletion(registration: Job.Registration)
}
/**
 * Suspends the caller while waiting for several suspending functions at once. The functions to wait for are
 * declared as _clauses_ inside the [builder] scope of this select invocation, and the caller stays suspended
 * until one of those clauses is either _selected_ or _fails_.
 *
 * Selection is *atomic*: at most one clause is selected and only its block is executed. The value produced by
 * the selected clause becomes the result of the select. If any clause _fails_, the select invocation produces
 * the corresponding exception and no clause is selected.
 *
 * The select expression has no `default` clause. Instead, every selectable suspending function has a
 * corresponding non-suspending version that can be used with a regular `when` expression to pick one
 * of the alternatives, or to perform a default (`else`) action when none of them can be selected immediately.
 *
 * | **Receiver**     | **Suspending function**                       | **Select clause**                                | **Non-suspending version**
 * | ---------------- | --------------------------------------------- | ------------------------------------------------ | --------------------------
 * | [Deferred]       | [await][Deferred.await]                       | [onAwait][SelectBuilder.onAwait]                 | [isCompleted][Deferred.isCompleted]
 * | [SendChannel]    | [send][SendChannel.send]                      | [onSend][SelectBuilder.onSend]                   | [offer][SendChannel.offer]
 * | [ReceiveChannel] | [receive][ReceiveChannel.receive]             | [onReceive][SelectBuilder.onReceive]             | [poll][ReceiveChannel.poll]
 * | [ReceiveChannel] | [receiveOrNull][ReceiveChannel.receiveOrNull] | [onReceiveOrNull][SelectBuilder.onReceiveOrNull] | [poll][ReceiveChannel.poll]
 *
 * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
 * function is suspended, this function immediately resumes with [CancellationException].
 * Cancellation of a suspended select is *atomic* -- when this function
 * throws [CancellationException] it means that no clause was selected.
 *
 * Note that this function does not check for cancellation when it is not suspended.
 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
 */
public inline suspend fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R =
    suspendCoroutineOrReturn { continuation ->
        val selectImpl = SelectBuilderImpl(continuation)
        try {
            // Run the user-supplied builder with the select instance as its receiver to register the clauses.
            selectImpl.builder()
        } catch (failure: Throwable) {
            // Anything thrown by the builder is handed over to the select instance.
            selectImpl.handleBuilderException(failure)
        }
        selectImpl.initSelectResult()
    }
/*
   :todo: It is best to rewrite this class without the use of CancellableContinuationImpl and JobSupport infrastructure
   This way JobSupport will not have to provide trySelect(idempotent) operation and can save some checks and bytes
   to carry on that idempotent start token.
 */
/**
 * Implementation of the [SelectBuilder] scope and of the [SelectInstance] that backs a single [select] invocation.
 *
 * It wraps the continuation of the caller of [select] and is created in the non-active state
 * (`active = false`). The members of [SelectInstance] that are not overridden here are expected to be
 * provided by the superclass.
 */
@PublishedApi
internal class SelectBuilderImpl<in R>(
    delegate: Continuation<R>
) : CancellableContinuationImpl<R>(delegate, active = false), SelectBuilder<R>, SelectInstance<R> {
    /**
     * Handles the exception [e] that was thrown by the builder lambda of [select].
     *
     * When this instance can still be selected, the select is resumed with [e]. In every other case --
     * a clause has been selected already, or the attempt to resume was rejected -- the exception cannot be
     * delivered through the continuation anymore, so it is reported via [handleCoroutineException]
     * instead of being silently dropped.
     */
    @PublishedApi
    internal fun handleBuilderException(e: Throwable) {
        if (trySelect(idempotent = null)) {
            val token = tryResumeWithException(e)
            if (token != null) {
                completeResume(token)
                return
            }
        }
        // The exception was not delivered to the caller of select, so do not lose it.
        handleCoroutineException(context, e)
    }

    /**
     * Produces the value that the `suspendCoroutineOrReturn` block of [select] returns.
     * Cancellability is initialized only when no clause has been selected so far.
     */
    @PublishedApi
    internal fun initSelectResult(): Any? {
        if (!isSelected) initCancellability()
        return getResult()
    }

    // coroutines that are started inside this select are directly subordinate to the parent job
    override fun createContext(): CoroutineContext = delegate.context

    override fun onParentCompletion(cause: Throwable?) {
        /*
           Select is cancelled only when no clause was selected yet. If a clause was selected, then
           it is the concern of the coroutine that was started by that clause to cancel on its suspension
           points.
         */
        if (trySelect(null))
            cancel(cause)
    }

    override fun defaultResumeMode(): Int = MODE_DIRECT // all resumes through completion are dispatched directly

    /**
     * This instance itself serves as the completion continuation.
     * Fails with [IllegalStateException] when no clause has been selected yet.
     */
    override val completion: Continuation<R> get() {
        check(isSelected) { "Must be selected first" }
        return this
    }

    /**
     * Resumes this select with the given [exception].
     * Fails with [IllegalStateException] when no clause has been selected yet.
     */
    override fun resumeSelectWithException(exception: Throwable) {
        check(isSelected) { "Must be selected first" }
        // NOTE(review): `mode = 0` is a magic number that differs from the default MODE_DIRECT;
        // confirm which resume mode it denotes and replace it with the named constant.
        resumeWithException(exception, mode = 0)
    }

    /** Registers the [onAwait][SelectBuilder.onAwait] clause by delegating to the receiver [Deferred]. */
    override fun <T> Deferred<T>.onAwait(block: suspend (T) -> R) {
        registerSelectAwait(this@SelectBuilderImpl, block)
    }

    /** Registers the [onSend][SelectBuilder.onSend] clause by delegating to the receiver [SendChannel]. */
    override fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R) {
        registerSelectSend(this@SelectBuilderImpl, element, block)
    }

    /** Registers the [onReceive][SelectBuilder.onReceive] clause by delegating to the receiver [ReceiveChannel]. */
    override fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R) {
        registerSelectReceive(this@SelectBuilderImpl, block)
    }

    /**
     * Registers the [onReceiveOrNull][SelectBuilder.onReceiveOrNull] clause by delegating to
     * the receiver [ReceiveChannel].
     */
    override fun <E> ReceiveChannel<E>.onReceiveOrNull(block: suspend (E?) -> R) {
        registerSelectReceiveOrNull(this@SelectBuilderImpl, block)
    }

    /**
     * Installs an `UnregisterOnCompletion` handler for the given [registration] on this instance.
     * NOTE(review): presumably the handler unregisters [registration] when this select completes -- confirm
     * against `UnregisterOnCompletion`.
     */
    override fun unregisterOnCompletion(registration: Job.Registration) {
        invokeOnCompletion(UnregisterOnCompletion(this, registration))
    }
}