Select expression is modularized.
SelectClause(0,1,2) interfaces are introduced, so that synchronization
constructs can define their select clauses without having to modify
the source of the SelectBuilder.
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CompletableDeferred.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CompletableDeferred.kt
index f58b7ac..106fe97 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CompletableDeferred.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CompletableDeferred.kt
@@ -16,7 +16,7 @@
package kotlinx.coroutines.experimental
-import kotlinx.coroutines.experimental.selects.SelectInstance
+import kotlinx.coroutines.experimental.selects.SelectClause1
/**
* A [Deferred] that can be completed via public functions
@@ -86,8 +86,8 @@
private class CompletableDeferredImpl<T> : JobSupport(true), CompletableDeferred<T> {
override fun getCompleted(): T = getCompletedInternal() as T
suspend override fun await(): T = awaitInternal() as T
- override fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R) =
- registerSelectAwaitInternal(select, block as (suspend (Any?) -> R))
+ override val onAwait: SelectClause1<T>
+ get() = this as SelectClause1<T>
override fun complete(value: T): Boolean {
loopOnState { state ->
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
index b1dfa39..8d5f520 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
@@ -16,8 +16,7 @@
package kotlinx.coroutines.experimental
-import kotlinx.coroutines.experimental.selects.SelectBuilder
-import kotlinx.coroutines.experimental.selects.SelectInstance
+import kotlinx.coroutines.experimental.selects.SelectClause1
import kotlinx.coroutines.experimental.selects.select
import kotlin.coroutines.experimental.CoroutineContext
@@ -91,16 +90,17 @@
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*
- * This function can be used in [select] invocation with [onAwait][SelectBuilder.onAwait] clause.
+ * This function can be used in [select] invocation with [onAwait] clause.
* Use [isCompleted] to check for completion of this deferred value without waiting.
*/
public suspend fun await(): T
/**
- * Registers [onAwait][SelectBuilder.onAwait] select clause.
- * @suppress **This is unstable API and it is subject to change.**
+ * Clause for [select] expression of [await] suspending function that selects with the deferred value when it is
+ * resolved. The [select] invocation fails if the deferred value completes exceptionally (either fails or
+ * it cancelled).
*/
- public fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R)
+ public val onAwait: SelectClause1<T>
/**
* Returns *completed* result or throws [IllegalStateException] if this deferred value has not
@@ -175,8 +175,8 @@
) : AbstractCoroutine<T>(parentContext, active), Deferred<T> {
override fun getCompleted(): T = getCompletedInternal() as T
suspend override fun await(): T = awaitInternal() as T
- override fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R) =
- registerSelectAwaitInternal(select, block as (suspend (Any?) -> R))
+ override val onAwait: SelectClause1<T>
+ get() = this as SelectClause1<T>
}
private class LazyDeferredCoroutine<T>(
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index 1baa7fe..922d414 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -22,7 +22,8 @@
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
import kotlinx.coroutines.experimental.internal.OpDescriptor
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
-import kotlinx.coroutines.experimental.selects.SelectBuilder
+import kotlinx.coroutines.experimental.selects.SelectClause0
+import kotlinx.coroutines.experimental.selects.SelectClause1
import kotlinx.coroutines.experimental.selects.SelectInstance
import kotlinx.coroutines.experimental.selects.select
import java.util.concurrent.Future
@@ -168,11 +169,17 @@
* This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
* suspending function is suspended, this function immediately resumes with [CancellationException].
*
- * This function can be used in [select] invocation with [onJoin][SelectBuilder.onJoin] clause.
+ * This function can be used in [select] invocation with [onJoin] clause.
* Use [isCompleted] to check for completion of this job without waiting.
*/
public suspend fun join()
+ /**
+ * Clause for [select] expression of [join] suspending function that selects when the job is complete.
+ * This clause never fails, even if the job completes exceptionally.
+ */
+ public val onJoin: SelectClause0
+
// ------------ low-level state-notification ------------
/**
@@ -226,12 +233,6 @@
// ------------ unstable internal API ------------
/**
- * Registers [onJoin][SelectBuilder.onJoin] select clause.
- * @suppress **This is unstable API and it is subject to change.**
- */
- public fun <R> registerSelectJoin(select: SelectInstance<R>, block: suspend () -> R)
-
- /**
* @suppress **Error**: Operator '+' on two Job objects is meaningless.
* Job is a coroutine context element and `+` is a set-sum operator for coroutine contexts.
* The job to the right of `+` just replaces the job the left of `+`.
@@ -375,7 +376,7 @@
* @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
* @suppress **This is unstable API and it is subject to change.**
*/
-public open class JobSupport(active: Boolean) : Job {
+public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause1<Any?> {
override val key: CoroutineContext.Key<*> get() = Job
/*
@@ -728,7 +729,11 @@
cont.disposeOnCompletion(invokeOnCompletion(ResumeOnCompletion(this, cont)))
}
- override fun <R> registerSelectJoin(select: SelectInstance<R>, block: suspend () -> R) {
+ final override val onJoin: SelectClause0
+ get() = this
+
+ // registerSelectJoin
+ final override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
// fast-path -- check state and select/return if needed
loopOnState { state ->
if (select.isSelected) return
@@ -956,7 +961,8 @@
})
}
- protected fun <R> registerSelectAwaitInternal(select: SelectInstance<R>, block: suspend (Any?) -> R) {
+ // registerSelectAwaitInternal
+ override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (Any?) -> R) {
// fast-path -- check state and select/return if needed
loopOnState { state ->
if (select.isSelected) return
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
index c739720..0c76c96 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
@@ -17,7 +17,7 @@
package kotlinx.coroutines.experimental
import kotlinx.coroutines.experimental.NonCancellable.isActive
-import kotlinx.coroutines.experimental.selects.SelectInstance
+import kotlinx.coroutines.experimental.selects.SelectClause0
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
/**
@@ -49,13 +49,8 @@
throw UnsupportedOperationException("This job is always active")
}
- /**
- * Always throws [UnsupportedOperationException].
- * @suppress **This is unstable API and it is subject to change.**
- */
- override fun <R> registerSelectJoin(select: SelectInstance<R>, block: suspend () -> R) {
- throw UnsupportedOperationException("This job is always active")
- }
+ override val onJoin: SelectClause0
+ get() = throw UnsupportedOperationException("This job is always active")
/** Always throws [IllegalStateException]. */
override fun getCompletionException(): CancellationException = throw IllegalStateException("This job is always active")
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
index af009fa..529e638 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -22,6 +22,8 @@
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
import kotlinx.coroutines.experimental.removeOnCancel
import kotlinx.coroutines.experimental.selects.ALREADY_SELECTED
+import kotlinx.coroutines.experimental.selects.SelectClause1
+import kotlinx.coroutines.experimental.selects.SelectClause2
import kotlinx.coroutines.experimental.selects.SelectInstance
import kotlinx.coroutines.experimental.suspendAtomicCancellableCoroutine
import kotlin.coroutines.experimental.startCoroutine
@@ -134,8 +136,7 @@
*/
protected fun conflatePreviousSendBuffered(node: LockFreeLinkedListNode) {
val prev = node.prev
- if (prev is SendBuffered<*>)
- prev.remove()
+ (prev as? SendBuffered<*>)?.remove()
}
/**
@@ -165,8 +166,7 @@
override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
super.finishOnSuccess(affected, next)
// remove previous SendBuffered
- if (affected is SendBuffered<*>)
- affected.remove()
+ (affected as? SendBuffered<*>)?.remove()
}
}
@@ -315,11 +315,11 @@
}
}
- private inner class TryEnqueueSendDesc<E, R>(
+ private inner class TryEnqueueSendDesc<R>(
element: E,
select: SelectInstance<R>,
- block: suspend () -> R
- ) : AddLastDesc<SendSelect<R>>(queue, SendSelect(element, select, block)) {
+ block: suspend (SendChannel<E>) -> R
+ ) : AddLastDesc<SendSelect<E, R>>(queue, SendSelect(element, this@AbstractSendChannel, select, block)) {
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
if (affected is ReceiveOrClosed<*>) {
return affected as? Closed<*> ?: ENQUEUE_FAILED
@@ -339,7 +339,14 @@
}
}
- override fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend () -> R) {
+ final override val onSend: SelectClause2<E, SendChannel<E>>
+ get() = object : SelectClause2<E, SendChannel<E>> {
+ override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
+ registerSelectSend(select, param, block)
+ }
+ }
+
+ private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
while (true) {
if (select.isSelected) return
if (isFull) {
@@ -357,7 +364,7 @@
offerResult === ALREADY_SELECTED -> return
offerResult === OFFER_FAILED -> {} // retry
offerResult === OFFER_SUCCESS -> {
- block.startCoroutineUndispatched(select.completion)
+ block.startCoroutineUndispatched(receiver = this, completion = select.completion)
return
}
offerResult is Closed<*> -> throw offerResult.sendException
@@ -369,17 +376,18 @@
// ------ private ------
- private class SendSelect<R>(
+ private class SendSelect<E, R>(
override val pollResult: Any?,
+ @JvmField val channel: SendChannel<E>,
@JvmField val select: SelectInstance<R>,
- @JvmField val block: suspend () -> R
+ @JvmField val block: suspend (SendChannel<E>) -> R
) : LockFreeLinkedListNode(), Send, DisposableHandle {
override fun tryResumeSend(idempotent: Any?): Any? =
if (select.trySelect(idempotent)) SELECT_STARTED else null
override fun completeResumeSend(token: Any) {
check(token === SELECT_STARTED)
- block.startCoroutine(select.completion)
+ block.startCoroutine(receiver = channel, completion = select.completion)
}
fun disposeOnSelect() {
@@ -390,7 +398,7 @@
remove()
}
- override fun toString(): String = "SendSelect($pollResult)[$select]"
+ override fun toString(): String = "SendSelect($pollResult)[$channel, $select]"
}
private class SendBuffered<out E>(
@@ -614,8 +622,15 @@
}
}
+ final override val onReceive: SelectClause1<E>
+ get() = object : SelectClause1<E> {
+ override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
+ registerSelectReceive(select, block)
+ }
+ }
+
@Suppress("UNCHECKED_CAST")
- override fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) {
+ private fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) {
while (true) {
if (select.isSelected) return
if (isEmpty) {
@@ -641,8 +656,15 @@
}
}
+ final override val onReceiveOrNull: SelectClause1<E?>
+ get() = object : SelectClause1<E?> {
+ override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) {
+ registerSelectReceiveOrNull(select, block)
+ }
+ }
+
@Suppress("UNCHECKED_CAST")
- override fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R) {
+ private fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R) {
while (true) {
if (select.isSelected) return
if (isEmpty) {
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt
index 9d8c89d..b2de4a9 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt
@@ -17,6 +17,7 @@
package kotlinx.coroutines.experimental.channels
import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.selects.SelectClause2
import kotlinx.coroutines.experimental.selects.SelectInstance
import kotlin.coroutines.experimental.CoroutineContext
@@ -105,7 +106,7 @@
parentContext: CoroutineContext,
channel: Channel<E>,
private val block: suspend ActorScope<E>.() -> Unit
-) : ActorCoroutine<E>(parentContext, channel, active = false) {
+) : ActorCoroutine<E>(parentContext, channel, active = false), SelectClause2<E, SendChannel<E>> {
override val channel: Channel<E> get() = this
override fun onStart() {
@@ -122,9 +123,13 @@
return super.offer(element)
}
- override fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend () -> R) {
+ override val onSend: SelectClause2<E, SendChannel<E>>
+ get() = this
+
+ // registerSelectSend
+ override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
start()
- return super.registerSelectSend(select, element, block)
+ super.onSend.registerSelectClause2(select, param, block)
}
}
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
index 2bddca3..a5f096e 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
@@ -21,8 +21,8 @@
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
-import kotlinx.coroutines.experimental.selects.SelectBuilder
-import kotlinx.coroutines.experimental.selects.SelectInstance
+import kotlinx.coroutines.experimental.selects.SelectClause1
+import kotlinx.coroutines.experimental.selects.SelectClause2
import kotlinx.coroutines.experimental.selects.select
import kotlinx.coroutines.experimental.yield
@@ -66,12 +66,23 @@
* Note, that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
- * This function can be used in [select] invocation with [onSend][SelectBuilder.onSend] clause.
+ * This function can be used in [select] invocation with [onSend] clause.
* Use [offer] to try sending to this channel without waiting.
*/
public suspend fun send(element: E)
/**
+ * Clause for [select] expression of [send] suspending function that selects when the element that is specified
+ * as parameter is sent to the channel. When the clause is selected the reference to this channel
+ * is passed into the corresponding block.
+ *
+ * The [select] invocation fails with [ClosedSendChannelException] if the channel
+ * [isClosedForSend][SendChannel.isClosedForSend] _normally_ or with the original
+ * [close][SendChannel.close] cause exception if the channel has _failed_.
+ */
+ public val onSend: SelectClause2<E, SendChannel<E>>
+
+ /**
* Adds [element] into this queue if it is possible to do so immediately without violating capacity restrictions
* and returns `true`. Otherwise, it returns `false` immediately
* or throws [ClosedSendChannelException] if the channel [isClosedForSend] _normally_.
@@ -80,12 +91,6 @@
public fun offer(element: E): Boolean
/**
- * Registers [onSend][SelectBuilder.onSend] select clause.
- * @suppress **This is unstable API and it is subject to change.**
- */
- public fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend () -> R)
-
- /**
* Closes this channel with an optional exceptional [cause].
* This is an idempotent operation -- repeated invocations of this function have no effect and return `false`.
* Conceptually, its sends a special "close token" over this channel. Immediately after invocation of this function
@@ -137,12 +142,21 @@
* Note, that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
- * This function can be used in [select] invocation with [onReceive][SelectBuilder.onReceive] clause.
+ * This function can be used in [select] invocation with [onReceive] clause.
* Use [poll] to try receiving from this channel without waiting.
*/
public suspend fun receive(): E
/**
+ * Clause for [select] expression of [receive] suspending function that selects with the element that
+ * is received from the channel.
+ * The [select] invocation fails with [ClosedReceiveChannelException] if the channel
+ * [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_ or with the original
+ * [close][SendChannel.close] cause exception if the channel has _failed_.
+ */
+ public val onReceive: SelectClause1<E>
+
+ /**
* Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
* or returns `null` if the channel is [closed][isClosedForReceive] _normally_,
* or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
@@ -159,12 +173,20 @@
* Note, that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
- * This function can be used in [select] invocation with [onReceiveOrNull][SelectBuilder.onReceiveOrNull] clause.
+ * This function can be used in [select] invocation with [onReceiveOrNull] clause.
* Use [poll] to try receiving from this channel without waiting.
*/
public suspend fun receiveOrNull(): E?
/**
+ * Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that
+ * is received from the channel or selects with `null` if if the channel
+ * [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_. The [select] invocation fails with
+ * the original [close][SendChannel.close] cause exception if the channel has _failed_.
+ */
+ public val onReceiveOrNull: SelectClause1<E?>
+
+ /**
* Retrieves and removes the head of this queue, or returns `null` if this queue [isEmpty]
* or is [closed][isClosedForReceive] _normally_,
* or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
@@ -177,18 +199,6 @@
* throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*/
public operator fun iterator(): ChannelIterator<E>
-
- /**
- * Registers [onReceive][SelectBuilder.onReceive] select clause.
- * @suppress **This is unstable API and it is subject to change.**
- */
- public fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R)
-
- /**
- * Registers [onReceiveOrNull][SelectBuilder.onReceiveOrNull] select clause.
- * @suppress **This is unstable API and it is subject to change.**
- */
- public fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R)
}
/**
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt
index 1132bec..a2ee21c 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt
@@ -20,6 +20,7 @@
import kotlinx.atomicfu.loop
import kotlinx.coroutines.experimental.internal.Symbol
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
+import kotlinx.coroutines.experimental.selects.SelectClause2
import kotlinx.coroutines.experimental.selects.SelectInstance
/**
@@ -233,13 +234,20 @@
}
}
- override fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend () -> R) {
+ override val onSend: SelectClause2<E, SendChannel<E>>
+ get() = object : SelectClause2<E, SendChannel<E>> {
+ override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
+ registerSelectSend(select, param, block)
+ }
+ }
+
+ private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
if (!select.trySelect(null)) return
offerInternal(element)?.let {
select.resumeSelectCancellableWithException(it.sendException)
return
}
- block.startCoroutineUndispatched(select.completion)
+ block.startCoroutineUndispatched(receiver = this, completion = select.completion)
}
private class Subscriber<E>(
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
index 1bfff3c..0f0c55f 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
@@ -19,8 +19,6 @@
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.loop
import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.channels.ClosedReceiveChannelException
-import kotlinx.coroutines.experimental.channels.ClosedSendChannelException
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.SendChannel
import kotlinx.coroutines.experimental.internal.*
@@ -37,49 +35,25 @@
*/
public interface SelectBuilder<in R> {
/**
- * Clause for [Job.join] suspending function that selects the given [block] when the job is complete.
- * This clause never fails, even if the job completes exceptionally.
+ * Registers clause in this [select] expression without additional parameters that does not select any value.
*/
- public fun Job.onJoin(block: suspend () -> R)
+ public operator fun SelectClause0.invoke(block: suspend () -> R)
/**
- * Clause for [Deferred.await] suspending function that selects the given [block] with the deferred value is
- * resolved. The [select] invocation fails if the deferred value completes exceptionally (either fails or
- * it cancelled).
+ * Registers clause in this [select] expression without additional parameters that selects value of type [Q].
*/
- public fun <T> Deferred<T>.onAwait(block: suspend (T) -> R)
+ public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)
/**
- * Clause for [SendChannel.send] suspending function that selects the given [block] when the [element] is sent to
- * the channel. The [select] invocation fails with [ClosedSendChannelException] if the channel
- * [isClosedForSend][SendChannel.isClosedForSend] _normally_ or with the original
- * [close][SendChannel.close] cause exception if the channel has _failed_.
+ * Registers clause in this [select] expression with additional parameter of type [P] that selects value of type [Q].
*/
- public fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R)
+ public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R)
/**
- * Clause for [ReceiveChannel.receive] suspending function that selects the given [block] with the element that
- * is received from the channel. The [select] invocation fails with [ClosedReceiveChannelException] if the channel
- * [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_ or with the original
- * [close][SendChannel.close] cause exception if the channel has _failed_.
+ * Registers clause in this [select] expression with additional parameter nullable parameter of type [P]
+ * with the `null` value for this parameter that selects value of type [Q].
*/
- public fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R)
-
- /**
- * Clause for [ReceiveChannel.receiveOrNull] suspending function that selects the given [block] with the element that
- * is received from the channel or selects the given [block] with `null` if if the channel
- * [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_. The [select] invocation fails with
- * the original [close][SendChannel.close] cause exception if the channel has _failed_.
- */
- public fun <E> ReceiveChannel<E>.onReceiveOrNull(block: suspend (E?) -> R)
-
- /**
- * Clause for [Mutex.lock] suspending function that selects the given [block] when the mutex is locked.
- *
- * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
- * is already locked with the same token (same identity), this clause throws [IllegalStateException].
- */
- public fun Mutex.onLock(owner: Any? = null, block: suspend () -> R)
+ public operator fun <P, Q> SelectClause2<P?, Q>.invoke(block: suspend (Q) -> R) = invoke(null, block)
/**
* Clause that selects the given [block] after a specified timeout passes.
@@ -88,6 +62,63 @@
* @param unit timeout unit (milliseconds by default)
*/
public fun onTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> R)
+
+ /** @suppress **Deprecated: for binary compatibility only **/
+ @Deprecated("for binary compatibility only", level=DeprecationLevel.HIDDEN)
+ public fun Job.onJoin(block: suspend () -> R) { onJoin(block) }
+
+ /** @suppress **Deprecated: for binary compatibility only **/
+ @Deprecated("for binary compatibility only", level=DeprecationLevel.HIDDEN)
+ public fun <T> Deferred<T>.onAwait(block: suspend (T) -> R) { onAwait(block) }
+
+ /** @suppress **Deprecated: for binary compatibility only **/
+ @Deprecated("for binary compatibility only", level=DeprecationLevel.HIDDEN)
+ public fun Mutex.onLock(owner: Any? = null, block: suspend () -> R) { onLock { block() } }
+
+ /** @suppress **Deprecated: for binary compatibility only **/
+ @Deprecated("for binary compatibility only", level=DeprecationLevel.HIDDEN)
+ public fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R) { onSend(element) { block() } }
+
+ /** @suppress **Deprecated: for binary compatibility only **/
+ @Deprecated("for binary compatibility only", level=DeprecationLevel.HIDDEN)
+ public fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R) { onReceive(block) }
+
+ /** @suppress **Deprecated: for binary compatibility only **/
+ @Deprecated("for binary compatibility only", level=DeprecationLevel.HIDDEN)
+ public fun <E> ReceiveChannel<E>.onReceiveOrNull(block: suspend (E?) -> R) { onReceiveOrNull(block) }
+}
+
+/**
+ * Clause for [select] expression without additional parameters that does not select any value.
+ */
+public interface SelectClause0 {
+ /**
+ * Registers this clause with the specified [select] instance and [block] of code.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ public fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R)
+}
+
+/**
+ * Clause for [select] expression without additional parameters that selects value of type [Q].
+ */
+public interface SelectClause1<out Q> {
+ /**
+ * Registers this clause with the specified [select] instance and [block] of code.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ public fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (Q) -> R)
+}
+
+/**
+ * Clause for [select] expression with additional parameter of type [P] that selects value of type [Q].
+ */
+public interface SelectClause2<in P, out Q> {
+ /**
+ * Registers this clause with the specified [select] instance and [block] of code.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ public fun <R> registerSelectClause2(select: SelectInstance<R>, param: P, block: suspend (Q) -> R)
}
/**
@@ -120,15 +151,18 @@
/**
* Returns completion continuation of this select instance.
* This select instance must be _selected_ first.
- * All resumption through this instance happen _directly_ (as if `mode` is [MODE_DIRECT]).
+ * All resumption through this instance happen _directly_ without going through dispatcher ([MODE_DIRECT]).
*/
public val completion: Continuation<R>
/**
- * Resumes this instance with [MODE_CANCELLABLE].
+ * Resumes this instance in a cancellable way ([MODE_CANCELLABLE]).
*/
public fun resumeSelectCancellableWithException(exception: Throwable)
+ /**
+ * Disposes the specified handle when this instance is selected.
+ */
public fun disposeOnSelect(handle: DisposableHandle)
}
@@ -393,28 +427,16 @@
}
}
- override fun Job.onJoin(block: suspend () -> R) {
- registerSelectJoin(this@SelectBuilderImpl, block)
+ override fun SelectClause0.invoke(block: suspend () -> R) {
+ registerSelectClause0(this@SelectBuilderImpl, block)
}
- override fun <T> Deferred<T>.onAwait(block: suspend (T) -> R) {
- registerSelectAwait(this@SelectBuilderImpl, block)
+ override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) {
+ registerSelectClause1(this@SelectBuilderImpl, block)
}
- override fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R) {
- registerSelectSend(this@SelectBuilderImpl, element, block)
- }
-
- override fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R) {
- registerSelectReceive(this@SelectBuilderImpl, block)
- }
-
- override fun <E> ReceiveChannel<E>.onReceiveOrNull(block: suspend (E?) -> R) {
- registerSelectReceiveOrNull(this@SelectBuilderImpl, block)
- }
-
- override fun Mutex.onLock(owner: Any?, block: suspend () -> R) {
- registerSelectLock(this@SelectBuilderImpl, owner, block)
+ override fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R) {
+ registerSelectClause2(this@SelectBuilderImpl, param, block)
}
override fun onTimeout(time: Long, unit: TimeUnit, block: suspend () -> R) {
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/SelectUnbiased.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/SelectUnbiased.kt
index b6b84bc..cf79b47 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/SelectUnbiased.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/SelectUnbiased.kt
@@ -16,11 +16,6 @@
package kotlinx.coroutines.experimental.selects
-import kotlinx.coroutines.experimental.Deferred
-import kotlinx.coroutines.experimental.Job
-import kotlinx.coroutines.experimental.channels.ReceiveChannel
-import kotlinx.coroutines.experimental.channels.SendChannel
-import kotlinx.coroutines.experimental.sync.Mutex
import java.util.*
import java.util.concurrent.TimeUnit
import kotlin.coroutines.experimental.Continuation
@@ -69,28 +64,16 @@
return instance.getResult()
}
- override fun Job.onJoin(block: suspend () -> R) {
- clauses += { registerSelectJoin(instance, block) }
+ override fun SelectClause0.invoke(block: suspend () -> R) {
+ clauses += { registerSelectClause0(instance, block) }
}
- override fun <T> Deferred<T>.onAwait(block: suspend (T) -> R) {
- clauses += { registerSelectAwait(instance, block) }
+ override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) {
+ clauses += { registerSelectClause1(instance, block) }
}
- override fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R) {
- clauses += { registerSelectSend(instance, element, block) }
- }
-
- override fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R) {
- clauses += { registerSelectReceive(instance, block) }
- }
-
- override fun <E> ReceiveChannel<E>.onReceiveOrNull(block: suspend (E?) -> R) {
- clauses += { registerSelectReceiveOrNull(instance, block) }
- }
-
- override fun Mutex.onLock(owner: Any?, block: suspend () -> R) {
- clauses += { registerSelectLock(instance, owner, block) }
+ override fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R) {
+ clauses += { registerSelectClause2(instance, param, block) }
}
override fun onTimeout(time: Long, unit: TimeUnit, block: suspend () -> R) {
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/Mutex.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/Mutex.kt
index 9028ba3..3130085 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/Mutex.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/Mutex.kt
@@ -22,7 +22,7 @@
import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
import kotlinx.coroutines.experimental.selects.ALREADY_SELECTED
-import kotlinx.coroutines.experimental.selects.SelectBuilder
+import kotlinx.coroutines.experimental.selects.SelectClause2
import kotlinx.coroutines.experimental.selects.SelectInstance
import kotlinx.coroutines.experimental.selects.select
import kotlin.coroutines.experimental.startCoroutine
@@ -76,7 +76,7 @@
* Note, that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
- * This function can be used in [select] invocation with [onLock][SelectBuilder.onLock] clause.
+ * This function can be used in [select] invocation with [onLock] clause.
* Use [tryLock] to try acquire lock without waiting.
*
* @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
@@ -85,10 +85,11 @@
public suspend fun lock(owner: Any? = null)
/**
- * Registers [onLock][SelectBuilder.onLock] select clause.
- * @suppress **This is unstable API and it is subject to change.**
+ * Clause for [select] expression of [lock] suspending function that selects when the mutex is locked.
+ * Additional parameter for the clause in the `owner` (see [lock]) and when the clause is selected
+ * the reference to this mutex is passed into the corresponding block.
*/
- public fun <R> registerSelectLock(select: SelectInstance<R>, owner: Any?, block: suspend () -> R)
+ public val onLock: SelectClause2<Any?, Mutex>
/**
* Checks mutex locked by owner
@@ -169,7 +170,7 @@
override fun toString(): String = "Empty[$locked]"
}
-internal class MutexImpl(locked: Boolean) : Mutex {
+internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
// State is: Empty | LockedQueue | OpDescriptor
// shared objects while we have no waiters
private val _state = atomic<Any?>(if (locked) EmptyLocked else EmptyUnlocked)
@@ -251,7 +252,12 @@
}
}
- override fun <R> registerSelectLock(select: SelectInstance<R>, owner: Any?, block: suspend () -> R) {
+ override val onLock: SelectClause2<Any?, Mutex>
+ get() = this
+
+ // registerSelectLock
+ @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
+ override fun <R> registerSelectClause2(select: SelectInstance<R>, owner: Any?, block: suspend (Mutex) -> R) {
while (true) { // lock-free loop on state
if (select.isSelected) return
val state = _state.value
@@ -264,7 +270,7 @@
val failure = select.performAtomicTrySelect(TryLockDesc(this, owner))
when {
failure == null -> { // success
- block.startCoroutineUndispatched(select.completion)
+ block.startCoroutineUndispatched(receiver = this, completion = select.completion)
return
}
failure === ALREADY_SELECTED -> return // already selected -- bail out
@@ -325,8 +331,8 @@
owner: Any?,
queue: LockedQueue,
select: SelectInstance<R>,
- block: suspend () -> R
- ) : AddLastDesc<LockSelect<R>>(queue, LockSelect(owner, select, block)) {
+ block: suspend (Mutex) -> R
+ ) : AddLastDesc<LockSelect<R>>(queue, LockSelect(owner, mutex, select, block)) {
override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
if (mutex._state.value !== queue) return ENQUEUE_FAIL
return super.onPrepare(affected, next)
@@ -455,15 +461,16 @@
private class LockSelect<R>(
owner: Any?,
+ @JvmField val mutex: Mutex,
@JvmField val select: SelectInstance<R>,
- @JvmField val block: suspend () -> R
+ @JvmField val block: suspend (Mutex) -> R
) : LockWaiter(owner) {
override fun tryResumeLockWaiter(): Any? = if (select.trySelect(null)) SELECT_SUCCESS else null
override fun completeResumeLockWaiter(token: Any) {
check(token === SELECT_SUCCESS)
- block.startCoroutine(select.completion)
+ block.startCoroutine(receiver = mutex, completion = select.completion)
}
- override fun toString(): String = "LockSelect[$owner, $select]"
+ override fun toString(): String = "LockSelect[$owner, $mutex, $select]"
}
// atomic unlock operation that checks that waiters queue is empty
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
index f940c1b..f0b55c2 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
@@ -16,7 +16,7 @@
package kotlinx.coroutines.experimental.channels
-import kotlinx.coroutines.experimental.selects.SelectInstance
+import kotlinx.coroutines.experimental.selects.SelectClause1
enum class TestChannelKind {
RENDEZVOUS {
@@ -70,8 +70,8 @@
suspend override fun receiveOrNull(): E? = sub.receiveOrNull()
override fun poll(): E? = sub.poll()
override fun iterator(): ChannelIterator<E> = sub.iterator()
- override fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) =
- sub.registerSelectReceive(select, block)
- override fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R) =
- sub.registerSelectReceiveOrNull(select, block)
+ override val onReceive: SelectClause1<E>
+ get() = sub.onReceive
+ override val onReceiveOrNull: SelectClause1<E?>
+ get() = sub.onReceiveOrNull
}
diff --git a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt
index 11dc70e..224357c 100644
--- a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt
@@ -24,6 +24,7 @@
import kotlinx.coroutines.experimental.channels.SendChannel
import kotlinx.coroutines.experimental.handleCoroutineException
import kotlinx.coroutines.experimental.newCoroutineContext
+import kotlinx.coroutines.experimental.selects.SelectClause2
import kotlinx.coroutines.experimental.selects.SelectInstance
import kotlinx.coroutines.experimental.sync.Mutex
import org.reactivestreams.Publisher
@@ -49,7 +50,7 @@
public fun <T> publish(
context: CoroutineContext,
block: suspend ProducerScope<T>.() -> Unit
-): Publisher<T> = Publisher<T> { subscriber ->
+): Publisher<T> = Publisher { subscriber ->
val newContext = newCoroutineContext(context)
val coroutine = PublisherCoroutine(newContext, subscriber)
coroutine.initParentJob(context[Job])
@@ -61,10 +62,10 @@
private const val CLOSED = -1L // closed, but have not signalled onCompleted/onError yet
private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError
-private class PublisherCoroutine<T>(
+private class PublisherCoroutine<in T>(
parentContext: CoroutineContext,
private val subscriber: Subscriber<T>
-) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription {
+) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
override val channel: SendChannel<T> get() = this
// Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
@@ -85,7 +86,7 @@
return true
}
- public suspend override fun send(element: T): Unit {
+ public suspend override fun send(element: T) {
// fast-path -- try send without suspension
if (offer(element)) return
// slow-path does suspend
@@ -97,11 +98,17 @@
doLockedNext(element)
}
- override fun <R> registerSelectSend(select: SelectInstance<R>, element: T, block: suspend () -> R) =
- mutex.registerSelectLock(select, null) {
+ override val onSend: SelectClause2<T, SendChannel<T>>
+ get() = this
+
+ // registerSelectSend
+ @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
+ override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
+ mutex.onLock.registerSelectClause2(select, null) {
doLockedNext(element)
- block()
+ block(this)
}
+ }
// assert: mutex.isLocked()
private fun doLockedNext(elem: T) {
diff --git a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt
index f7c4b5e..5688d2d 100644
--- a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt
@@ -24,6 +24,7 @@
import kotlinx.coroutines.experimental.channels.SendChannel
import kotlinx.coroutines.experimental.handleCoroutineException
import kotlinx.coroutines.experimental.newCoroutineContext
+import kotlinx.coroutines.experimental.selects.SelectClause2
import kotlinx.coroutines.experimental.selects.SelectInstance
import kotlinx.coroutines.experimental.sync.Mutex
import rx.Observable
@@ -63,10 +64,10 @@
private const val CLOSED = -1L // closed, but have not signalled onCompleted/onError yet
private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError
-private class RxObservableCoroutine<T>(
+private class RxObservableCoroutine<in T>(
parentContext: CoroutineContext,
private val subscriber: Subscriber<T>
-) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Producer, Subscription {
+) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Producer, Subscription, SelectClause2<T, SendChannel<T>> {
override val channel: SendChannel<T> get() = this
// Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
@@ -87,7 +88,7 @@
return true
}
- public suspend override fun send(element: T): Unit {
+ public suspend override fun send(element: T) {
// fast-path -- try send without suspension
if (offer(element)) return
// slow-path does suspend
@@ -99,11 +100,17 @@
doLockedNext(element)
}
- override fun <R> registerSelectSend(select: SelectInstance<R>, element: T, block: suspend () -> R) =
- mutex.registerSelectLock(select, null) {
+ override val onSend: SelectClause2<T, SendChannel<T>>
+ get() = this
+
+ // registerSelectSend
+ @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
+ override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
+ mutex.onLock.registerSelectClause2(select, null) {
doLockedNext(element)
- block()
+ block(this)
}
+ }
// assert: mutex.isLocked()
private fun doLockedNext(elem: T) {
diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt
index 5409e44..2e81e23 100644
--- a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt
@@ -27,6 +27,7 @@
import kotlinx.coroutines.experimental.channels.SendChannel
import kotlinx.coroutines.experimental.handleCoroutineException
import kotlinx.coroutines.experimental.newCoroutineContext
+import kotlinx.coroutines.experimental.selects.SelectClause2
import kotlinx.coroutines.experimental.selects.SelectInstance
import kotlinx.coroutines.experimental.sync.Mutex
import kotlin.coroutines.experimental.CoroutineContext
@@ -65,7 +66,7 @@
private class RxObservableCoroutine<T>(
parentContext: CoroutineContext,
private val subscriber: ObservableEmitter<T>
-) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Cancellable {
+) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Cancellable, SelectClause2<T, SendChannel<T>> {
override val channel: SendChannel<T> get() = this
// Mutex is locked when while subscriber.onXXX is being invoked
@@ -98,11 +99,17 @@
doLockedNext(element)
}
- override fun <R> registerSelectSend(select: SelectInstance<R>, element: T, block: suspend () -> R) =
- mutex.registerSelectLock(select, null) {
+ override val onSend: SelectClause2<T, SendChannel<T>>
+ get() = this
+
+ // registerSelectSend
+ @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
+ override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
+ mutex.onLock.registerSelectClause2(select, null) {
doLockedNext(element)
- block()
+ block(this)
}
+ }
// assert: mutex.isLocked()
private fun doLockedNext(elem: T) {