Select expression is modularized.
SelectClause(0,1,2) interfaces are introduced, so that synchronization
constructs can define their select clauses without having to modify
the source of the SelectBuilder.
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CompletableDeferred.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CompletableDeferred.kt
index f58b7ac..106fe97 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CompletableDeferred.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CompletableDeferred.kt
@@ -16,7 +16,7 @@
 
 package kotlinx.coroutines.experimental
 
-import kotlinx.coroutines.experimental.selects.SelectInstance
+import kotlinx.coroutines.experimental.selects.SelectClause1
 
 /**
  * A [Deferred] that can be completed via public functions
@@ -86,8 +86,8 @@
 private class CompletableDeferredImpl<T> : JobSupport(true), CompletableDeferred<T> {
     override fun getCompleted(): T = getCompletedInternal() as T
     suspend override fun await(): T = awaitInternal() as T
-    override fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R) =
-        registerSelectAwaitInternal(select, block as (suspend (Any?) -> R))
+    override val onAwait: SelectClause1<T>
+        get() = this as SelectClause1<T>
 
     override fun complete(value: T): Boolean {
         loopOnState { state ->
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
index b1dfa39..8d5f520 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
@@ -16,8 +16,7 @@
 
 package kotlinx.coroutines.experimental
 
-import kotlinx.coroutines.experimental.selects.SelectBuilder
-import kotlinx.coroutines.experimental.selects.SelectInstance
+import kotlinx.coroutines.experimental.selects.SelectClause1
 import kotlinx.coroutines.experimental.selects.select
 import kotlin.coroutines.experimental.CoroutineContext
 
@@ -91,16 +90,17 @@
      * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
      * immediately resumes with [CancellationException].
      *
-     * This function can be used in [select] invocation with [onAwait][SelectBuilder.onAwait] clause.
+     * This function can be used in [select] invocation with [onAwait] clause.
      * Use [isCompleted] to check for completion of this deferred value without waiting.
      */
     public suspend fun await(): T
 
     /**
-     * Registers [onAwait][SelectBuilder.onAwait] select clause.
-     * @suppress **This is unstable API and it is subject to change.**
+     * Clause for [select] expression of [await] suspending function that selects with the deferred value when it is
+     * resolved. The [select] invocation fails if the deferred value completes exceptionally (either fails or
+     * it cancelled).
      */
-    public fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R)
+    public val onAwait: SelectClause1<T>
 
     /**
      * Returns *completed* result or throws [IllegalStateException] if this deferred value has not
@@ -175,8 +175,8 @@
 ) : AbstractCoroutine<T>(parentContext, active), Deferred<T> {
     override fun getCompleted(): T = getCompletedInternal() as T
     suspend override fun await(): T = awaitInternal() as T
-    override fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R) =
-        registerSelectAwaitInternal(select, block as (suspend (Any?) -> R))
+    override val onAwait: SelectClause1<T>
+        get() = this as SelectClause1<T>
 }
 
 private class LazyDeferredCoroutine<T>(
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index 1baa7fe..922d414 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -22,7 +22,8 @@
 import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
 import kotlinx.coroutines.experimental.internal.OpDescriptor
 import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
-import kotlinx.coroutines.experimental.selects.SelectBuilder
+import kotlinx.coroutines.experimental.selects.SelectClause0
+import kotlinx.coroutines.experimental.selects.SelectClause1
 import kotlinx.coroutines.experimental.selects.SelectInstance
 import kotlinx.coroutines.experimental.selects.select
 import java.util.concurrent.Future
@@ -168,11 +169,17 @@
      * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
      * suspending function is suspended, this function immediately resumes with [CancellationException].
      *
-     * This function can be used in [select] invocation with [onJoin][SelectBuilder.onJoin] clause.
+     * This function can be used in [select] invocation with [onJoin] clause.
      * Use [isCompleted] to check for completion of this job without waiting.
      */
     public suspend fun join()
 
+    /**
+     * Clause for [select] expression of [join] suspending function that selects when the job is complete.
+     * This clause never fails, even if the job completes exceptionally.
+     */
+    public val onJoin: SelectClause0
+
     // ------------ low-level state-notification ------------
 
     /**
@@ -226,12 +233,6 @@
     // ------------ unstable internal API ------------
 
     /**
-     * Registers [onJoin][SelectBuilder.onJoin] select clause.
-     * @suppress **This is unstable API and it is subject to change.**
-     */
-    public fun <R> registerSelectJoin(select: SelectInstance<R>, block: suspend () -> R)
-
-    /**
      * @suppress **Error**: Operator '+' on two Job objects is meaningless.
      * Job is a coroutine context element and `+` is a set-sum operator for coroutine contexts.
      * The job to the right of `+` just replaces the job the left of `+`.
@@ -375,7 +376,7 @@
  * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
  * @suppress **This is unstable API and it is subject to change.**
  */
