Select statement with onSend/onReceive/onAwait clauses
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
index 38356b6..9c75420 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
@@ -16,6 +16,9 @@
package kotlinx.coroutines.experimental
+import kotlinx.coroutines.experimental.intrinsics.startUndispatchedCoroutine
+import kotlinx.coroutines.experimental.selects.SelectBuilder
+import kotlinx.coroutines.experimental.selects.SelectInstance
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.startCoroutine
@@ -67,6 +70,12 @@
public suspend fun await(): T
/**
+ * Registers [onAwait][SelectBuilder.onAwait] select clause.
+ *
+ * @param select the select instance this clause is registered with.
+ * @param block the suspending code of the clause; it accepts a value of type [T] and produces the
+ *        result of type [R]. In the `DeferredCoroutine` implementation it is started with the
+ *        completed value once this clause wins the selection.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ public fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R)
+
+ /**
* Returns *completed* result or throws [IllegalStateException] if this deferred value has not
* [completed][isCompleted] yet. It throws the corresponding exception if this deferred has
* [completed exceptionally][isCompletedExceptionally].
@@ -118,9 +127,9 @@
async(context, block = block)
private open class DeferredCoroutine<T>(
- context: CoroutineContext,
+ override val parentContext: CoroutineContext,
active: Boolean
-) : AbstractCoroutine<T>(context, active), Deferred<T> {
+) : AbstractCoroutine<T>(active), Deferred<T> {
override val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
override val isCancelled: Boolean get() = state is Cancelled
@@ -152,6 +161,25 @@
})
}
+ /**
+  * Registers the `onAwait` clause of [select] on this deferred value.
+  *
+  * Returns immediately when [select] is already selected. Otherwise the current state is read once:
+  * a completed state is passed straight to [selectCompletion], while an incomplete one gets a
+  * [SelectOnCompletion] handler installed via `invokeOnCompletion`.
+  *
+  * @param select the select instance to register with.
+  * @param block the clause body to run with the completed value.
+  */
+ override fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R) {
+     // Nothing to register when the select has already been decided.
+     if (select.isSelected) return
+     val snapshot = this.state
+     if (snapshot !is Incomplete) {
+         // Already complete: attempt the selection right away with the state that was just read.
+         selectCompletion(select, block, snapshot)
+         return
+     }
+     // Still incomplete: install a completion handler and hand its handle to the select instance
+     // (presumably so the handler is removed once the select finishes -- confirm in SelectInstance).
+     val handle = invokeOnCompletion(SelectOnCompletion(this, select, block))
+     select.unregisterOnCompletion(handle)
+ }
+
+ /**
+  * Tries to select [select] and, on success, delivers the completion [state] to it.
+  *
+  * When `trySelect` fails nothing else happens. When it succeeds, a [CompletedExceptionally]
+  * state resumes the select with its exception; any other state is cast to [T] and [block] is
+  * started undispatched with it, using `select.completion` as the continuation.
+  *
+  * @param select the select instance to complete.
+  * @param block the clause body to start with the completed value.
+  * @param state the state to deliver; defaults to the state read at call time.
+  *        NOTE(review): the unchecked cast assumes this is a completed (non-`Incomplete`) state --
+  *        confirm that every caller invokes this only after completion.
+  */
+ @Suppress("UNCHECKED_CAST")
+ internal fun <R> selectCompletion(select: SelectInstance<R>, block: suspend (T) -> R, state: Any? = this.state) {
+     // Bail out when the selection attempt does not succeed.
+     if (!select.trySelect(idempotent = null)) return
+     when (state) {
+         is CompletedExceptionally -> select.resumeSelectWithException(state.exception)
+         else -> block.startUndispatchedCoroutine(state as T, select.completion)
+     }
+ }
+
@Suppress("UNCHECKED_CAST")
override fun getCompleted(): T {
val state = this.state
@@ -161,10 +189,19 @@
}
}
+/**
+ * Completion handler node created by `DeferredCoroutine.registerSelectAwait` for a deferred value
+ * that is not complete yet. When invoked, it forwards to `selectCompletion` on its `job`
+ * (inherited from [JobNode]; presumably the [deferred] passed to the constructor -- confirm).
+ *
+ * @param deferred the deferred coroutine this node is attached to.
+ * @param select the select instance whose `onAwait` clause is pending.
+ * @param block the clause body to run with the completed value.
+ */
+private class SelectOnCompletion<T, R>(
+    deferred: DeferredCoroutine<T>,
+    private val select: SelectInstance<R>,
+    private val block: suspend (T) -> R
+) : JobNode<DeferredCoroutine<T>>(deferred) {
+    // The completion reason is not inspected here; selectCompletion reads the job state itself.
+    override fun invoke(reason: Throwable?) {
+        job.selectCompletion(select, block)
+    }
+    override fun toString(): String = "SelectOnCompletion[" + select + "]"
+}
+
private class LazyDeferredCoroutine<T>(
- context: CoroutineContext,
- val block: suspend CoroutineScope.() -> T
-) : DeferredCoroutine<T>(context, active = false) {
+ parentContext: CoroutineContext,
+ private val block: suspend CoroutineScope.() -> T
+) : DeferredCoroutine<T>(parentContext, active = false) {
override fun onStart() {
block.startCoroutine(this, this)
}