blob: f9b4721366f3f1ab3a5d0b80dac102225239bed8 [file] [log] [blame]
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kotlinx.coroutines.experimental.selects
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.ClosedReceiveChannelException
import kotlinx.coroutines.experimental.channels.ClosedSendChannelException
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.SendChannel
import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
import kotlinx.coroutines.experimental.sync.Mutex
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.ContinuationInterceptor
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
/**
* Scope for [select] invocation.
*/
public interface SelectBuilder<in R> {
/**
* Clause for [Job.join] suspending function that selects the given [block] when the job is complete.
* This clause never fails, even if the job completes exceptionally.
*/
public fun Job.onJoin(block: suspend () -> R)
/**
* Clause for [Deferred.await] suspending function that selects the given [block] with the deferred value is
* resolved. The [select] invocation fails if the deferred value completes exceptionally (either fails or
* it cancelled).
*/
public fun <T> Deferred<T>.onAwait(block: suspend (T) -> R)
/**
* Clause for [SendChannel.send] suspending function that selects the given [block] when the [element] is sent to
* the channel. The [select] invocation fails with [ClosedSendChannelException] if the channel
* [isClosedForSend][SendChannel.isClosedForSend] _normally_ or with the original
* [close][SendChannel.close] cause exception if the channel has _failed_.
*/
public fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R)
/**
* Clause for [ReceiveChannel.receive] suspending function that selects the given [block] with the element that
* is received from the channel. The [select] invocation fails with [ClosedReceiveChannelException] if the channel
* [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_ or with the original
* [close][SendChannel.close] cause exception if the channel has _failed_.
*/
public fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R)
/**
* Clause for [ReceiveChannel.receiveOrNull] suspending function that selects the given [block] with the element that
* is received from the channel or selects the given [block] with `null` if if the channel
* [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_. The [select] invocation fails with
* the original [close][SendChannel.close] cause exception if the channel has _failed_.
*/
public fun <E> ReceiveChannel<E>.onReceiveOrNull(block: suspend (E?) -> R)
/**
* Clause for [Mutex.lock] suspending function that selects the given [block] when the mutex is locked.
*
* @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
* is already locked with the same token (same identity), this clause throws [IllegalStateException].
*/
public fun Mutex.onLock(owner: Any? = null, block: suspend () -> R)
/**
* Clause that selects the given [block] after a specified timeout passes.
*
* @param time timeout time
* @param unit timeout unit (milliseconds by default)
*/
public fun onTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> R)
}
/**
* Internal representation of select instance. This instance is called _selected_ when
* the clause to execute is already picked.
*
* @suppress **This is unstable API and it is subject to change.**
*/
public interface SelectInstance<in R> {
/**
* Returns `true` when this [select] statement had already picked a clause to execute.
*/
public val isSelected: Boolean
/**
* Tries to select this instance.
*/
public fun trySelect(idempotent: Any?): Boolean
/**
* Performs action atomically with [trySelect].
*/
public fun performAtomicTrySelect(desc: AtomicDesc): Any?
/**
* Performs action atomically when [isSelected] is `false`.
*/
public fun performAtomicIfNotSelected(desc: AtomicDesc): Any?
/**
* Returns completion continuation of this select instance.
* This select instance must be _selected_ first.
* All resumption through this instance happen _directly_ (as if `mode` is [MODE_DIRECT]).
*/
public val completion: Continuation<R>
/**
* Resumes this instance with [MODE_CANCELLABLE].
*/
public fun resumeSelectCancellableWithException(exception: Throwable)
public fun disposeOnSelect(handle: DisposableHandle)
}
/**
* Waits for the result of multiple suspending functions simultaneously, which are specified using _clauses_
* in the [builder] scope of this select invocation. The caller is suspended until one of the clauses
* is either _selected_ or _fails_.
*
* At most one clause is *atomically* selected and its block is executed. The result of the selected clause
* becomes the result of the select. If any clause _fails_, then the select invocation produces the
* corresponding exception. No clause is selected in this case.
*
* This select function is _biased_ to the first clause. When multiple clauses can be selected at the same time,
* the first one of them gets priority. Use [selectUnbiased] for an unbiased (randomized) selection among
* the clauses.
* There is no `default` clause for select expression. Instead, each selectable suspending function has the
* corresponding non-suspending version that can be used with a regular `when` expression to select one
* of the alternatives or to perform default (`else`) action if none of them can be immediately selected.
*
* | **Receiver** | **Suspending function** | **Select clause** | **Non-suspending version**
* | ---------------- | --------------------------------------------- | ------------------------------------------------ | --------------------------
* | [Job] | [join][Job.join] | [onJoin][SelectBuilder.onJoin] | [isCompleted][Job.isCompleted]
* | [Deferred] | [await][Deferred.await] | [onAwait][SelectBuilder.onAwait] | [isCompleted][Job.isCompleted]
* | [SendChannel] | [send][SendChannel.send] | [onSend][SelectBuilder.onSend] | [offer][SendChannel.offer]
* | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][SelectBuilder.onReceive] | [poll][ReceiveChannel.poll]
* | [ReceiveChannel] | [receiveOrNull][ReceiveChannel.receiveOrNull] | [onReceiveOrNull][SelectBuilder.onReceiveOrNull] | [poll][ReceiveChannel.poll]
* | [Mutex] | [lock][Mutex.lock] | [onLock][SelectBuilder.onLock] | [tryLock][Mutex.tryLock]
* | none | [delay] | [onTimeout][SelectBuilder.onTimeout] | none
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with [CancellationException].
*
* Atomicity of cancellation depends on the clause: [onSend][SelectBuilder.onSend], [onReceive][SelectBuilder.onReceive],
* [onReceiveOrNull][SelectBuilder.onReceiveOrNull], and [onLock][SelectBuilder.onLock] clauses are
* *atomically cancellable*. When select throws [CancellationException] it means that those clauses had not performed
* their respective operations.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when this select operation
* was already resumed on atomically cancellable clause and the continuation was posted for execution to the thread's queue.
*
* Note, that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*/
public inline suspend fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R =
suspendCoroutineOrReturn { cont ->
val scope = SelectBuilderImpl(cont)
try {
builder(scope)
} catch (e: Throwable) {
scope.handleBuilderException(e)
}
scope.getResult()
}
internal val ALREADY_SELECTED: Any = Symbol("ALREADY_SELECTED")
private val UNDECIDED: Any = Symbol("UNDECIDED")
private val RESUMED: Any = Symbol("RESUMED")
@PublishedApi
internal class SelectBuilderImpl<in R>(
private val delegate: Continuation<R>
) : LockFreeLinkedListHead(), SelectBuilder<R>, SelectInstance<R>, Continuation<R> {
// selection state is "this" (list of nodes) initially and is replaced by idempotent marker (or null) when selected
@Volatile
private var _state: Any? = this
// this is basically our own SafeContinuation
@Volatile
private var result: Any? = UNDECIDED
// cancellability support
@Volatile
private var parentHandle: DisposableHandle? = null
/* Result state machine
+-----------+ getResult +---------------------+ resume +---------+
| UNDECIDED | ------------> | COROUTINE_SUSPENDED | ---------> | RESUMED |
+-----------+ +---------------------+ +---------+
|
| resume
V
+------------+ getResult
| value/Fail | -----------+
+------------+ |
^ |
| |
+-------------------+
*/
companion object {
private val STATE: AtomicReferenceFieldUpdater<SelectBuilderImpl<*>, Any?> =
AtomicReferenceFieldUpdater.newUpdater(SelectBuilderImpl::class.java, Any::class.java, "_state")
private val RESULT: AtomicReferenceFieldUpdater<SelectBuilderImpl<*>, Any?> =
AtomicReferenceFieldUpdater.newUpdater(SelectBuilderImpl::class.java, Any::class.java, "result")
}
override val context: CoroutineContext get() = delegate.context
override val completion: Continuation<R> get() = this
private inline fun doResume(value: () -> Any?, block: () -> Unit) {
check(isSelected) { "Must be selected first" }
while (true) { // lock-free loop
val result = this.result // atomic read
when {
result === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, value())) return
result === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
block()
return
}
else -> throw IllegalStateException("Already resumed")
}
}
}
// Resumes in MODE_DIRECT
override fun resume(value: R) {
doResume({ value }) {
delegate.resumeDirect(value)
}
}
// Resumes in MODE_DIRECT
override fun resumeWithException(exception: Throwable) {
doResume({ Fail(exception) }) {
delegate.resumeDirectWithException(exception)
}
}
// Resumes in MODE_CANCELLABLE
override fun resumeSelectCancellableWithException(exception: Throwable) {
doResume({ Fail(exception) }) {
delegate.resumeCancellableWithException(exception)
}
}
@PublishedApi
internal fun getResult(): Any? {
if (!isSelected) initCancellability()
var result = this.result // atomic read
if (result === UNDECIDED) {
if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
result = this.result // reread volatile var
}
when {
result === RESUMED -> throw IllegalStateException("Already resumed")
result is Fail -> throw result.exception
else -> return result // either COROUTINE_SUSPENDED or data
}
}
private fun initCancellability() {
val parent = context[Job] ?: return
val newRegistration = parent.invokeOnCancellation { cause ->
if (trySelect(null))
resumeSelectCancellableWithException(cause ?: CancellationException("Select was cancelled"))
}
parentHandle = newRegistration
// now check our state _after_ registering
if (isSelected) newRegistration.dispose()
}
private val state: Any? get() {
while (true) { // lock-free helping loop
val state = _state
if (state !is OpDescriptor) return state
state.perform(this)
}
}
@PublishedApi
internal fun handleBuilderException(e: Throwable) {
if (trySelect(null))
resumeWithException(e)
else
handleCoroutineException(context, e)
}
override val isSelected: Boolean get() = state !== this
override fun disposeOnSelect(handle: DisposableHandle) {
val node = DisposeNode(handle)
while (true) { // lock-free loop on state
val state = this.state
if (state === this) {
if (addLastIf(node, { this.state === this }))
return
} else { // already selected
handle.dispose()
return
}
}
}
private fun doAfterSelect() {
parentHandle?.dispose()
forEach<DisposeNode> {
it.handle.dispose()
}
}
// it is just like start(), but support idempotent start
override fun trySelect(idempotent: Any?): Boolean {
check(idempotent !is OpDescriptor) { "cannot use OpDescriptor as idempotent marker"}
while (true) { // lock-free loop on state
val state = this.state
when {
state === this -> {
if (STATE.compareAndSet(this, this, idempotent)) {
doAfterSelect()
return true
}
}
// otherwise -- already selected
idempotent == null -> return false // already selected
state === idempotent -> return true // was selected with this marker
else -> return false
}
}
}
override fun performAtomicTrySelect(desc: AtomicDesc): Any? = AtomicSelectOp(desc, true).perform(null)
override fun performAtomicIfNotSelected(desc: AtomicDesc): Any? = AtomicSelectOp(desc, false).perform(null)
private inner class AtomicSelectOp(
@JvmField val desc: AtomicDesc,
@JvmField val select: Boolean
) : AtomicOp<Any?>() {
override fun prepare(affected: Any?): Any? {
// only originator of operation makes preparation move of installing descriptor into this selector's state
// helpers should never do it, or risk ruining progress when they come late
if (affected == null) {
// we are originator (affected reference is not null if helping)
prepareIfNotSelected()?.let { return it }
}
return desc.prepare(this)
}
override fun complete(affected: Any?, failure: Any?) {
completeSelect(failure)
desc.complete(this, failure)
}
fun prepareIfNotSelected(): Any? {
while (true) { // lock-free loop on state
val state = _state
when {
state === this@AtomicSelectOp -> return null // already in progress
state is OpDescriptor -> state.perform(this@SelectBuilderImpl) // help
state === this@SelectBuilderImpl -> {
if (STATE.compareAndSet(this@SelectBuilderImpl, this@SelectBuilderImpl, this@AtomicSelectOp))
return null // success
}
else -> return ALREADY_SELECTED
}
}
}
private fun completeSelect(failure: Any?) {
val selectSuccess = select && failure == null
val update = if (selectSuccess) null else this@SelectBuilderImpl
if (STATE.compareAndSet(this@SelectBuilderImpl, this@AtomicSelectOp, update)) {
if (selectSuccess)
doAfterSelect()
}
}
}
override fun Job.onJoin(block: suspend () -> R) {
registerSelectJoin(this@SelectBuilderImpl, block)
}
override fun <T> Deferred<T>.onAwait(block: suspend (T) -> R) {
registerSelectAwait(this@SelectBuilderImpl, block)
}
override fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R) {
registerSelectSend(this@SelectBuilderImpl, element, block)
}
override fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R) {
registerSelectReceive(this@SelectBuilderImpl, block)
}
override fun <E> ReceiveChannel<E>.onReceiveOrNull(block: suspend (E?) -> R) {
registerSelectReceiveOrNull(this@SelectBuilderImpl, block)
}
override fun Mutex.onLock(owner: Any?, block: suspend () -> R) {
registerSelectLock(this@SelectBuilderImpl, owner, block)
}
override fun onTimeout(time: Long, unit: TimeUnit, block: suspend () -> R) {
require(time >= 0) { "Timeout time $time cannot be negative" }
if (time == 0L) {
if (trySelect(null))
block.startCoroutineUndispatched(completion)
return
}
val action = Runnable {
// todo: we could have replaced startCoroutine with startCoroutineUndispatched
// But we need a way to know that Delay.invokeOnTimeout had used the right thread
if (trySelect(null))
block.startCoroutineCancellable(completion) // shall be cancellable while waits for dispatch
}
val delay = context[ContinuationInterceptor] as? Delay
if (delay != null)
disposeOnSelect(delay.invokeOnTimeout(time, unit, action)) else
disposeOnSelect(DisposableFutureHandle(defaultExecutor.schedule(action, time, unit)))
}
private class DisposeNode(
@JvmField val handle: DisposableHandle
) : LockFreeLinkedListNode()
private class Fail(
@JvmField val exception: Throwable
)
}