-public open class JobSupport(active: Boolean) : Job {
+public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause1<Any?> {
     override val key: CoroutineContext.Key<*> get() = Job
 
     /*
@@ -728,7 +729,11 @@
         cont.disposeOnCompletion(invokeOnCompletion(ResumeOnCompletion(this, cont)))
     }
 
-    override fun <R> registerSelectJoin(select: SelectInstance<R>, block: suspend () -> R) {
+    final override val onJoin: SelectClause0
+        get() = this
+
+    // registerSelectJoin
+    final override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
         // fast-path -- check state and select/return if needed
         loopOnState { state ->
             if (select.isSelected) return
@@ -956,7 +961,8 @@
         })
     }
 
-    protected fun <R> registerSelectAwaitInternal(select: SelectInstance<R>, block: suspend (Any?) -> R) {
+    // registerSelectAwaitInternal
+    override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (Any?) -> R) {
         // fast-path -- check state and select/return if needed
         loopOnState { state ->
             if (select.isSelected) return
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
index c739720..0c76c96 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
@@ -17,7 +17,7 @@
 package kotlinx.coroutines.experimental
 
 import kotlinx.coroutines.experimental.NonCancellable.isActive
-import kotlinx.coroutines.experimental.selects.SelectInstance
+import kotlinx.coroutines.experimental.selects.SelectClause0
 import kotlin.coroutines.experimental.AbstractCoroutineContextElement
 
 /**
@@ -49,13 +49,8 @@
         throw UnsupportedOperationException("This job is always active")
     }
 
-    /**
-     * Always throws [UnsupportedOperationException].
-     * @suppress **This is unstable API and it is subject to change.**
-     */
-    override fun <R> registerSelectJoin(select: SelectInstance<R>, block: suspend () -> R)  {
-        throw UnsupportedOperationException("This job is always active")
-    }
+    override val onJoin: SelectClause0
+        get() = throw UnsupportedOperationException("This job is always active")
 
     /** Always throws [IllegalStateException]. */
     override fun getCompletionException(): CancellationException = throw IllegalStateException("This job is always active")
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
index af009fa..529e638 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -22,6 +22,8 @@
 import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
 import kotlinx.coroutines.experimental.removeOnCancel
 import kotlinx.coroutines.experimental.selects.ALREADY_SELECTED
+import kotlinx.coroutines.experimental.selects.SelectClause1
+import kotlinx.coroutines.experimental.selects.SelectClause2
 import kotlinx.coroutines.experimental.selects.SelectInstance
 import kotlinx.coroutines.experimental.suspendAtomicCancellableCoroutine
 import kotlin.coroutines.experimental.startCoroutine
@@ -134,8 +136,7 @@
      */
     protected fun conflatePreviousSendBuffered(node: LockFreeLinkedListNode) {
         val prev = node.prev
-        if (prev is SendBuffered<*>)
-            prev.remove()
+        (prev as? SendBuffered<*>)?.remove()
     }
 
     /**
@@ -165,8 +166,7 @@
         override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
             super.finishOnSuccess(affected, next)
             // remove previous SendBuffered
-            if (affected is SendBuffered<*>)
-                affected.remove()
+            (affected as? SendBuffered<*>)?.remove()
         }
     }
 
@@ -315,11 +315,11 @@
         }
     }
 
-    private inner class TryEnqueueSendDesc<E, R>(
+    private inner class TryEnqueueSendDesc<R>(
         element: E,
         select: SelectInstance<R>,
-        block: suspend () -> R
-    ) : AddLastDesc<SendSelect<R>>(queue, SendSelect(element, select, block)) {
+        block: suspend (SendChannel<E>) -> R
+    ) : AddLastDesc<SendSelect<E, R>>(queue, SendSelect(element, this@AbstractSendChannel, select, block)) {
         override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
             if (affected is ReceiveOrClosed<*>) {
                 return affected as? Closed<*> ?: ENQUEUE_FAILED
@@ -339,7 +339,14 @@
         }
     }
 
-    override fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend () -> R) {
+    final override val onSend: SelectClause2<E, SendChannel<E>>
+        get() = object : SelectClause2<E, SendChannel<E>> {
+            override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
+                registerSelectSend(select, param, block)
+            }
+        }
+
+    private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
         while (true) {
             if (select.isSelected) return
             if (isFull) {
@@ -357,7 +364,7 @@
                     offerResult === ALREADY_SELECTED -> return
                     offerResult === OFFER_FAILED -> {} // retry
                     offerResult === OFFER_SUCCESS -> {
-                        block.startCoroutineUndispatched(select.completion)
+                        block.startCoroutineUndispatched(receiver = this, completion = select.completion)
                         return
                     }
                     offerResult is Closed<*> -> throw offerResult.sendException
@@ -369,17 +376,18 @@
 
     // ------ private ------
 
-    private class SendSelect<R>(
+    private class SendSelect<E, R>(
         override val pollResult: Any?,
+        @JvmField val channel: SendChannel<E>,
         @JvmField val select: SelectInstance<R>,
-        @JvmField val block: suspend () -> R
+        @JvmField val block: suspend (SendChannel<E>) -> R
     ) : LockFreeLinkedListNode(), Send, DisposableHandle {
         override fun tryResumeSend(idempotent: Any?): Any? =
             if (select.trySelect(idempotent)) SELECT_STARTED else null
 
         override fun completeResumeSend(token: Any) {
             check(token === SELECT_STARTED)
-            block.startCoroutine(select.completion)
+            block.startCoroutine(receiver = channel, completion = select.completion)
         }
 
         fun disposeOnSelect() {
@@ -390,7 +398,7 @@
             remove()
         }
 
-        override fun toString(): String = "SendSelect($pollResult)[$select]"
+        override fun toString(): String = "SendSelect($pollResult)[$channel, $select]"
     }
 
     private class SendBuffered<out E>(
@@ -614,8 +622,15 @@
         }
     }
 
+    final override val onReceive: SelectClause1<E>
+        get() = object : SelectClause1<E> {
+            override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
+                registerSelectReceive(select, block)
+            }
+        }
+
     @Suppress("UNCHECKED_CAST")
-    override fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) {
+    private fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) {
         while (true) {
             if (select.isSelected) return
             if (isEmpty) {
@@ -641,8 +656,15 @@
         }
     }
 
+    final override val onReceiveOrNull: SelectClause1<E?>
+        get() = object : SelectClause1<E?> {
+            override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) {
+                registerSelectReceiveOrNull(select, block)
+            }
+        }
+
     @Suppress("UNCHECKED_CAST")
-    override fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R) {
+    private fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R) {
         while (true) {
             if (select.isSelected) return
             if (isEmpty) {
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt
index 9d8c89d..b2de4a9 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt
@@ -17,6 +17,7 @@
 package kotlinx.coroutines.experimental.channels
 
 import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.selects.SelectClause2
 import kotlinx.coroutines.experimental.selects.SelectInstance
 import kotlin.coroutines.experimental.CoroutineContext
 
@@ -105,7 +106,7 @@
     parentContext: CoroutineContext,
     channel: Channel<E>,
     private val block: suspend ActorScope<E>.() -> Unit
-) : ActorCoroutine<E>(parentContext, channel, active = false) {
+) : ActorCoroutine<E>(parentContext, channel, active = false), SelectClause2<E, SendChannel<E>> {
     override val channel: Channel<E> get() = this
 
     override fun onStart() {
@@ -122,9 +123,13 @@
         return super.offer(element)
     }
 
-    override fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend () -> R) {
+    override val onSend: SelectClause2<E, SendChannel<E>>
+        get() = this
+
+    // registerSelectSend
+    override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
         start()
-        return super.registerSelectSend(select, element, block)
+        super.onSend.registerSelectClause2(select, param, block)
     }
 }
 
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
index 2bddca3..a5f096e 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
@@ -21,8 +21,8 @@
 import kotlinx.coroutines.experimental.Job
 import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
 import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
-import kotlinx.coroutines.experimental.selects.SelectBuilder
-import kotlinx.coroutines.experimental.selects.SelectInstance
+import kotlinx.coroutines.experimental.selects.SelectClause1
+import kotlinx.coroutines.experimental.selects.SelectClause2
 import kotlinx.coroutines.experimental.selects.select
 import kotlinx.coroutines.experimental.yield
 
@@ -66,12 +66,23 @@
      * Note, that this function does not check for cancellation when it is not suspended.
      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
      *
-     * This function can be used in [select] invocation with [onSend][SelectBuilder.onSend] clause.
+     * This function can be used in [select] invocation with [onSend] clause.
      * Use [offer] to try sending to this channel without waiting.
      */
     public suspend fun send(element: E)
 
     /**
+     * Clause for [select] expression of [send] suspending function that selects when the element that is specified
+     * as parameter is sent to the channel. When the clause is selected the reference to this channel
+     * is passed into the corresponding block.
+     *
+     * The [select] invocation fails with [ClosedSendChannelException] if the channel
+     * [isClosedForSend][SendChannel.isClosedForSend] _normally_ or with the original
+     * [close][SendChannel.close] cause exception if the channel has _failed_.
+     */
+    public val onSend: SelectClause2<E, SendChannel<E>>
+
+    /**
      * Adds [element] into this queue if it is possible to do so immediately without violating capacity restrictions
      * and returns `true`. Otherwise, it returns `false` immediately
      * or throws [ClosedSendChannelException] if the channel [isClosedForSend] _normally_.
@@ -80,12 +91,6 @@
     public fun offer(element: E): Boolean
 
     /**
-     * Registers [onSend][SelectBuilder.onSend] select clause.
-     * @suppress **This is unstable API and it is subject to change.**
-     */
-    public fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend () -> R)
-
-    /**
      * Closes this channel with an optional exceptional [cause].
      * This is an idempotent operation -- repeated invocations of this function have no effect and return `false`.
      * Conceptually, its sends a special "close token" over this channel. Immediately after invocation of this function
@@ -137,12 +142,21 @@
      * Note, that this function does not check for cancellation when it is not suspended.
      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
      *
-     * This function can be used in [select] invocation with [onReceive][SelectBuilder.onReceive] clause.
+     * This function can be used in [select] invocation with [onReceive] clause.
      * Use [poll] to try receiving from this channel without waiting.
      */
     public suspend fun receive(): E
 
     /**
+     * Clause for [select] expression of [receive] suspending function that selects with the element that
+     * is received from the channel.
+     * The [select] invocation fails with [ClosedReceiveChannelException] if the channel
+     * [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_ or with the original
+     * [close][SendChannel.close] cause exception if the channel has _failed_.
+     */
+    public val onReceive: SelectClause1<E>
+
+    /**
      * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
      * or returns `null` if the channel is [closed][isClosedForReceive] _normally_,
      * or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
@@ -159,12 +173,20 @@
      * Note, that this function does not check for cancellation when it is not suspended.
      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
      *
-     * This function can be used in [select] invocation with [onReceiveOrNull][SelectBuilder.onReceiveOrNull] clause.
+     * This function can be used in [select] invocation with [onReceiveOrNull] clause.
      * Use [poll] to try receiving from this channel without waiting.
      */
     public suspend fun receiveOrNull(): E?
 
     /**
+     * Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that
+     * is received from the channel or selects with `null` if if the channel
+     * [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_. The [select] invocation fails with
+     * the original [close][SendChannel.close] cause exception if the channel has _failed_.
+     */
+    public val onReceiveOrNull: SelectClause1<E?>
+
+    /**
      * Retrieves and removes the head of this queue, or returns `null` if this queue [isEmpty]
      * or is [closed][isClosedForReceive] _normally_,
      * or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
@@ -177,18 +199,6 @@
      * throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
      */
     public operator fun iterator(): ChannelIterator<E>
-
-    /**
-     * Registers [onReceive][SelectBuilder.onReceive] select clause.
-     * @suppress **This is unstable API and it is subject to change.**
-     */
-    public fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R)
-
-    /**
-     * Registers [onReceiveOrNull][SelectBuilder.onReceiveOrNull] select clause.
-     * @suppress **This is unstable API and it is subject to change.**
-     */
-    public fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R)
 }
 
 /**
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt
index 1132bec..a2ee21c 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt
@@ -20,6 +20,7 @@
 import kotlinx.atomicfu.loop
 import kotlinx.coroutines.experimental.internal.Symbol
 import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
+import kotlinx.coroutines.experimental.selects.SelectClause2
 import kotlinx.coroutines.experimental.selects.SelectInstance
 
 /**
@@ -233,13 +234,20 @@
         }
     }
 
-    override fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend () -> R) {
+    override val onSend: SelectClause2<E, SendChannel<E>>
+        get() = object : SelectClause2<E, SendChannel<E>> {
+            override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
+                registerSelectSend(select, param, block)
+            }
+        }
+
+    private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
         if (!select.trySelect(null)) return
         offerInternal(element)?.let {
             select.resumeSelectCancellableWithException(it.sendException)
             return
         }
-        block.startCoroutineUndispatched(select.completion)
+        block.startCoroutineUndispatched(receiver = this, completion = select.completion)
     }
 
     private class Subscriber<E>(
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
index 1bfff3c..0f0c55f 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
@@ -19,8 +19,6 @@
 import kotlinx.atomicfu.atomic
 import kotlinx.atomicfu.loop
 import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.channels.ClosedReceiveChannelException
-import kotlinx.coroutines.experimental.channels.ClosedSendChannelException
 import kotlinx.coroutines.experimental.channels.ReceiveChannel
 import kotlinx.coroutines.experimental.channels.SendChannel
 import kotlinx.coroutines.experimental.internal.*
@@ -37,49 +35,25 @@
  */
 public interface SelectBuilder<in R> {
     /**
-     * Clause for [Job.join] suspending function that selects the given [block] when the job is complete.
-     * This clause never fails, even if the job completes exceptionally.
+     * Registers clause in this [select] expression without additional parameters that does not select any value.
      */
-    public fun Job.onJoin(block: suspend () -> R)
+    public operator fun SelectClause0.invoke(block: suspend () -> R)
 
     /**
-     * Clause for [Deferred.await] suspending function that selects the given [block] with the deferred value is
-     * resolved. The [select] invocation fails if the deferred value completes exceptionally (either fails or
-     * it cancelled).
+     * Registers clause in this [select] expression without additional parameters that selects value of type [Q].
      */
-    public fun <T> Deferred<T>.onAwait(block: suspend (T) -> R)
+    public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)
 
     /**
-     * Clause for [SendChannel.send] suspending function that selects the given [block] when the [element] is sent to
-     * the channel. The [select] invocation fails with [ClosedSendChannelException] if the channel
-     * [isClosedForSend][SendChannel.isClosedForSend] _normally_ or with the original
-     * [close][SendChannel.close] cause exception if the channel has _failed_.
+     * Registers clause in this [select] expression with additional parameter of type [P] that selects value of type [Q].
      */
-    public fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R)
+    public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R)
 
     /**
-     * Clause for [ReceiveChannel.receive] suspending function that selects the given [block] with the element that
-     * is received from the channel. The [select] invocation fails with [ClosedReceiveChannelException] if the channel
-     * [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_ or with the original
-     * [close][SendChannel.close] cause exception if the channel has _failed_.
+     * Registers clause in this [select] expression with additional parameter nullable parameter of type [P]
+     * with the `null` value for this parameter that selects value of type [Q].
      */
-    public fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R)
-
-    /**
-     * Clause for [ReceiveChannel.receiveOrNull] suspending function that selects the given [block] with the element that
-     * is received from the channel or selects the given [block] with `null` if if the channel
-     * [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_. The [select] invocation fails with
-     * the original [close][SendChannel.close] cause exception if the channel has _failed_.
-     */
-    public fun <E> ReceiveChannel<E>.onReceiveOrNull(block: suspend (E?) -> R)
-
-    /**
-     * Clause for [Mutex.lock] suspending function that selects the given [block] when the mutex is locked.
-     *
-     * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
-     *        is already locked with the same token (same identity), this clause throws [IllegalStateException].
-     */
-    public fun Mutex.onLock(owner: Any? = null, block: suspend () -> R)
+    public operator fun <P, Q> SelectClause2<P?, Q>.invoke(block: suspend (Q) -> R) = invoke(null, block)
 
     /**
      * Clause that selects the given [block] after a specified timeout passes.
@@ -88,6 +62,63 @@
      * @param unit timeout unit (milliseconds by default)
      */
     public fun onTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> R)
+
+    /** @suppress **Deprecated: for binary compatibility only **/
+    @Deprecated("for binary compatibility only", level=DeprecationLevel.HIDDEN)
+    public fun Job.onJoin(block: suspend () -> R) { onJoin(block) }
+
+    /** @suppress **Deprecated: for binary compatibility only **/
+    @Deprecated("for binary compatibility only", level=DeprecationLevel.HIDDEN)
+    public fun <T> Deferred<T>.onAwait(block: suspend (T) -> R) { onAwait(block) }
+
+    /** @suppress **Deprecated: for binary compatibility only **/
+    @Deprecated("for binary compatibility only", level=DeprecationLevel.HIDDEN)
+    public fun Mutex.onLock(owner: Any? = null, block: suspend () -> R) { onLock { block() } }
+
+    /** @suppress **Deprecated: for binary compatibility only **/
+    @Deprecated("for binary compatibility only", level=DeprecationLevel.HIDDEN)
+    public fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R) { onSend(element) { block() } }
+
+    /** @suppress **Deprecated: for binary compatibility only **/
+    @Deprecated("for binary compatibility only", level=DeprecationLevel.HIDDEN)
+    public fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R) { onReceive(block) }
+
+    /** @suppress **Deprecated: for binary compatibility only **/
+    @Deprecated("for binary compatibility only", level=DeprecationLevel.HIDDEN)
+    public fun <E> ReceiveChannel<E>.onReceiveOrNull(block: suspend (E?) -> R) { onReceiveOrNull(block) }
+}
+
+/**
+ * Clause for [select] expression without additional parameters that does not select any value.
+ */
+public interface SelectClause0 {
+    /**
+     * Registers this clause with the specified [select] instance and [block] of code.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    public fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R)
+}
+
+/**
+ * Clause for [select] expression without additional parameters that selects value of type [Q].
+ */
+public interface SelectClause1<out Q> {
+    /**
+     * Registers this clause with the specified [select] instance and [block] of code.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    public fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (Q) -> R)
+}
+
+/**
+ * Clause for [select] expression with additional parameter of type [P] that selects value of type [Q].
+ */
+public interface SelectClause2<in P, out Q> {
+    /**
+     * Registers this clause with the specified [select] instance and [block] of code.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    public fun <R> registerSelectClause2(select: SelectInstance<R>, param: P, block: suspend (Q) -> R)
 }
 
 /**
@@ -120,15 +151,18 @@
     /**
      * Returns completion continuation of this select instance.
      * This select instance must be _selected_ first.
-     * All resumption through this instance happen _directly_ (as if `mode` is [MODE_DIRECT]).
+     * All resumption through this instance happen _directly_ without going through dispatcher ([MODE_DIRECT]).
      */
     public val completion: Continuation<R>
 
     /**
-     * Resumes this instance with [MODE_CANCELLABLE].
+     * Resumes this instance in a cancellable way ([MODE_CANCELLABLE]).
      */
     public fun resumeSelectCancellableWithException(exception: Throwable)
 
+    /**
+     * Disposes the specified handle when this instance is selected.
+     */
     public fun disposeOnSelect(handle: DisposableHandle)
 }
 
@@ -393,28 +427,16 @@
         }
     }
 
-    override fun Job.onJoin(block: suspend () -> R) {
-        registerSelectJoin(this@SelectBuilderImpl, block)
+    override fun SelectClause0.invoke(block: suspend () -> R) {
+        registerSelectClause0(this@SelectBuilderImpl, block)
     }
 
-    override fun <T> Deferred<T>.onAwait(block: suspend (T) -> R) {
-        registerSelectAwait(this@SelectBuilderImpl, block)
+    override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) {
+        registerSelectClause1(this@SelectBuilderImpl, block)
     }
 
-    override fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R) {
-        registerSelectSend(this@SelectBuilderImpl, element, block)
-    }
-
-    override fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R) {
-        registerSelectReceive(this@SelectBuilderImpl, block)
-    }
-
-    override fun <E> ReceiveChannel<E>.onReceiveOrNull(block: suspend (E?) -> R) {
-        registerSelectReceiveOrNull(this@SelectBuilderImpl, block)
-    }
-
-    override fun Mutex.onLock(owner: Any?, block: suspend () -> R) {
-        registerSelectLock(this@SelectBuilderImpl, owner, block)
+    override fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R) {
+        registerSelectClause2(this@SelectBuilderImpl, param, block)
     }
 
     override fun onTimeout(time: Long, unit: TimeUnit, block: suspend () -> R) {
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/SelectUnbiased.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/SelectUnbiased.kt
index b6b84bc..cf79b47 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/SelectUnbiased.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/SelectUnbiased.kt
@@ -16,11 +16,6 @@
 
 package kotlinx.coroutines.experimental.selects
 
-import kotlinx.coroutines.experimental.Deferred
-import kotlinx.coroutines.experimental.Job
-import kotlinx.coroutines.experimental.channels.ReceiveChannel
-import kotlinx.coroutines.experimental.channels.SendChannel
-import kotlinx.coroutines.experimental.sync.Mutex
 import java.util.*
 import java.util.concurrent.TimeUnit
 import kotlin.coroutines.experimental.Continuation
@@ -69,28 +64,16 @@
         return instance.getResult()
     }
 
-    override fun Job.onJoin(block: suspend () -> R) {
-        clauses += { registerSelectJoin(instance, block) }
+    override fun SelectClause0.invoke(block: suspend () -> R) {
+        clauses += { registerSelectClause0(instance, block) }
     }
 
-    override fun <T> Deferred<T>.onAwait(block: suspend (T) -> R) {
-        clauses += { registerSelectAwait(instance, block) }
+    override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) {
+        clauses += { registerSelectClause1(instance, block) }
     }
 
-    override fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R) {
-        clauses += { registerSelectSend(instance, element, block) }
-    }
-
-    override fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R) {
-        clauses += { registerSelectReceive(instance, block) }
-    }
-
-    override fun <E> ReceiveChannel<E>.onReceiveOrNull(block: suspend (E?) -> R) {
-        clauses += { registerSelectReceiveOrNull(instance, block) }
-    }
-
-    override fun Mutex.onLock(owner: Any?, block: suspend () -> R) {
-        clauses += { registerSelectLock(instance, owner, block) }
+    override fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R) {
+        clauses += { registerSelectClause2(instance, param, block) }
     }
 
     override fun onTimeout(time: Long, unit: TimeUnit, block: suspend () -> R) {
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/Mutex.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/Mutex.kt
index 9028ba3..3130085 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/Mutex.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/Mutex.kt
@@ -22,7 +22,7 @@
 import kotlinx.coroutines.experimental.internal.*
 import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
 import kotlinx.coroutines.experimental.selects.ALREADY_SELECTED
-import kotlinx.coroutines.experimental.selects.SelectBuilder
+import kotlinx.coroutines.experimental.selects.SelectClause2
 import kotlinx.coroutines.experimental.selects.SelectInstance
 import kotlinx.coroutines.experimental.selects.select
 import kotlin.coroutines.experimental.startCoroutine
@@ -76,7 +76,7 @@
      * Note, that this function does not check for cancellation when it is not suspended.
      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
      *
-     * This function can be used in [select] invocation with [onLock][SelectBuilder.onLock] clause.
+     * This function can be used in [select] invocation with [onLock] clause.
      * Use [tryLock] to try acquire lock without waiting.
      *
      * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
@@ -85,10 +85,11 @@
     public suspend fun lock(owner: Any? = null)
 
     /**
-     * Registers [onLock][SelectBuilder.onLock] select clause.
-     * @suppress **This is unstable API and it is subject to change.**
+     * Clause for [select] expression of [lock] suspending function that selects when the mutex is locked.
+     * Additional parameter for the clause in the `owner` (see [lock]) and when the clause is selected
+     * the reference to this mutex is passed into the corresponding block.
      */
-    public fun <R> registerSelectLock(select: SelectInstance<R>, owner: Any?, block: suspend () -> R)
+    public val onLock: SelectClause2<Any?, Mutex>
 
     /**
      * Checks mutex locked by owner
@@ -169,7 +170,7 @@
     override fun toString(): String = "Empty[$locked]"
 }
 
-internal class MutexImpl(locked: Boolean) : Mutex {
+internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
     // State is: Empty | LockedQueue | OpDescriptor
     // shared objects while we have no waiters
     private val _state = atomic<Any?>(if (locked) EmptyLocked else EmptyUnlocked)
@@ -251,7 +252,12 @@
         }
     }
 
-    override fun <R> registerSelectLock(select: SelectInstance<R>, owner: Any?, block: suspend () -> R) {
+    override val onLock: SelectClause2<Any?, Mutex>
+        get() = this
+
+    // registerSelectLock
+    @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
+    override fun <R> registerSelectClause2(select: SelectInstance<R>, owner: Any?, block: suspend (Mutex) -> R) {
         while (true) { // lock-free loop on state
             if (select.isSelected) return
             val state = _state.value
@@ -264,7 +270,7 @@
                         val failure = select.performAtomicTrySelect(TryLockDesc(this, owner))
                         when {
                             failure == null -> { // success
-                                block.startCoroutineUndispatched(select.completion)
+                                block.startCoroutineUndispatched(receiver = this, completion = select.completion)
                                 return
                             }
                             failure === ALREADY_SELECTED -> return // already selected -- bail out
@@ -325,8 +331,8 @@
         owner: Any?,
         queue: LockedQueue,
         select: SelectInstance<R>,
-        block: suspend () -> R
-    ) : AddLastDesc<LockSelect<R>>(queue, LockSelect(owner, select, block)) {
+        block: suspend (Mutex) -> R
+    ) : AddLastDesc<LockSelect<R>>(queue, LockSelect(owner, mutex, select, block)) {
         override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
             if (mutex._state.value !== queue) return ENQUEUE_FAIL
             return super.onPrepare(affected, next)
@@ -455,15 +461,16 @@
 
     private class LockSelect<R>(
         owner: Any?,
+        @JvmField val mutex: Mutex,
         @JvmField val select: SelectInstance<R>,
-        @JvmField val block: suspend () -> R
+        @JvmField val block: suspend (Mutex) -> R
     ) : LockWaiter(owner) {
         override fun tryResumeLockWaiter(): Any? = if (select.trySelect(null)) SELECT_SUCCESS else null
         override fun completeResumeLockWaiter(token: Any) {
             check(token === SELECT_SUCCESS)
-            block.startCoroutine(select.completion)
+            block.startCoroutine(receiver = mutex, completion = select.completion)
         }
-        override fun toString(): String = "LockSelect[$owner, $select]"
+        override fun toString(): String = "LockSelect[$owner, $mutex, $select]"
     }
 
     // atomic unlock operation that checks that waiters queue is empty
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
index f940c1b..f0b55c2 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
@@ -16,7 +16,7 @@
 
 package kotlinx.coroutines.experimental.channels
 
-import kotlinx.coroutines.experimental.selects.SelectInstance
+import kotlinx.coroutines.experimental.selects.SelectClause1
 
 enum class TestChannelKind {
     RENDEZVOUS {
@@ -70,8 +70,8 @@
     suspend override fun receiveOrNull(): E? = sub.receiveOrNull()
     override fun poll(): E? = sub.poll()
     override fun iterator(): ChannelIterator<E> = sub.iterator()
-    override fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) =
-        sub.registerSelectReceive(select, block)
-    override fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R) =
-        sub.registerSelectReceiveOrNull(select, block)
+    override val onReceive: SelectClause1<E>
+        get() = sub.onReceive
+    override val onReceiveOrNull: SelectClause1<E?>
+        get() = sub.onReceiveOrNull
 }
diff --git a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt
index 11dc70e..224357c 100644
--- a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt
@@ -24,6 +24,7 @@
 import kotlinx.coroutines.experimental.channels.SendChannel
 import kotlinx.coroutines.experimental.handleCoroutineException
 import kotlinx.coroutines.experimental.newCoroutineContext
+import kotlinx.coroutines.experimental.selects.SelectClause2
 import kotlinx.coroutines.experimental.selects.SelectInstance
 import kotlinx.coroutines.experimental.sync.Mutex
 import org.reactivestreams.Publisher
@@ -49,7 +50,7 @@
 public fun <T> publish(
     context: CoroutineContext,
     block: suspend ProducerScope<T>.() -> Unit
-): Publisher<T> = Publisher<T> { subscriber ->
+): Publisher<T> = Publisher { subscriber ->
     val newContext = newCoroutineContext(context)
     val coroutine = PublisherCoroutine(newContext, subscriber)
     coroutine.initParentJob(context[Job])
@@ -61,10 +62,10 @@
 private const val CLOSED = -1L    // closed, but have not signalled onCompleted/onError yet
 private const val SIGNALLED = -2L  // already signalled subscriber onCompleted/onError
 
-private class PublisherCoroutine<T>(
+private class PublisherCoroutine<in T>(
     parentContext: CoroutineContext,
     private val subscriber: Subscriber<T>
-) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription {
+) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
     override val channel: SendChannel<T> get() = this
 
     // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
@@ -85,7 +86,7 @@
         return true
     }
 
-    public suspend override fun send(element: T): Unit {
+    public suspend override fun send(element: T) {
         // fast-path -- try send without suspension
         if (offer(element)) return
         // slow-path does suspend
@@ -97,11 +98,17 @@
         doLockedNext(element)
     }
 
-    override fun <R> registerSelectSend(select: SelectInstance<R>, element: T, block: suspend () -> R) =
-        mutex.registerSelectLock(select, null) {
+    override val onSend: SelectClause2<T, SendChannel<T>>
+        get() = this
+
+    // registerSelectSend
+    @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
+    override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
+        mutex.onLock.registerSelectClause2(select, null) {
             doLockedNext(element)
-            block()
+            block(this)
         }
+    }
 
     // assert: mutex.isLocked()
     private fun doLockedNext(elem: T) {
diff --git a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt
index f7c4b5e..5688d2d 100644
--- a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt
@@ -24,6 +24,7 @@
 import kotlinx.coroutines.experimental.channels.SendChannel
 import kotlinx.coroutines.experimental.handleCoroutineException
 import kotlinx.coroutines.experimental.newCoroutineContext
+import kotlinx.coroutines.experimental.selects.SelectClause2
 import kotlinx.coroutines.experimental.selects.SelectInstance
 import kotlinx.coroutines.experimental.sync.Mutex
 import rx.Observable
@@ -63,10 +64,10 @@
 private const val CLOSED = -1L    // closed, but have not signalled onCompleted/onError yet
 private const val SIGNALLED = -2L  // already signalled subscriber onCompleted/onError
 
-private class RxObservableCoroutine<T>(
+private class RxObservableCoroutine<in T>(
     parentContext: CoroutineContext,
     private val subscriber: Subscriber<T>
-) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Producer, Subscription {
+) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Producer, Subscription, SelectClause2<T, SendChannel<T>> {
     override val channel: SendChannel<T> get() = this
 
     // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
@@ -87,7 +88,7 @@
         return true
     }
 
-    public suspend override fun send(element: T): Unit {
+    public suspend override fun send(element: T) {
         // fast-path -- try send without suspension
         if (offer(element)) return
         // slow-path does suspend
@@ -99,11 +100,17 @@
         doLockedNext(element)
     }
 
-    override fun <R> registerSelectSend(select: SelectInstance<R>, element: T, block: suspend () -> R) =
-        mutex.registerSelectLock(select, null) {
+    override val onSend: SelectClause2<T, SendChannel<T>>
+        get() = this
+
+    // registerSelectSend
+    @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
+    override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
+        mutex.onLock.registerSelectClause2(select, null) {
             doLockedNext(element)
-            block()
+            block(this)
         }
+    }
 
     // assert: mutex.isLocked()
     private fun doLockedNext(elem: T) {
diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt
index 5409e44..2e81e23 100644
--- a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt
@@ -27,6 +27,7 @@
 import kotlinx.coroutines.experimental.channels.SendChannel
 import kotlinx.coroutines.experimental.handleCoroutineException
 import kotlinx.coroutines.experimental.newCoroutineContext
+import kotlinx.coroutines.experimental.selects.SelectClause2
 import kotlinx.coroutines.experimental.selects.SelectInstance
 import kotlinx.coroutines.experimental.sync.Mutex
 import kotlin.coroutines.experimental.CoroutineContext
@@ -65,7 +66,7 @@
 private class RxObservableCoroutine<T>(
     parentContext: CoroutineContext,
     private val subscriber: ObservableEmitter<T>
-) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Cancellable {
+) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Cancellable, SelectClause2<T, SendChannel<T>> {
     override val channel: SendChannel<T> get() = this
 
     // Mutex is locked when while subscriber.onXXX is being invoked
@@ -98,11 +99,17 @@
         doLockedNext(element)
     }
 
-    override fun <R> registerSelectSend(select: SelectInstance<R>, element: T, block: suspend () -> R) =
-        mutex.registerSelectLock(select, null) {
+    override val onSend: SelectClause2<T, SendChannel<T>>
+        get() = this
+
+    // registerSelectSend
+    @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
+    override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
+        mutex.onLock.registerSelectClause2(select, null) {
             doLockedNext(element)
-            block()
+            block(this)
         }
+    }
 
     // assert: mutex.isLocked()
     private fun doLockedNext(elem: T) {